1use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
6
7use iroh::Endpoint;
8use irpc::rpc::{listen, Handler};
9use n0_snafu::SpanTrace;
10use nested_enum_utils::common_fields;
11use proto::{Request, ShutdownRequest, SyncDbRequest};
12use ref_cast::RefCast;
13use serde::{Deserialize, Serialize};
14use snafu::{Backtrace, IntoError, Snafu};
15use tags::Tags;
16
17pub mod blobs;
18pub mod downloader;
19pub mod proto;
20pub mod remote;
21pub mod tags;
22pub use crate::{store::util::Tag, util::temp_tag::TempTag};
23
24pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
25
26#[common_fields({
27 backtrace: Option<Backtrace>,
28 #[snafu(implicit)]
29 span_trace: SpanTrace,
30})]
31#[allow(missing_docs)]
32#[non_exhaustive]
33#[derive(Debug, Snafu)]
34pub enum RequestError {
35 #[snafu(display("rpc error: {source}"))]
37 Rpc { source: irpc::Error },
38 #[snafu(display("inner error: {source}"))]
40 Inner { source: Error },
41}
42
43impl From<irpc::Error> for RequestError {
44 fn from(value: irpc::Error) -> Self {
45 RpcSnafu.into_error(value)
46 }
47}
48
49impl From<Error> for RequestError {
50 fn from(value: Error) -> Self {
51 InnerSnafu.into_error(value)
52 }
53}
54
55impl From<io::Error> for RequestError {
56 fn from(value: io::Error) -> Self {
57 InnerSnafu.into_error(value.into())
58 }
59}
60
61impl From<irpc::channel::RecvError> for RequestError {
62 fn from(value: irpc::channel::RecvError) -> Self {
63 RpcSnafu.into_error(value.into())
64 }
65}
66
67pub type RequestResult<T> = std::result::Result<T, RequestError>;
68
69#[common_fields({
70 backtrace: Option<Backtrace>,
71 #[snafu(implicit)]
72 span_trace: SpanTrace,
73})]
74#[allow(missing_docs)]
75#[non_exhaustive]
76#[derive(Debug, Snafu)]
77pub enum ExportBaoError {
78 #[snafu(display("send error: {source}"))]
79 Send { source: irpc::channel::SendError },
80 #[snafu(display("recv error: {source}"))]
81 Recv { source: irpc::channel::RecvError },
82 #[snafu(display("request error: {source}"))]
83 Request { source: irpc::RequestError },
84 #[snafu(display("io error: {source}"))]
85 ExportBaoIo { source: io::Error },
86 #[snafu(display("encode error: {source}"))]
87 ExportBaoInner { source: bao_tree::io::EncodeError },
88}
89
90impl From<ExportBaoError> for Error {
91 fn from(e: ExportBaoError) -> Self {
92 match e {
93 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
94 ExportBaoError::Recv { source, .. } => Self::Io(source.into()),
95 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
96 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
97 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
98 }
99 }
100}
101
102impl From<irpc::Error> for ExportBaoError {
103 fn from(e: irpc::Error) -> Self {
104 match e {
105 irpc::Error::Recv(e) => RecvSnafu.into_error(e),
106 irpc::Error::Send(e) => SendSnafu.into_error(e),
107 irpc::Error::Request(e) => RequestSnafu.into_error(e),
108 irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
109 }
110 }
111}
112
113impl From<io::Error> for ExportBaoError {
114 fn from(value: io::Error) -> Self {
115 ExportBaoIoSnafu.into_error(value)
116 }
117}
118
119impl From<irpc::channel::RecvError> for ExportBaoError {
120 fn from(value: irpc::channel::RecvError) -> Self {
121 RecvSnafu.into_error(value)
122 }
123}
124
125impl From<irpc::channel::SendError> for ExportBaoError {
126 fn from(value: irpc::channel::SendError) -> Self {
127 SendSnafu.into_error(value)
128 }
129}
130
131impl From<irpc::RequestError> for ExportBaoError {
132 fn from(value: irpc::RequestError) -> Self {
133 RequestSnafu.into_error(value)
134 }
135}
136
137impl From<bao_tree::io::EncodeError> for ExportBaoError {
138 fn from(value: bao_tree::io::EncodeError) -> Self {
139 ExportBaoInnerSnafu.into_error(value)
140 }
141}
142
143pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
144
145#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
146pub enum Error {
147 #[serde(with = "crate::util::serde::io_error_serde")]
148 Io(io::Error),
149}
150
151impl Error {
152 pub fn io(
153 kind: io::ErrorKind,
154 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
155 ) -> Self {
156 Self::Io(io::Error::new(kind, msg.into()))
157 }
158
159 pub fn other<E>(msg: E) -> Self
160 where
161 E: Into<Box<dyn std::error::Error + Send + Sync>>,
162 {
163 Self::Io(io::Error::other(msg.into()))
164 }
165}
166
167impl From<irpc::Error> for Error {
168 fn from(e: irpc::Error) -> Self {
169 Self::Io(e.into())
170 }
171}
172
173impl From<RequestError> for Error {
174 fn from(e: RequestError) -> Self {
175 match e {
176 RequestError::Rpc { source, .. } => Self::Io(source.into()),
177 RequestError::Inner { source, .. } => source,
178 }
179 }
180}
181
182impl From<irpc::channel::RecvError> for Error {
183 fn from(e: irpc::channel::RecvError) -> Self {
184 Self::Io(e.into())
185 }
186}
187
188impl From<irpc::rpc::WriteError> for Error {
189 fn from(e: irpc::rpc::WriteError) -> Self {
190 Self::Io(e.into())
191 }
192}
193
194impl From<irpc::RequestError> for Error {
195 fn from(e: irpc::RequestError) -> Self {
196 Self::Io(e.into())
197 }
198}
199
200impl From<irpc::channel::SendError> for Error {
201 fn from(e: irpc::channel::SendError) -> Self {
202 Self::Io(e.into())
203 }
204}
205
206impl std::error::Error for Error {
207 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
208 match self {
209 Error::Io(e) => Some(e),
210 }
211 }
212}
213
214pub type Result<T> = std::result::Result<T, Error>;
215
216#[derive(Debug, Clone, ref_cast::RefCast)]
218#[repr(transparent)]
219pub struct Store {
220 client: ApiClient,
221}
222
223impl Deref for Store {
224 type Target = blobs::Blobs;
225
226 fn deref(&self) -> &Self::Target {
227 blobs::Blobs::ref_from_sender(&self.client)
228 }
229}
230
231impl Store {
232 pub fn tags(&self) -> &Tags {
234 Tags::ref_from_sender(&self.client)
235 }
236
237 pub fn blobs(&self) -> &blobs::Blobs {
239 blobs::Blobs::ref_from_sender(&self.client)
240 }
241
242 pub fn remote(&self) -> &remote::Remote {
244 remote::Remote::ref_from_sender(&self.client)
245 }
246
247 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
253 downloader::Downloader::new(self, endpoint)
254 }
255
256 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
258 let sender = irpc::Client::quinn(endpoint, addr);
259 Store::from_sender(sender)
260 }
261
262 pub async fn listen(self, endpoint: quinn::Endpoint) {
264 let local = self.client.local().unwrap().clone();
265 let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
266 let local = local.clone();
267 Box::pin({
268 match req {
269 Request::SetTag(msg) => local.send((msg, tx)),
270 Request::CreateTag(msg) => local.send((msg, tx)),
271 Request::DeleteTags(msg) => local.send((msg, tx)),
272 Request::RenameTag(msg) => local.send((msg, tx)),
273 Request::ListTags(msg) => local.send((msg, tx)),
274
275 Request::ListTempTags(msg) => local.send((msg, tx)),
276 Request::CreateTempTag(msg) => local.send((msg, tx)),
277
278 Request::BlobStatus(msg) => local.send((msg, tx)),
279
280 Request::ImportBytes(msg) => local.send((msg, tx)),
281 Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
282 Request::ImportBao(msg) => local.send((msg, tx, rx)),
283 Request::ImportPath(msg) => local.send((msg, tx)),
284 Request::ListBlobs(msg) => local.send((msg, tx)),
285 Request::DeleteBlobs(msg) => local.send((msg, tx)),
286 Request::Batch(msg) => local.send((msg, tx, rx)),
287
288 Request::ExportBao(msg) => local.send((msg, tx)),
289 Request::ExportRanges(msg) => local.send((msg, tx)),
290 Request::ExportPath(msg) => local.send((msg, tx)),
291
292 Request::Observe(msg) => local.send((msg, tx)),
293
294 Request::ClearProtected(msg) => local.send((msg, tx)),
295 Request::SyncDb(msg) => local.send((msg, tx)),
296 Request::Shutdown(msg) => local.send((msg, tx)),
297 }
298 })
299 });
300 listen::<Request>(endpoint, handler).await
301 }
302
303 pub async fn sync_db(&self) -> RequestResult<()> {
304 let msg = SyncDbRequest;
305 self.client.rpc(msg).await??;
306 Ok(())
307 }
308
309 pub async fn shutdown(&self) -> irpc::Result<()> {
310 let msg = ShutdownRequest;
311 self.client.rpc(msg).await?;
312 Ok(())
313 }
314
315 pub(crate) fn from_sender(client: ApiClient) -> Self {
316 Self { client }
317 }
318
319 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
320 Self::ref_cast(client)
321 }
322}