1use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_snafu::SpanTrace;
21use nested_enum_utils::common_fields;
22use proto::{Request, ShutdownRequest, SyncDbRequest};
23use ref_cast::RefCast;
24use serde::{Deserialize, Serialize};
25use snafu::{Backtrace, IntoError, Snafu};
26use tags::Tags;
27
28pub mod blobs;
29pub mod downloader;
30pub mod proto;
31pub mod remote;
32pub mod tags;
33use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
34pub use crate::{store::util::Tag, util::temp_tag::TempTag};
35
/// The irpc client type wrapped by [`Store`], specialised to the
/// [`proto::Request`] protocol.
pub(crate) type ApiClient = irpc::Client<proto::Request>;
37
/// Error returned by requests that can fail either with an [`irpc::Error`] or
/// with this module's own [`Error`].
///
/// The `common_fields` attribute lists `backtrace` and `span_trace`; it is
/// expected to add both fields to every variant (the conversions in this file
/// skip extra fields with `..`).
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum RequestError {
    /// The request failed with an [`irpc::Error`].
    #[snafu(display("rpc error: {source}"))]
    Rpc { source: irpc::Error },
    /// The request failed with an [`Error`] from this module.
    #[snafu(display("inner error: {source}"))]
    Inner { source: Error },
}
54
55impl From<irpc::Error> for RequestError {
56 fn from(value: irpc::Error) -> Self {
57 RpcSnafu.into_error(value)
58 }
59}
60
61impl From<Error> for RequestError {
62 fn from(value: Error) -> Self {
63 InnerSnafu.into_error(value)
64 }
65}
66
67impl From<io::Error> for RequestError {
68 fn from(value: io::Error) -> Self {
69 InnerSnafu.into_error(value.into())
70 }
71}
72
73impl From<irpc::channel::mpsc::RecvError> for RequestError {
74 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
75 RpcSnafu.into_error(value.into())
76 }
77}
78
/// Shorthand for a [`std::result::Result`] whose error type is [`RequestError`].
pub type RequestResult<T> = std::result::Result<T, RequestError>;
80
/// Error type for bao export operations; each variant wraps one underlying
/// error source.
///
/// The `common_fields` attribute lists `backtrace` and `span_trace`; it is
/// expected to add both fields to every variant (the conversions in this file
/// skip extra fields with `..`).
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum ExportBaoError {
    /// An irpc channel send error.
    #[snafu(display("send error: {source}"))]
    Send { source: irpc::channel::SendError },
    /// An irpc mpsc channel receive error.
    #[snafu(display("mpsc recv error: {source}"))]
    MpscRecv {
        source: irpc::channel::mpsc::RecvError,
    },
    /// An irpc oneshot channel receive error.
    #[snafu(display("oneshot recv error: {source}"))]
    OneshotRecv {
        source: irpc::channel::oneshot::RecvError,
    },
    /// An [`irpc::RequestError`].
    #[snafu(display("request error: {source}"))]
    Request { source: irpc::RequestError },
    /// A plain [`io::Error`].
    #[snafu(display("io error: {source}"))]
    ExportBaoIo { source: io::Error },
    /// A [`bao_tree::io::EncodeError`].
    #[snafu(display("encode error: {source}"))]
    ExportBaoInner { source: bao_tree::io::EncodeError },
    /// A [`ProgressError`]; note the generated context selector is
    /// `ClientSnafu` (the `Error` suffix is dropped).
    #[snafu(display("client error: {source}"))]
    ClientError { source: ProgressError },
}
109
110impl From<ExportBaoError> for Error {
111 fn from(e: ExportBaoError) -> Self {
112 match e {
113 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
114 ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
115 ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
116 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
117 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
118 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
119 ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
120 }
121 }
122}
123
124impl From<irpc::Error> for ExportBaoError {
125 fn from(e: irpc::Error) -> Self {
126 match e {
127 irpc::Error::MpscRecv(e) => MpscRecvSnafu.into_error(e),
128 irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e),
129 irpc::Error::Send(e) => SendSnafu.into_error(e),
130 irpc::Error::Request(e) => RequestSnafu.into_error(e),
131 irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
132 }
133 }
134}
135
136impl From<io::Error> for ExportBaoError {
137 fn from(value: io::Error) -> Self {
138 ExportBaoIoSnafu.into_error(value)
139 }
140}
141
142impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
143 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
144 MpscRecvSnafu.into_error(value)
145 }
146}
147
148impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
149 fn from(value: irpc::channel::oneshot::RecvError) -> Self {
150 OneshotRecvSnafu.into_error(value)
151 }
152}
153
154impl From<irpc::channel::SendError> for ExportBaoError {
155 fn from(value: irpc::channel::SendError) -> Self {
156 SendSnafu.into_error(value)
157 }
158}
159
160impl From<irpc::RequestError> for ExportBaoError {
161 fn from(value: irpc::RequestError) -> Self {
162 RequestSnafu.into_error(value)
163 }
164}
165
166impl From<bao_tree::io::EncodeError> for ExportBaoError {
167 fn from(value: bao_tree::io::EncodeError) -> Self {
168 ExportBaoInnerSnafu.into_error(value)
169 }
170}
171
172impl From<ProgressError> for ExportBaoError {
173 fn from(value: ProgressError) -> Self {
174 ClientSnafu.into_error(value)
175 }
176}
177
/// Shorthand for a [`std::result::Result`] whose error type is [`ExportBaoError`].
pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
179
/// Serializable error type of this module.
///
/// Its single variant wraps an [`io::Error`]; the other error types in this
/// file are converted into it by going through `io::Error`.
#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
pub enum Error {
    /// An I/O error, (de)serialized via `crate::util::serde::io_error_serde`.
    #[serde(with = "crate::util::serde::io_error_serde")]
    Io(io::Error),
}
185
186impl Error {
187 pub fn io(
188 kind: io::ErrorKind,
189 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
190 ) -> Self {
191 Self::Io(io::Error::new(kind, msg.into()))
192 }
193
194 pub fn other<E>(msg: E) -> Self
195 where
196 E: Into<Box<dyn std::error::Error + Send + Sync>>,
197 {
198 Self::Io(io::Error::other(msg.into()))
199 }
200}
201
202impl From<irpc::Error> for Error {
203 fn from(e: irpc::Error) -> Self {
204 Self::Io(e.into())
205 }
206}
207
208impl From<RequestError> for Error {
209 fn from(e: RequestError) -> Self {
210 match e {
211 RequestError::Rpc { source, .. } => Self::Io(source.into()),
212 RequestError::Inner { source, .. } => source,
213 }
214 }
215}
216
217impl From<irpc::channel::mpsc::RecvError> for Error {
218 fn from(e: irpc::channel::mpsc::RecvError) -> Self {
219 Self::Io(e.into())
220 }
221}
222
223impl From<irpc::rpc::WriteError> for Error {
224 fn from(e: irpc::rpc::WriteError) -> Self {
225 Self::Io(e.into())
226 }
227}
228
229impl From<irpc::RequestError> for Error {
230 fn from(e: irpc::RequestError) -> Self {
231 Self::Io(e.into())
232 }
233}
234
235impl From<irpc::channel::SendError> for Error {
236 fn from(e: irpc::channel::SendError) -> Self {
237 Self::Io(e.into())
238 }
239}
240
241impl std::error::Error for Error {
242 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
243 match self {
244 Error::Io(e) => Some(e),
245 }
246 }
247}
248
249impl From<EncodeError> for Error {
250 fn from(value: EncodeError) -> Self {
251 match value {
252 EncodeError::Io(cause) => Self::Io(cause),
253 _ => Self::other(value),
254 }
255 }
256}
257
/// Shorthand for a [`std::result::Result`] whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
259
/// Handle to a store: a thin wrapper around a single [`ApiClient`].
///
/// The struct is `#[repr(transparent)]` and derives `RefCast`, which is what
/// allows `Store::ref_from_sender` to reinterpret an `&ApiClient` as a
/// `&Store` without copying.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Store {
    // The only field; must stay the only field for `repr(transparent)`.
    client: ApiClient,
}
266
267impl Deref for Store {
268 type Target = blobs::Blobs;
269
270 fn deref(&self) -> &Self::Target {
271 blobs::Blobs::ref_from_sender(&self.client)
272 }
273}
274
275impl Store {
276 pub fn tags(&self) -> &Tags {
278 Tags::ref_from_sender(&self.client)
279 }
280
281 pub fn blobs(&self) -> &blobs::Blobs {
283 blobs::Blobs::ref_from_sender(&self.client)
284 }
285
286 pub fn remote(&self) -> &remote::Remote {
288 remote::Remote::ref_from_sender(&self.client)
289 }
290
291 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
297 downloader::Downloader::new(self, endpoint)
298 }
299
300 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
302 let sender = irpc::Client::quinn(endpoint, addr);
303 Store::from_sender(sender)
304 }
305
306 pub async fn listen(self, endpoint: quinn::Endpoint) {
308 let local = self.client.as_local().unwrap().clone();
309 let handler = Request::remote_handler(local);
310 listen::<Request>(endpoint, handler).await
311 }
312
313 pub async fn sync_db(&self) -> RequestResult<()> {
314 let msg = SyncDbRequest;
315 self.client.rpc(msg).await??;
316 Ok(())
317 }
318
319 pub async fn shutdown(&self) -> irpc::Result<()> {
320 let msg = ShutdownRequest;
321 self.client.rpc(msg).await?;
322 Ok(())
323 }
324
325 pub async fn wait_idle(&self) -> irpc::Result<()> {
337 let msg = WaitIdleRequest;
338 self.client.rpc(msg).await?;
339 Ok(())
340 }
341
342 pub(crate) fn from_sender(client: ApiClient) -> Self {
343 Self { client }
344 }
345
346 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
347 Self::ref_cast(client)
348 }
349}