iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
6
7use iroh::Endpoint;
8use irpc::rpc::{listen, Handler};
9use n0_snafu::SpanTrace;
10use nested_enum_utils::common_fields;
11use proto::{Request, ShutdownRequest, SyncDbRequest};
12use ref_cast::RefCast;
13use serde::{Deserialize, Serialize};
14use snafu::{Backtrace, IntoError, Snafu};
15use tags::Tags;
16
17pub mod blobs;
18pub mod downloader;
19pub mod proto;
20pub mod remote;
21pub mod tags;
22pub use crate::{store::util::Tag, util::temp_tag::TempTag};
23
24pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
25
26#[common_fields({
27    backtrace: Option<Backtrace>,
28    #[snafu(implicit)]
29    span_trace: SpanTrace,
30})]
31#[allow(missing_docs)]
32#[non_exhaustive]
33#[derive(Debug, Snafu)]
34pub enum RequestError {
35    /// Request failed due to rpc error.
36    #[snafu(display("rpc error: {source}"))]
37    Rpc { source: irpc::Error },
38    /// Request failed due an actual error.
39    #[snafu(display("inner error: {source}"))]
40    Inner { source: Error },
41}
42
43impl From<irpc::Error> for RequestError {
44    fn from(value: irpc::Error) -> Self {
45        RpcSnafu.into_error(value)
46    }
47}
48
49impl From<Error> for RequestError {
50    fn from(value: Error) -> Self {
51        InnerSnafu.into_error(value)
52    }
53}
54
55impl From<io::Error> for RequestError {
56    fn from(value: io::Error) -> Self {
57        InnerSnafu.into_error(value.into())
58    }
59}
60
61impl From<irpc::channel::RecvError> for RequestError {
62    fn from(value: irpc::channel::RecvError) -> Self {
63        RpcSnafu.into_error(value.into())
64    }
65}
66
67pub type RequestResult<T> = std::result::Result<T, RequestError>;
68
69#[common_fields({
70    backtrace: Option<Backtrace>,
71    #[snafu(implicit)]
72    span_trace: SpanTrace,
73})]
74#[allow(missing_docs)]
75#[non_exhaustive]
76#[derive(Debug, Snafu)]
77pub enum ExportBaoError {
78    #[snafu(display("send error: {source}"))]
79    Send { source: irpc::channel::SendError },
80    #[snafu(display("recv error: {source}"))]
81    Recv { source: irpc::channel::RecvError },
82    #[snafu(display("request error: {source}"))]
83    Request { source: irpc::RequestError },
84    #[snafu(display("io error: {source}"))]
85    ExportBaoIo { source: io::Error },
86    #[snafu(display("encode error: {source}"))]
87    ExportBaoInner { source: bao_tree::io::EncodeError },
88}
89
90impl From<ExportBaoError> for Error {
91    fn from(e: ExportBaoError) -> Self {
92        match e {
93            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
94            ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
95            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
96            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
97            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
98        }
99    }
100}
101
102impl From<irpc::Error> for ExportBaoError {
103    fn from(e: irpc::Error) -> Self {
104        match e {
105            irpc::Error::Recv(e) => RecvSnafu.into_error(e),
106            irpc::Error::Send(e) => SendSnafu.into_error(e),
107            irpc::Error::Request(e) => RequestSnafu.into_error(e),
108            irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
109        }
110    }
111}
112
113impl From<io::Error> for ExportBaoError {
114    fn from(value: io::Error) -> Self {
115        ExportBaoIoSnafu.into_error(value)
116    }
117}
118
119impl From<irpc::channel::RecvError> for ExportBaoError {
120    fn from(value: irpc::channel::RecvError) -> Self {
121        RecvSnafu.into_error(value)
122    }
123}
124
125impl From<irpc::channel::SendError> for ExportBaoError {
126    fn from(value: irpc::channel::SendError) -> Self {
127        SendSnafu.into_error(value)
128    }
129}
130
131impl From<irpc::RequestError> for ExportBaoError {
132    fn from(value: irpc::RequestError) -> Self {
133        RequestSnafu.into_error(value)
134    }
135}
136
137impl From<bao_tree::io::EncodeError> for ExportBaoError {
138    fn from(value: bao_tree::io::EncodeError) -> Self {
139        ExportBaoInnerSnafu.into_error(value)
140    }
141}
142
143pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
144
145#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
146pub enum Error {
147    #[serde(with = "crate::util::serde::io_error_serde")]
148    Io(io::Error),
149}
150
151impl Error {
152    pub fn io(
153        kind: io::ErrorKind,
154        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
155    ) -> Self {
156        Self::Io(io::Error::new(kind, msg.into()))
157    }
158
159    pub fn other<E>(msg: E) -> Self
160    where
161        E: Into<Box<dyn std::error::Error + Send + Sync>>,
162    {
163        Self::Io(io::Error::other(msg.into()))
164    }
165}
166
167impl From<irpc::Error> for Error {
168    fn from(e: irpc::Error) -> Self {
169        Self::Io(e.into())
170    }
171}
172
173impl From<RequestError> for Error {
174    fn from(e: RequestError) -> Self {
175        match e {
176            RequestError::Rpc { source, .. } => Self::Io(source.into()),
177            RequestError::Inner { source, .. } => source,
178        }
179    }
180}
181
182impl From<irpc::channel::RecvError> for Error {
183    fn from(e: irpc::channel::RecvError) -> Self {
184        Self::Io(e.into())
185    }
186}
187
188impl From<irpc::rpc::WriteError> for Error {
189    fn from(e: irpc::rpc::WriteError) -> Self {
190        Self::Io(e.into())
191    }
192}
193
194impl From<irpc::RequestError> for Error {
195    fn from(e: irpc::RequestError) -> Self {
196        Self::Io(e.into())
197    }
198}
199
200impl From<irpc::channel::SendError> for Error {
201    fn from(e: irpc::channel::SendError) -> Self {
202        Self::Io(e.into())
203    }
204}
205
206impl std::error::Error for Error {
207    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
208        match self {
209            Error::Io(e) => Some(e),
210        }
211    }
212}
213
214pub type Result<T> = std::result::Result<T, Error>;
215
216/// The main entry point for the store API.
217#[derive(Debug, Clone, ref_cast::RefCast)]
218#[repr(transparent)]
219pub struct Store {
220    client: ApiClient,
221}
222
223impl Deref for Store {
224    type Target = blobs::Blobs;
225
226    fn deref(&self) -> &Self::Target {
227        blobs::Blobs::ref_from_sender(&self.client)
228    }
229}
230
231impl Store {
232    /// The tags API.
233    pub fn tags(&self) -> &Tags {
234        Tags::ref_from_sender(&self.client)
235    }
236
237    /// The blobs API.
238    pub fn blobs(&self) -> &blobs::Blobs {
239        blobs::Blobs::ref_from_sender(&self.client)
240    }
241
242    /// API for getting blobs from a *single* remote node.
243    pub fn remote(&self) -> &remote::Remote {
244        remote::Remote::ref_from_sender(&self.client)
245    }
246
247    /// Create a downloader for more complex downloads.
248    ///
249    /// Unlike the other APIs, this creates an object that has internal state,
250    /// so don't create it ad hoc but store it somewhere if you need it multiple
251    /// times.
252    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
253        downloader::Downloader::new(self, endpoint)
254    }
255
256    /// Connect to a remote store as a rpc client.
257    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
258        let sender = irpc::Client::quinn(endpoint, addr);
259        Store::from_sender(sender)
260    }
261
262    /// Listen on a quinn endpoint for incoming rpc connections.
263    pub async fn listen(self, endpoint: quinn::Endpoint) {
264        let local = self.client.local().unwrap().clone();
265        let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
266            let local = local.clone();
267            Box::pin({
268                match req {
269                    Request::SetTag(msg) => local.send((msg, tx)),
270                    Request::CreateTag(msg) => local.send((msg, tx)),
271                    Request::DeleteTags(msg) => local.send((msg, tx)),
272                    Request::RenameTag(msg) => local.send((msg, tx)),
273                    Request::ListTags(msg) => local.send((msg, tx)),
274
275                    Request::ListTempTags(msg) => local.send((msg, tx)),
276                    Request::CreateTempTag(msg) => local.send((msg, tx)),
277
278                    Request::BlobStatus(msg) => local.send((msg, tx)),
279
280                    Request::ImportBytes(msg) => local.send((msg, tx)),
281                    Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
282                    Request::ImportBao(msg) => local.send((msg, tx, rx)),
283                    Request::ImportPath(msg) => local.send((msg, tx)),
284                    Request::ListBlobs(msg) => local.send((msg, tx)),
285                    Request::DeleteBlobs(msg) => local.send((msg, tx)),
286                    Request::Batch(msg) => local.send((msg, tx, rx)),
287
288                    Request::ExportBao(msg) => local.send((msg, tx)),
289                    Request::ExportRanges(msg) => local.send((msg, tx)),
290                    Request::ExportPath(msg) => local.send((msg, tx)),
291
292                    Request::Observe(msg) => local.send((msg, tx)),
293
294                    Request::ClearProtected(msg) => local.send((msg, tx)),
295                    Request::SyncDb(msg) => local.send((msg, tx)),
296                    Request::Shutdown(msg) => local.send((msg, tx)),
297                }
298            })
299        });
300        listen::<Request>(endpoint, handler).await
301    }
302
303    pub async fn sync_db(&self) -> RequestResult<()> {
304        let msg = SyncDbRequest;
305        self.client.rpc(msg).await??;
306        Ok(())
307    }
308
309    pub async fn shutdown(&self) -> irpc::Result<()> {
310        let msg = ShutdownRequest;
311        self.client.rpc(msg).await?;
312        Ok(())
313    }
314
315    pub(crate) fn from_sender(client: ApiClient) -> Self {
316        Self { client }
317    }
318
319    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
320        Self::ref_cast(client)
321    }
322}