iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`](std::ops::Deref)
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, Handler};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
35pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
36
37#[common_fields({
38    backtrace: Option<Backtrace>,
39    #[snafu(implicit)]
40    span_trace: SpanTrace,
41})]
42#[allow(missing_docs)]
43#[non_exhaustive]
44#[derive(Debug, Snafu)]
45pub enum RequestError {
46    /// Request failed due to rpc error.
47    #[snafu(display("rpc error: {source}"))]
48    Rpc { source: irpc::Error },
49    /// Request failed due an actual error.
50    #[snafu(display("inner error: {source}"))]
51    Inner { source: Error },
52}
53
54impl From<irpc::Error> for RequestError {
55    fn from(value: irpc::Error) -> Self {
56        RpcSnafu.into_error(value)
57    }
58}
59
60impl From<Error> for RequestError {
61    fn from(value: Error) -> Self {
62        InnerSnafu.into_error(value)
63    }
64}
65
66impl From<io::Error> for RequestError {
67    fn from(value: io::Error) -> Self {
68        InnerSnafu.into_error(value.into())
69    }
70}
71
72impl From<irpc::channel::RecvError> for RequestError {
73    fn from(value: irpc::channel::RecvError) -> Self {
74        RpcSnafu.into_error(value.into())
75    }
76}
77
78pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
80#[common_fields({
81    backtrace: Option<Backtrace>,
82    #[snafu(implicit)]
83    span_trace: SpanTrace,
84})]
85#[allow(missing_docs)]
86#[non_exhaustive]
87#[derive(Debug, Snafu)]
88pub enum ExportBaoError {
89    #[snafu(display("send error: {source}"))]
90    Send { source: irpc::channel::SendError },
91    #[snafu(display("recv error: {source}"))]
92    Recv { source: irpc::channel::RecvError },
93    #[snafu(display("request error: {source}"))]
94    Request { source: irpc::RequestError },
95    #[snafu(display("io error: {source}"))]
96    ExportBaoIo { source: io::Error },
97    #[snafu(display("encode error: {source}"))]
98    ExportBaoInner { source: bao_tree::io::EncodeError },
99}
100
101impl From<ExportBaoError> for Error {
102    fn from(e: ExportBaoError) -> Self {
103        match e {
104            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
105            ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
106            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
107            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
108            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
109        }
110    }
111}
112
113impl From<irpc::Error> for ExportBaoError {
114    fn from(e: irpc::Error) -> Self {
115        match e {
116            irpc::Error::Recv(e) => RecvSnafu.into_error(e),
117            irpc::Error::Send(e) => SendSnafu.into_error(e),
118            irpc::Error::Request(e) => RequestSnafu.into_error(e),
119            irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
120        }
121    }
122}
123
124impl From<io::Error> for ExportBaoError {
125    fn from(value: io::Error) -> Self {
126        ExportBaoIoSnafu.into_error(value)
127    }
128}
129
130impl From<irpc::channel::RecvError> for ExportBaoError {
131    fn from(value: irpc::channel::RecvError) -> Self {
132        RecvSnafu.into_error(value)
133    }
134}
135
136impl From<irpc::channel::SendError> for ExportBaoError {
137    fn from(value: irpc::channel::SendError) -> Self {
138        SendSnafu.into_error(value)
139    }
140}
141
142impl From<irpc::RequestError> for ExportBaoError {
143    fn from(value: irpc::RequestError) -> Self {
144        RequestSnafu.into_error(value)
145    }
146}
147
148impl From<bao_tree::io::EncodeError> for ExportBaoError {
149    fn from(value: bao_tree::io::EncodeError) -> Self {
150        ExportBaoInnerSnafu.into_error(value)
151    }
152}
153
154pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
155
156#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
157pub enum Error {
158    #[serde(with = "crate::util::serde::io_error_serde")]
159    Io(io::Error),
160}
161
162impl Error {
163    pub fn io(
164        kind: io::ErrorKind,
165        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
166    ) -> Self {
167        Self::Io(io::Error::new(kind, msg.into()))
168    }
169
170    pub fn other<E>(msg: E) -> Self
171    where
172        E: Into<Box<dyn std::error::Error + Send + Sync>>,
173    {
174        Self::Io(io::Error::other(msg.into()))
175    }
176}
177
178impl From<irpc::Error> for Error {
179    fn from(e: irpc::Error) -> Self {
180        Self::Io(e.into())
181    }
182}
183
184impl From<RequestError> for Error {
185    fn from(e: RequestError) -> Self {
186        match e {
187            RequestError::Rpc { source, .. } => Self::Io(source.into()),
188            RequestError::Inner { source, .. } => source,
189        }
190    }
191}
192
193impl From<irpc::channel::RecvError> for Error {
194    fn from(e: irpc::channel::RecvError) -> Self {
195        Self::Io(e.into())
196    }
197}
198
199impl From<irpc::rpc::WriteError> for Error {
200    fn from(e: irpc::rpc::WriteError) -> Self {
201        Self::Io(e.into())
202    }
203}
204
205impl From<irpc::RequestError> for Error {
206    fn from(e: irpc::RequestError) -> Self {
207        Self::Io(e.into())
208    }
209}
210
211impl From<irpc::channel::SendError> for Error {
212    fn from(e: irpc::channel::SendError) -> Self {
213        Self::Io(e.into())
214    }
215}
216
217impl std::error::Error for Error {
218    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
219        match self {
220            Error::Io(e) => Some(e),
221        }
222    }
223}
224
225impl From<EncodeError> for Error {
226    fn from(value: EncodeError) -> Self {
227        match value {
228            EncodeError::Io(cause) => Self::Io(cause),
229            _ => Self::other(value),
230        }
231    }
232}
233
234pub type Result<T> = std::result::Result<T, Error>;
235
236/// The main entry point for the store API.
237#[derive(Debug, Clone, ref_cast::RefCast)]
238#[repr(transparent)]
239pub struct Store {
240    client: ApiClient,
241}
242
243impl Deref for Store {
244    type Target = blobs::Blobs;
245
246    fn deref(&self) -> &Self::Target {
247        blobs::Blobs::ref_from_sender(&self.client)
248    }
249}
250
251impl Store {
252    /// The tags API.
253    pub fn tags(&self) -> &Tags {
254        Tags::ref_from_sender(&self.client)
255    }
256
257    /// The blobs API.
258    pub fn blobs(&self) -> &blobs::Blobs {
259        blobs::Blobs::ref_from_sender(&self.client)
260    }
261
262    /// API for getting blobs from a *single* remote node.
263    pub fn remote(&self) -> &remote::Remote {
264        remote::Remote::ref_from_sender(&self.client)
265    }
266
267    /// Create a downloader for more complex downloads.
268    ///
269    /// Unlike the other APIs, this creates an object that has internal state,
270    /// so don't create it ad hoc but store it somewhere if you need it multiple
271    /// times.
272    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
273        downloader::Downloader::new(self, endpoint)
274    }
275
276    /// Connect to a remote store as a rpc client.
277    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
278        let sender = irpc::Client::quinn(endpoint, addr);
279        Store::from_sender(sender)
280    }
281
282    /// Listen on a quinn endpoint for incoming rpc connections.
283    pub async fn listen(self, endpoint: quinn::Endpoint) {
284        let local = self.client.local().unwrap().clone();
285        let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
286            let local = local.clone();
287            Box::pin({
288                match req {
289                    Request::SetTag(msg) => local.send((msg, tx)),
290                    Request::CreateTag(msg) => local.send((msg, tx)),
291                    Request::DeleteTags(msg) => local.send((msg, tx)),
292                    Request::RenameTag(msg) => local.send((msg, tx)),
293                    Request::ListTags(msg) => local.send((msg, tx)),
294
295                    Request::ListTempTags(msg) => local.send((msg, tx)),
296                    Request::CreateTempTag(msg) => local.send((msg, tx)),
297
298                    Request::BlobStatus(msg) => local.send((msg, tx)),
299
300                    Request::ImportBytes(msg) => local.send((msg, tx)),
301                    Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
302                    Request::ImportBao(msg) => local.send((msg, tx, rx)),
303                    Request::ImportPath(msg) => local.send((msg, tx)),
304                    Request::ListBlobs(msg) => local.send((msg, tx)),
305                    Request::DeleteBlobs(msg) => local.send((msg, tx)),
306                    Request::Batch(msg) => local.send((msg, tx, rx)),
307
308                    Request::ExportBao(msg) => local.send((msg, tx)),
309                    Request::ExportRanges(msg) => local.send((msg, tx)),
310                    Request::ExportPath(msg) => local.send((msg, tx)),
311
312                    Request::Observe(msg) => local.send((msg, tx)),
313
314                    Request::ClearProtected(msg) => local.send((msg, tx)),
315                    Request::SyncDb(msg) => local.send((msg, tx)),
316                    Request::Shutdown(msg) => local.send((msg, tx)),
317                }
318            })
319        });
320        listen::<Request>(endpoint, handler).await
321    }
322
323    pub async fn sync_db(&self) -> RequestResult<()> {
324        let msg = SyncDbRequest;
325        self.client.rpc(msg).await??;
326        Ok(())
327    }
328
329    pub async fn shutdown(&self) -> irpc::Result<()> {
330        let msg = ShutdownRequest;
331        self.client.rpc(msg).await?;
332        Ok(())
333    }
334
335    pub(crate) fn from_sender(client: ApiClient) -> Self {
336        Self { client }
337    }
338
339    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
340        Self::ref_cast(client)
341    }
342}