iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
34pub use crate::{store::util::Tag, util::temp_tag::TempTag};
35
36pub(crate) type ApiClient = irpc::Client<proto::Request>;
37
38#[common_fields({
39    backtrace: Option<Backtrace>,
40    #[snafu(implicit)]
41    span_trace: SpanTrace,
42})]
43#[allow(missing_docs)]
44#[non_exhaustive]
45#[derive(Debug, Snafu)]
46pub enum RequestError {
47    /// Request failed due to rpc error.
48    #[snafu(display("rpc error: {source}"))]
49    Rpc { source: irpc::Error },
50    /// Request failed due an actual error.
51    #[snafu(display("inner error: {source}"))]
52    Inner { source: Error },
53}
54
55impl From<irpc::Error> for RequestError {
56    fn from(value: irpc::Error) -> Self {
57        RpcSnafu.into_error(value)
58    }
59}
60
61impl From<Error> for RequestError {
62    fn from(value: Error) -> Self {
63        InnerSnafu.into_error(value)
64    }
65}
66
67impl From<io::Error> for RequestError {
68    fn from(value: io::Error) -> Self {
69        InnerSnafu.into_error(value.into())
70    }
71}
72
73impl From<irpc::channel::RecvError> for RequestError {
74    fn from(value: irpc::channel::RecvError) -> Self {
75        RpcSnafu.into_error(value.into())
76    }
77}
78
79pub type RequestResult<T> = std::result::Result<T, RequestError>;
80
81#[common_fields({
82    backtrace: Option<Backtrace>,
83    #[snafu(implicit)]
84    span_trace: SpanTrace,
85})]
86#[allow(missing_docs)]
87#[non_exhaustive]
88#[derive(Debug, Snafu)]
89pub enum ExportBaoError {
90    #[snafu(display("send error: {source}"))]
91    Send { source: irpc::channel::SendError },
92    #[snafu(display("recv error: {source}"))]
93    Recv { source: irpc::channel::RecvError },
94    #[snafu(display("request error: {source}"))]
95    Request { source: irpc::RequestError },
96    #[snafu(display("io error: {source}"))]
97    ExportBaoIo { source: io::Error },
98    #[snafu(display("encode error: {source}"))]
99    ExportBaoInner { source: bao_tree::io::EncodeError },
100    #[snafu(display("client error: {source}"))]
101    ClientError { source: ProgressError },
102}
103
104impl From<ExportBaoError> for Error {
105    fn from(e: ExportBaoError) -> Self {
106        match e {
107            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
108            ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
109            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
110            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
111            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
112            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
113        }
114    }
115}
116
117impl From<irpc::Error> for ExportBaoError {
118    fn from(e: irpc::Error) -> Self {
119        match e {
120            irpc::Error::Recv(e) => RecvSnafu.into_error(e),
121            irpc::Error::Send(e) => SendSnafu.into_error(e),
122            irpc::Error::Request(e) => RequestSnafu.into_error(e),
123            irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
124        }
125    }
126}
127
128impl From<io::Error> for ExportBaoError {
129    fn from(value: io::Error) -> Self {
130        ExportBaoIoSnafu.into_error(value)
131    }
132}
133
134impl From<irpc::channel::RecvError> for ExportBaoError {
135    fn from(value: irpc::channel::RecvError) -> Self {
136        RecvSnafu.into_error(value)
137    }
138}
139
140impl From<irpc::channel::SendError> for ExportBaoError {
141    fn from(value: irpc::channel::SendError) -> Self {
142        SendSnafu.into_error(value)
143    }
144}
145
146impl From<irpc::RequestError> for ExportBaoError {
147    fn from(value: irpc::RequestError) -> Self {
148        RequestSnafu.into_error(value)
149    }
150}
151
152impl From<bao_tree::io::EncodeError> for ExportBaoError {
153    fn from(value: bao_tree::io::EncodeError) -> Self {
154        ExportBaoInnerSnafu.into_error(value)
155    }
156}
157
158impl From<ProgressError> for ExportBaoError {
159    fn from(value: ProgressError) -> Self {
160        ClientSnafu.into_error(value)
161    }
162}
163
164pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
165
166#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
167pub enum Error {
168    #[serde(with = "crate::util::serde::io_error_serde")]
169    Io(io::Error),
170}
171
172impl Error {
173    pub fn io(
174        kind: io::ErrorKind,
175        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
176    ) -> Self {
177        Self::Io(io::Error::new(kind, msg.into()))
178    }
179
180    pub fn other<E>(msg: E) -> Self
181    where
182        E: Into<Box<dyn std::error::Error + Send + Sync>>,
183    {
184        Self::Io(io::Error::other(msg.into()))
185    }
186}
187
188impl From<irpc::Error> for Error {
189    fn from(e: irpc::Error) -> Self {
190        Self::Io(e.into())
191    }
192}
193
194impl From<RequestError> for Error {
195    fn from(e: RequestError) -> Self {
196        match e {
197            RequestError::Rpc { source, .. } => Self::Io(source.into()),
198            RequestError::Inner { source, .. } => source,
199        }
200    }
201}
202
203impl From<irpc::channel::RecvError> for Error {
204    fn from(e: irpc::channel::RecvError) -> Self {
205        Self::Io(e.into())
206    }
207}
208
209impl From<irpc::rpc::WriteError> for Error {
210    fn from(e: irpc::rpc::WriteError) -> Self {
211        Self::Io(e.into())
212    }
213}
214
215impl From<irpc::RequestError> for Error {
216    fn from(e: irpc::RequestError) -> Self {
217        Self::Io(e.into())
218    }
219}
220
221impl From<irpc::channel::SendError> for Error {
222    fn from(e: irpc::channel::SendError) -> Self {
223        Self::Io(e.into())
224    }
225}
226
227impl std::error::Error for Error {
228    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
229        match self {
230            Error::Io(e) => Some(e),
231        }
232    }
233}
234
235impl From<EncodeError> for Error {
236    fn from(value: EncodeError) -> Self {
237        match value {
238            EncodeError::Io(cause) => Self::Io(cause),
239            _ => Self::other(value),
240        }
241    }
242}
243
244pub type Result<T> = std::result::Result<T, Error>;
245
246/// The main entry point for the store API.
247#[derive(Debug, Clone, ref_cast::RefCast)]
248#[repr(transparent)]
249pub struct Store {
250    client: ApiClient,
251}
252
253impl Deref for Store {
254    type Target = blobs::Blobs;
255
256    fn deref(&self) -> &Self::Target {
257        blobs::Blobs::ref_from_sender(&self.client)
258    }
259}
260
261impl Store {
262    /// The tags API.
263    pub fn tags(&self) -> &Tags {
264        Tags::ref_from_sender(&self.client)
265    }
266
267    /// The blobs API.
268    pub fn blobs(&self) -> &blobs::Blobs {
269        blobs::Blobs::ref_from_sender(&self.client)
270    }
271
272    /// API for getting blobs from a *single* remote node.
273    pub fn remote(&self) -> &remote::Remote {
274        remote::Remote::ref_from_sender(&self.client)
275    }
276
277    /// Create a downloader for more complex downloads.
278    ///
279    /// Unlike the other APIs, this creates an object that has internal state,
280    /// so don't create it ad hoc but store it somewhere if you need it multiple
281    /// times.
282    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
283        downloader::Downloader::new(self, endpoint)
284    }
285
286    /// Connect to a remote store as a rpc client.
287    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
288        let sender = irpc::Client::quinn(endpoint, addr);
289        Store::from_sender(sender)
290    }
291
292    /// Listen on a quinn endpoint for incoming rpc connections.
293    pub async fn listen(self, endpoint: quinn::Endpoint) {
294        let local = self.client.as_local().unwrap().clone();
295        let handler = Request::remote_handler(local);
296        listen::<Request>(endpoint, handler).await
297    }
298
299    pub async fn sync_db(&self) -> RequestResult<()> {
300        let msg = SyncDbRequest;
301        self.client.rpc(msg).await??;
302        Ok(())
303    }
304
305    pub async fn shutdown(&self) -> irpc::Result<()> {
306        let msg = ShutdownRequest;
307        self.client.rpc(msg).await?;
308        Ok(())
309    }
310
311    /// Waits for the store to become completely idle.
312    ///
313    /// This is mostly useful for tests, where you want to check that e.g. the
314    /// store has written all data to disk.
315    ///
316    /// Note that a store is not guaranteed to become idle, if it is being
317    /// interacted with concurrently. So this might wait forever.
318    ///
319    /// Also note that once you get the callback, the store is not guaranteed to
320    /// still be idle. All this tells you that there was a point in time where
321    /// the store was idle between the call and the response.
322    pub async fn wait_idle(&self) -> irpc::Result<()> {
323        let msg = WaitIdleRequest;
324        self.client.rpc(msg).await?;
325        Ok(())
326    }
327
328    pub(crate) fn from_sender(client: ApiClient) -> Self {
329        Self { client }
330    }
331
332    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
333        Self::ref_cast(client)
334    }
335}