1use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
34pub use crate::{store::util::Tag, util::temp_tag::TempTag};
35
36pub(crate) type ApiClient = irpc::Client<proto::Request>;
37
38#[common_fields({
39 backtrace: Option<Backtrace>,
40 #[snafu(implicit)]
41 span_trace: SpanTrace,
42})]
43#[allow(missing_docs)]
44#[non_exhaustive]
45#[derive(Debug, Snafu)]
46pub enum RequestError {
47 #[snafu(display("rpc error: {source}"))]
49 Rpc { source: irpc::Error },
50 #[snafu(display("inner error: {source}"))]
52 Inner { source: Error },
53}
54
55impl From<irpc::Error> for RequestError {
56 fn from(value: irpc::Error) -> Self {
57 RpcSnafu.into_error(value)
58 }
59}
60
61impl From<Error> for RequestError {
62 fn from(value: Error) -> Self {
63 InnerSnafu.into_error(value)
64 }
65}
66
67impl From<io::Error> for RequestError {
68 fn from(value: io::Error) -> Self {
69 InnerSnafu.into_error(value.into())
70 }
71}
72
73impl From<irpc::channel::RecvError> for RequestError {
74 fn from(value: irpc::channel::RecvError) -> Self {
75 RpcSnafu.into_error(value.into())
76 }
77}
78
79pub type RequestResult<T> = std::result::Result<T, RequestError>;
80
81#[common_fields({
82 backtrace: Option<Backtrace>,
83 #[snafu(implicit)]
84 span_trace: SpanTrace,
85})]
86#[allow(missing_docs)]
87#[non_exhaustive]
88#[derive(Debug, Snafu)]
89pub enum ExportBaoError {
90 #[snafu(display("send error: {source}"))]
91 Send { source: irpc::channel::SendError },
92 #[snafu(display("recv error: {source}"))]
93 Recv { source: irpc::channel::RecvError },
94 #[snafu(display("request error: {source}"))]
95 Request { source: irpc::RequestError },
96 #[snafu(display("io error: {source}"))]
97 ExportBaoIo { source: io::Error },
98 #[snafu(display("encode error: {source}"))]
99 ExportBaoInner { source: bao_tree::io::EncodeError },
100 #[snafu(display("client error: {source}"))]
101 ClientError { source: ProgressError },
102}
103
104impl From<ExportBaoError> for Error {
105 fn from(e: ExportBaoError) -> Self {
106 match e {
107 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
108 ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
109 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
110 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
111 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
112 ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
113 }
114 }
115}
116
117impl From<irpc::Error> for ExportBaoError {
118 fn from(e: irpc::Error) -> Self {
119 match e {
120 irpc::Error::Recv(e) => RecvSnafu.into_error(e),
121 irpc::Error::Send(e) => SendSnafu.into_error(e),
122 irpc::Error::Request(e) => RequestSnafu.into_error(e),
123 irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
124 }
125 }
126}
127
128impl From<io::Error> for ExportBaoError {
129 fn from(value: io::Error) -> Self {
130 ExportBaoIoSnafu.into_error(value)
131 }
132}
133
134impl From<irpc::channel::RecvError> for ExportBaoError {
135 fn from(value: irpc::channel::RecvError) -> Self {
136 RecvSnafu.into_error(value)
137 }
138}
139
140impl From<irpc::channel::SendError> for ExportBaoError {
141 fn from(value: irpc::channel::SendError) -> Self {
142 SendSnafu.into_error(value)
143 }
144}
145
146impl From<irpc::RequestError> for ExportBaoError {
147 fn from(value: irpc::RequestError) -> Self {
148 RequestSnafu.into_error(value)
149 }
150}
151
152impl From<bao_tree::io::EncodeError> for ExportBaoError {
153 fn from(value: bao_tree::io::EncodeError) -> Self {
154 ExportBaoInnerSnafu.into_error(value)
155 }
156}
157
158impl From<ProgressError> for ExportBaoError {
159 fn from(value: ProgressError) -> Self {
160 ClientSnafu.into_error(value)
161 }
162}
163
164pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
165
166#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
167pub enum Error {
168 #[serde(with = "crate::util::serde::io_error_serde")]
169 Io(io::Error),
170}
171
172impl Error {
173 pub fn io(
174 kind: io::ErrorKind,
175 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
176 ) -> Self {
177 Self::Io(io::Error::new(kind, msg.into()))
178 }
179
180 pub fn other<E>(msg: E) -> Self
181 where
182 E: Into<Box<dyn std::error::Error + Send + Sync>>,
183 {
184 Self::Io(io::Error::other(msg.into()))
185 }
186}
187
188impl From<irpc::Error> for Error {
189 fn from(e: irpc::Error) -> Self {
190 Self::Io(e.into())
191 }
192}
193
194impl From<RequestError> for Error {
195 fn from(e: RequestError) -> Self {
196 match e {
197 RequestError::Rpc { source, .. } => Self::Io(source.into()),
198 RequestError::Inner { source, .. } => source,
199 }
200 }
201}
202
203impl From<irpc::channel::RecvError> for Error {
204 fn from(e: irpc::channel::RecvError) -> Self {
205 Self::Io(e.into())
206 }
207}
208
209impl From<irpc::rpc::WriteError> for Error {
210 fn from(e: irpc::rpc::WriteError) -> Self {
211 Self::Io(e.into())
212 }
213}
214
215impl From<irpc::RequestError> for Error {
216 fn from(e: irpc::RequestError) -> Self {
217 Self::Io(e.into())
218 }
219}
220
221impl From<irpc::channel::SendError> for Error {
222 fn from(e: irpc::channel::SendError) -> Self {
223 Self::Io(e.into())
224 }
225}
226
227impl std::error::Error for Error {
228 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
229 match self {
230 Error::Io(e) => Some(e),
231 }
232 }
233}
234
235impl From<EncodeError> for Error {
236 fn from(value: EncodeError) -> Self {
237 match value {
238 EncodeError::Io(cause) => Self::Io(cause),
239 _ => Self::other(value),
240 }
241 }
242}
243
244pub type Result<T> = std::result::Result<T, Error>;
245
246#[derive(Debug, Clone, ref_cast::RefCast)]
248#[repr(transparent)]
249pub struct Store {
250 client: ApiClient,
251}
252
253impl Deref for Store {
254 type Target = blobs::Blobs;
255
256 fn deref(&self) -> &Self::Target {
257 blobs::Blobs::ref_from_sender(&self.client)
258 }
259}
260
261impl Store {
262 pub fn tags(&self) -> &Tags {
264 Tags::ref_from_sender(&self.client)
265 }
266
267 pub fn blobs(&self) -> &blobs::Blobs {
269 blobs::Blobs::ref_from_sender(&self.client)
270 }
271
272 pub fn remote(&self) -> &remote::Remote {
274 remote::Remote::ref_from_sender(&self.client)
275 }
276
277 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
283 downloader::Downloader::new(self, endpoint)
284 }
285
286 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
288 let sender = irpc::Client::quinn(endpoint, addr);
289 Store::from_sender(sender)
290 }
291
292 pub async fn listen(self, endpoint: quinn::Endpoint) {
294 let local = self.client.as_local().unwrap().clone();
295 let handler = Request::remote_handler(local);
296 listen::<Request>(endpoint, handler).await
297 }
298
299 pub async fn sync_db(&self) -> RequestResult<()> {
300 let msg = SyncDbRequest;
301 self.client.rpc(msg).await??;
302 Ok(())
303 }
304
305 pub async fn shutdown(&self) -> irpc::Result<()> {
306 let msg = ShutdownRequest;
307 self.client.rpc(msg).await?;
308 Ok(())
309 }
310
311 pub async fn wait_idle(&self) -> irpc::Result<()> {
323 let msg = WaitIdleRequest;
324 self.client.rpc(msg).await?;
325 Ok(())
326 }
327
328 pub(crate) fn from_sender(client: ApiClient) -> Self {
329 Self { client }
330 }
331
332 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
333 Self::ref_cast(client)
334 }
335}