iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
34pub use crate::{store::util::Tag, util::temp_tag::TempTag};
35
36pub(crate) type ApiClient = irpc::Client<proto::Request>;
37
38#[common_fields({
39    backtrace: Option<Backtrace>,
40    #[snafu(implicit)]
41    span_trace: SpanTrace,
42})]
43#[allow(missing_docs)]
44#[non_exhaustive]
45#[derive(Debug, Snafu)]
46pub enum RequestError {
47    /// Request failed due to rpc error.
48    #[snafu(display("rpc error: {source}"))]
49    Rpc { source: irpc::Error },
50    /// Request failed due an actual error.
51    #[snafu(display("inner error: {source}"))]
52    Inner { source: Error },
53}
54
55impl From<irpc::Error> for RequestError {
56    fn from(value: irpc::Error) -> Self {
57        RpcSnafu.into_error(value)
58    }
59}
60
61impl From<Error> for RequestError {
62    fn from(value: Error) -> Self {
63        InnerSnafu.into_error(value)
64    }
65}
66
67impl From<io::Error> for RequestError {
68    fn from(value: io::Error) -> Self {
69        InnerSnafu.into_error(value.into())
70    }
71}
72
73impl From<irpc::channel::mpsc::RecvError> for RequestError {
74    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
75        RpcSnafu.into_error(value.into())
76    }
77}
78
79pub type RequestResult<T> = std::result::Result<T, RequestError>;
80
81#[common_fields({
82    backtrace: Option<Backtrace>,
83    #[snafu(implicit)]
84    span_trace: SpanTrace,
85})]
86#[allow(missing_docs)]
87#[non_exhaustive]
88#[derive(Debug, Snafu)]
89pub enum ExportBaoError {
90    #[snafu(display("send error: {source}"))]
91    Send { source: irpc::channel::SendError },
92    #[snafu(display("mpsc recv error: {source}"))]
93    MpscRecv {
94        source: irpc::channel::mpsc::RecvError,
95    },
96    #[snafu(display("oneshot recv error: {source}"))]
97    OneshotRecv {
98        source: irpc::channel::oneshot::RecvError,
99    },
100    #[snafu(display("request error: {source}"))]
101    Request { source: irpc::RequestError },
102    #[snafu(display("io error: {source}"))]
103    ExportBaoIo { source: io::Error },
104    #[snafu(display("encode error: {source}"))]
105    ExportBaoInner { source: bao_tree::io::EncodeError },
106    #[snafu(display("client error: {source}"))]
107    ClientError { source: ProgressError },
108}
109
110impl From<ExportBaoError> for Error {
111    fn from(e: ExportBaoError) -> Self {
112        match e {
113            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
114            ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
115            ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
116            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
117            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
118            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
119            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
120        }
121    }
122}
123
124impl From<irpc::Error> for ExportBaoError {
125    fn from(e: irpc::Error) -> Self {
126        match e {
127            irpc::Error::MpscRecv(e) => MpscRecvSnafu.into_error(e),
128            irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e),
129            irpc::Error::Send(e) => SendSnafu.into_error(e),
130            irpc::Error::Request(e) => RequestSnafu.into_error(e),
131            irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
132        }
133    }
134}
135
136impl From<io::Error> for ExportBaoError {
137    fn from(value: io::Error) -> Self {
138        ExportBaoIoSnafu.into_error(value)
139    }
140}
141
142impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
143    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
144        MpscRecvSnafu.into_error(value)
145    }
146}
147
148impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
149    fn from(value: irpc::channel::oneshot::RecvError) -> Self {
150        OneshotRecvSnafu.into_error(value)
151    }
152}
153
154impl From<irpc::channel::SendError> for ExportBaoError {
155    fn from(value: irpc::channel::SendError) -> Self {
156        SendSnafu.into_error(value)
157    }
158}
159
160impl From<irpc::RequestError> for ExportBaoError {
161    fn from(value: irpc::RequestError) -> Self {
162        RequestSnafu.into_error(value)
163    }
164}
165
166impl From<bao_tree::io::EncodeError> for ExportBaoError {
167    fn from(value: bao_tree::io::EncodeError) -> Self {
168        ExportBaoInnerSnafu.into_error(value)
169    }
170}
171
172impl From<ProgressError> for ExportBaoError {
173    fn from(value: ProgressError) -> Self {
174        ClientSnafu.into_error(value)
175    }
176}
177
178pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
179
180#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
181pub enum Error {
182    #[serde(with = "crate::util::serde::io_error_serde")]
183    Io(io::Error),
184}
185
186impl Error {
187    pub fn io(
188        kind: io::ErrorKind,
189        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
190    ) -> Self {
191        Self::Io(io::Error::new(kind, msg.into()))
192    }
193
194    pub fn other<E>(msg: E) -> Self
195    where
196        E: Into<Box<dyn std::error::Error + Send + Sync>>,
197    {
198        Self::Io(io::Error::other(msg.into()))
199    }
200}
201
202impl From<irpc::Error> for Error {
203    fn from(e: irpc::Error) -> Self {
204        Self::Io(e.into())
205    }
206}
207
208impl From<RequestError> for Error {
209    fn from(e: RequestError) -> Self {
210        match e {
211            RequestError::Rpc { source, .. } => Self::Io(source.into()),
212            RequestError::Inner { source, .. } => source,
213        }
214    }
215}
216
217impl From<irpc::channel::mpsc::RecvError> for Error {
218    fn from(e: irpc::channel::mpsc::RecvError) -> Self {
219        Self::Io(e.into())
220    }
221}
222
223impl From<irpc::rpc::WriteError> for Error {
224    fn from(e: irpc::rpc::WriteError) -> Self {
225        Self::Io(e.into())
226    }
227}
228
229impl From<irpc::RequestError> for Error {
230    fn from(e: irpc::RequestError) -> Self {
231        Self::Io(e.into())
232    }
233}
234
235impl From<irpc::channel::SendError> for Error {
236    fn from(e: irpc::channel::SendError) -> Self {
237        Self::Io(e.into())
238    }
239}
240
241impl std::error::Error for Error {
242    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
243        match self {
244            Error::Io(e) => Some(e),
245        }
246    }
247}
248
249impl From<EncodeError> for Error {
250    fn from(value: EncodeError) -> Self {
251        match value {
252            EncodeError::Io(cause) => Self::Io(cause),
253            _ => Self::other(value),
254        }
255    }
256}
257
258pub type Result<T> = std::result::Result<T, Error>;
259
260/// The main entry point for the store API.
261#[derive(Debug, Clone, ref_cast::RefCast)]
262#[repr(transparent)]
263pub struct Store {
264    client: ApiClient,
265}
266
267impl Deref for Store {
268    type Target = blobs::Blobs;
269
270    fn deref(&self) -> &Self::Target {
271        blobs::Blobs::ref_from_sender(&self.client)
272    }
273}
274
275impl Store {
276    /// The tags API.
277    pub fn tags(&self) -> &Tags {
278        Tags::ref_from_sender(&self.client)
279    }
280
281    /// The blobs API.
282    pub fn blobs(&self) -> &blobs::Blobs {
283        blobs::Blobs::ref_from_sender(&self.client)
284    }
285
286    /// API for getting blobs from a *single* remote node.
287    pub fn remote(&self) -> &remote::Remote {
288        remote::Remote::ref_from_sender(&self.client)
289    }
290
291    /// Create a downloader for more complex downloads.
292    ///
293    /// Unlike the other APIs, this creates an object that has internal state,
294    /// so don't create it ad hoc but store it somewhere if you need it multiple
295    /// times.
296    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
297        downloader::Downloader::new(self, endpoint)
298    }
299
300    /// Connect to a remote store as a rpc client.
301    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
302        let sender = irpc::Client::quinn(endpoint, addr);
303        Store::from_sender(sender)
304    }
305
306    /// Listen on a quinn endpoint for incoming rpc connections.
307    pub async fn listen(self, endpoint: quinn::Endpoint) {
308        let local = self.client.as_local().unwrap().clone();
309        let handler = Request::remote_handler(local);
310        listen::<Request>(endpoint, handler).await
311    }
312
313    pub async fn sync_db(&self) -> RequestResult<()> {
314        let msg = SyncDbRequest;
315        self.client.rpc(msg).await??;
316        Ok(())
317    }
318
319    pub async fn shutdown(&self) -> irpc::Result<()> {
320        let msg = ShutdownRequest;
321        self.client.rpc(msg).await?;
322        Ok(())
323    }
324
325    /// Waits for the store to become completely idle.
326    ///
327    /// This is mostly useful for tests, where you want to check that e.g. the
328    /// store has written all data to disk.
329    ///
330    /// Note that a store is not guaranteed to become idle, if it is being
331    /// interacted with concurrently. So this might wait forever.
332    ///
333    /// Also note that once you get the callback, the store is not guaranteed to
334    /// still be idle. All this tells you that there was a point in time where
335    /// the store was idle between the call and the response.
336    pub async fn wait_idle(&self) -> irpc::Result<()> {
337        let msg = WaitIdleRequest;
338        self.client.rpc(msg).await?;
339        Ok(())
340    }
341
342    pub(crate) fn from_sender(client: ApiClient) -> Self {
343        Self { client }
344    }
345
346    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
347        Self::ref_cast(client)
348    }
349}