1use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, Handler};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
35pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
36
37#[common_fields({
38 backtrace: Option<Backtrace>,
39 #[snafu(implicit)]
40 span_trace: SpanTrace,
41})]
42#[allow(missing_docs)]
43#[non_exhaustive]
44#[derive(Debug, Snafu)]
45pub enum RequestError {
46 #[snafu(display("rpc error: {source}"))]
48 Rpc { source: irpc::Error },
49 #[snafu(display("inner error: {source}"))]
51 Inner { source: Error },
52}
53
54impl From<irpc::Error> for RequestError {
55 fn from(value: irpc::Error) -> Self {
56 RpcSnafu.into_error(value)
57 }
58}
59
60impl From<Error> for RequestError {
61 fn from(value: Error) -> Self {
62 InnerSnafu.into_error(value)
63 }
64}
65
66impl From<io::Error> for RequestError {
67 fn from(value: io::Error) -> Self {
68 InnerSnafu.into_error(value.into())
69 }
70}
71
72impl From<irpc::channel::RecvError> for RequestError {
73 fn from(value: irpc::channel::RecvError) -> Self {
74 RpcSnafu.into_error(value.into())
75 }
76}
77
78pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
80#[common_fields({
81 backtrace: Option<Backtrace>,
82 #[snafu(implicit)]
83 span_trace: SpanTrace,
84})]
85#[allow(missing_docs)]
86#[non_exhaustive]
87#[derive(Debug, Snafu)]
88pub enum ExportBaoError {
89 #[snafu(display("send error: {source}"))]
90 Send { source: irpc::channel::SendError },
91 #[snafu(display("recv error: {source}"))]
92 Recv { source: irpc::channel::RecvError },
93 #[snafu(display("request error: {source}"))]
94 Request { source: irpc::RequestError },
95 #[snafu(display("io error: {source}"))]
96 ExportBaoIo { source: io::Error },
97 #[snafu(display("encode error: {source}"))]
98 ExportBaoInner { source: bao_tree::io::EncodeError },
99}
100
101impl From<ExportBaoError> for Error {
102 fn from(e: ExportBaoError) -> Self {
103 match e {
104 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
105 ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
106 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
107 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
108 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
109 }
110 }
111}
112
113impl From<irpc::Error> for ExportBaoError {
114 fn from(e: irpc::Error) -> Self {
115 match e {
116 irpc::Error::Recv(e) => RecvSnafu.into_error(e),
117 irpc::Error::Send(e) => SendSnafu.into_error(e),
118 irpc::Error::Request(e) => RequestSnafu.into_error(e),
119 irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
120 }
121 }
122}
123
124impl From<io::Error> for ExportBaoError {
125 fn from(value: io::Error) -> Self {
126 ExportBaoIoSnafu.into_error(value)
127 }
128}
129
130impl From<irpc::channel::RecvError> for ExportBaoError {
131 fn from(value: irpc::channel::RecvError) -> Self {
132 RecvSnafu.into_error(value)
133 }
134}
135
136impl From<irpc::channel::SendError> for ExportBaoError {
137 fn from(value: irpc::channel::SendError) -> Self {
138 SendSnafu.into_error(value)
139 }
140}
141
142impl From<irpc::RequestError> for ExportBaoError {
143 fn from(value: irpc::RequestError) -> Self {
144 RequestSnafu.into_error(value)
145 }
146}
147
148impl From<bao_tree::io::EncodeError> for ExportBaoError {
149 fn from(value: bao_tree::io::EncodeError) -> Self {
150 ExportBaoInnerSnafu.into_error(value)
151 }
152}
153
154pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
155
156#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
157pub enum Error {
158 #[serde(with = "crate::util::serde::io_error_serde")]
159 Io(io::Error),
160}
161
162impl Error {
163 pub fn io(
164 kind: io::ErrorKind,
165 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
166 ) -> Self {
167 Self::Io(io::Error::new(kind, msg.into()))
168 }
169
170 pub fn other<E>(msg: E) -> Self
171 where
172 E: Into<Box<dyn std::error::Error + Send + Sync>>,
173 {
174 Self::Io(io::Error::other(msg.into()))
175 }
176}
177
178impl From<irpc::Error> for Error {
179 fn from(e: irpc::Error) -> Self {
180 Self::Io(e.into())
181 }
182}
183
184impl From<RequestError> for Error {
185 fn from(e: RequestError) -> Self {
186 match e {
187 RequestError::Rpc { source, .. } => Self::Io(source.into()),
188 RequestError::Inner { source, .. } => source,
189 }
190 }
191}
192
193impl From<irpc::channel::RecvError> for Error {
194 fn from(e: irpc::channel::RecvError) -> Self {
195 Self::Io(e.into())
196 }
197}
198
199impl From<irpc::rpc::WriteError> for Error {
200 fn from(e: irpc::rpc::WriteError) -> Self {
201 Self::Io(e.into())
202 }
203}
204
205impl From<irpc::RequestError> for Error {
206 fn from(e: irpc::RequestError) -> Self {
207 Self::Io(e.into())
208 }
209}
210
211impl From<irpc::channel::SendError> for Error {
212 fn from(e: irpc::channel::SendError) -> Self {
213 Self::Io(e.into())
214 }
215}
216
217impl std::error::Error for Error {
218 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
219 match self {
220 Error::Io(e) => Some(e),
221 }
222 }
223}
224
225impl From<EncodeError> for Error {
226 fn from(value: EncodeError) -> Self {
227 match value {
228 EncodeError::Io(cause) => Self::Io(cause),
229 _ => Self::other(value),
230 }
231 }
232}
233
234pub type Result<T> = std::result::Result<T, Error>;
235
236#[derive(Debug, Clone, ref_cast::RefCast)]
238#[repr(transparent)]
239pub struct Store {
240 client: ApiClient,
241}
242
243impl Deref for Store {
244 type Target = blobs::Blobs;
245
246 fn deref(&self) -> &Self::Target {
247 blobs::Blobs::ref_from_sender(&self.client)
248 }
249}
250
251impl Store {
252 pub fn tags(&self) -> &Tags {
254 Tags::ref_from_sender(&self.client)
255 }
256
257 pub fn blobs(&self) -> &blobs::Blobs {
259 blobs::Blobs::ref_from_sender(&self.client)
260 }
261
262 pub fn remote(&self) -> &remote::Remote {
264 remote::Remote::ref_from_sender(&self.client)
265 }
266
267 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
273 downloader::Downloader::new(self, endpoint)
274 }
275
276 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
278 let sender = irpc::Client::quinn(endpoint, addr);
279 Store::from_sender(sender)
280 }
281
282 pub async fn listen(self, endpoint: quinn::Endpoint) {
284 let local = self.client.local().unwrap().clone();
285 let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
286 let local = local.clone();
287 Box::pin({
288 match req {
289 Request::SetTag(msg) => local.send((msg, tx)),
290 Request::CreateTag(msg) => local.send((msg, tx)),
291 Request::DeleteTags(msg) => local.send((msg, tx)),
292 Request::RenameTag(msg) => local.send((msg, tx)),
293 Request::ListTags(msg) => local.send((msg, tx)),
294
295 Request::ListTempTags(msg) => local.send((msg, tx)),
296 Request::CreateTempTag(msg) => local.send((msg, tx)),
297
298 Request::BlobStatus(msg) => local.send((msg, tx)),
299
300 Request::ImportBytes(msg) => local.send((msg, tx)),
301 Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
302 Request::ImportBao(msg) => local.send((msg, tx, rx)),
303 Request::ImportPath(msg) => local.send((msg, tx)),
304 Request::ListBlobs(msg) => local.send((msg, tx)),
305 Request::DeleteBlobs(msg) => local.send((msg, tx)),
306 Request::Batch(msg) => local.send((msg, tx, rx)),
307
308 Request::ExportBao(msg) => local.send((msg, tx)),
309 Request::ExportRanges(msg) => local.send((msg, tx)),
310 Request::ExportPath(msg) => local.send((msg, tx)),
311
312 Request::Observe(msg) => local.send((msg, tx)),
313
314 Request::ClearProtected(msg) => local.send((msg, tx)),
315 Request::SyncDb(msg) => local.send((msg, tx)),
316 Request::Shutdown(msg) => local.send((msg, tx)),
317 }
318 })
319 });
320 listen::<Request>(endpoint, handler).await
321 }
322
323 pub async fn sync_db(&self) -> RequestResult<()> {
324 let msg = SyncDbRequest;
325 self.client.rpc(msg).await??;
326 Ok(())
327 }
328
329 pub async fn shutdown(&self) -> irpc::Result<()> {
330 let msg = ShutdownRequest;
331 self.client.rpc(msg).await?;
332 Ok(())
333 }
334
335 pub(crate) fn from_sender(client: ApiClient) -> Self {
336 Self { client }
337 }
338
339 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
340 Self::ref_cast(client)
341 }
342}