1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2use std::{
17 fmt::{self, Debug},
18 io,
19 num::NonZeroU64,
20 ops::{Bound, RangeBounds},
21 path::PathBuf,
22 pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27 io::{mixed::EncodedItem, BaoContentItem, Leaf},
28 ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32 channel::{mpsc, oneshot},
33 rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
43pub(crate) trait HashSpecific {
44 fn hash(&self) -> Hash;
45
46 fn hash_short(&self) -> ArrayString<10> {
47 self.hash().fmt_short()
48 }
49}
50
51impl HashSpecific for ImportBaoMsg {
52 fn hash(&self) -> crate::Hash {
53 self.inner.hash
54 }
55}
56
57impl HashSpecific for ObserveMsg {
58 fn hash(&self) -> crate::Hash {
59 self.inner.hash
60 }
61}
62
63impl HashSpecific for ExportBaoMsg {
64 fn hash(&self) -> crate::Hash {
65 self.inner.hash
66 }
67}
68
69impl HashSpecific for ExportRangesMsg {
70 fn hash(&self) -> crate::Hash {
71 self.inner.hash
72 }
73}
74
75impl HashSpecific for ExportPathMsg {
76 fn hash(&self) -> crate::Hash {
77 self.inner.hash
78 }
79}
80
81pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
82
83impl HashSpecific for CreateTagMsg {
84 fn hash(&self) -> crate::Hash {
85 self.inner.value.hash
86 }
87}
88
89#[rpc_requests(message = Command, alias = "Msg")]
90#[derive(Debug, Serialize, Deserialize)]
91pub enum Request {
92 #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
93 ListBlobs(ListRequest),
94 #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
95 Batch(BatchRequest),
96 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
97 DeleteBlobs(BlobDeleteRequest),
98 #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
99 ImportBao(ImportBaoRequest),
100 #[rpc(tx = mpsc::Sender<EncodedItem>)]
101 ExportBao(ExportBaoRequest),
102 #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
103 ExportRanges(ExportRangesRequest),
104 #[rpc(tx = mpsc::Sender<Bitfield>)]
105 Observe(ObserveRequest),
106 #[rpc(tx = oneshot::Sender<BlobStatus>)]
107 BlobStatus(BlobStatusRequest),
108 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
109 ImportBytes(ImportBytesRequest),
110 #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
111 ImportByteStream(ImportByteStreamRequest),
112 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
113 ImportPath(ImportPathRequest),
114 #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
115 ExportPath(ExportPathRequest),
116 #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
117 ListTags(ListTagsRequest),
118 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
119 SetTag(SetTagRequest),
120 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
121 DeleteTags(DeleteTagsRequest),
122 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
123 RenameTag(RenameTagRequest),
124 #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
125 CreateTag(CreateTagRequest),
126 #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
127 ListTempTags(ListTempTagsRequest),
128 #[rpc(tx = oneshot::Sender<TempTag>)]
129 CreateTempTag(CreateTempTagRequest),
130 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
131 SyncDb(SyncDbRequest),
132 #[rpc(tx = oneshot::Sender<()>)]
133 Shutdown(ShutdownRequest),
134 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
135 ClearProtected(ClearProtectedRequest),
136}
137
138#[derive(Debug, Serialize, Deserialize)]
139pub struct SyncDbRequest;
140
141#[derive(Debug, Serialize, Deserialize)]
142pub struct ShutdownRequest;
143
144#[derive(Debug, Serialize, Deserialize)]
145pub struct ClearProtectedRequest;
146
147#[derive(Debug, Serialize, Deserialize)]
148pub struct BlobStatusRequest {
149 pub hash: Hash,
150}
151
152#[derive(Debug, Serialize, Deserialize)]
153pub struct ListRequest;
154
155#[derive(Debug, Serialize, Deserialize)]
156pub struct BatchRequest;
157
158#[derive(Debug, Serialize, Deserialize)]
159pub enum BatchResponse {
160 Drop(HashAndFormat),
161 Ping,
162}
163
164#[derive(Debug, Serialize, Deserialize)]
166pub struct BlobDeleteRequest {
167 pub hashes: Vec<Hash>,
168 pub force: bool,
169}
170
171#[derive(Serialize, Deserialize)]
173pub struct ImportBytesRequest {
174 pub data: Bytes,
175 pub format: BlobFormat,
176 pub scope: Scope,
177}
178
179impl fmt::Debug for ImportBytesRequest {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 f.debug_struct("ImportBytes")
182 .field("data", &self.data.len())
183 .field("format", &self.format)
184 .field("scope", &self.scope)
185 .finish()
186 }
187}
188
189#[derive(Debug, Serialize, Deserialize)]
190pub struct ImportPathRequest {
191 pub path: PathBuf,
192 pub mode: ImportMode,
193 pub format: BlobFormat,
194 pub scope: Scope,
195}
196
197#[derive(Debug, Serialize, Deserialize)]
203pub struct ImportBaoRequest {
204 pub hash: Hash,
205 pub size: NonZeroU64,
206}
207
208#[derive(Debug, Serialize, Deserialize)]
210pub struct ObserveRequest {
211 pub hash: Hash,
212}
213
214#[derive(Debug, Serialize, Deserialize)]
218pub struct ExportBaoRequest {
219 pub hash: Hash,
220 pub ranges: ChunkRanges,
221}
222
223#[derive(Debug, Serialize, Deserialize)]
225pub struct ExportRangesRequest {
226 pub hash: Hash,
227 pub ranges: RangeSet2<u64>,
228}
229
230#[derive(Debug, Serialize, Deserialize)]
237pub struct ExportPathRequest {
238 pub hash: Hash,
239 pub mode: ExportMode,
240 pub target: PathBuf,
241}
242
243#[derive(Debug, Serialize, Deserialize)]
244pub struct ImportByteStreamRequest {
245 pub format: BlobFormat,
246 pub scope: Scope,
247}
248
249#[derive(Debug, Serialize, Deserialize)]
250pub enum ImportByteStreamUpdate {
251 Bytes(Bytes),
252 Done,
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct ListTagsRequest {
258 pub hash_seq: bool,
260 pub raw: bool,
262 pub from: Option<Tag>,
264 pub to: Option<Tag>,
266}
267
268impl ListTagsRequest {
269 pub fn range<R, E>(range: R) -> Self
271 where
272 R: RangeBounds<E>,
273 E: AsRef<[u8]>,
274 {
275 let (from, to) = tags_from_range(range);
276 Self {
277 from,
278 to,
279 raw: true,
280 hash_seq: true,
281 }
282 }
283
284 pub fn prefix(prefix: &[u8]) -> Self {
286 let from = Tag::from(prefix);
287 let to = from.next_prefix();
288 Self {
289 raw: true,
290 hash_seq: true,
291 from: Some(from),
292 to,
293 }
294 }
295
296 pub fn single(name: &[u8]) -> Self {
298 let from = Tag::from(name);
299 Self {
300 to: Some(from.successor()),
301 from: Some(from),
302 raw: true,
303 hash_seq: true,
304 }
305 }
306
307 pub fn all() -> Self {
309 Self {
310 raw: true,
311 hash_seq: true,
312 from: None,
313 to: None,
314 }
315 }
316
317 pub fn raw() -> Self {
319 Self {
320 raw: true,
321 hash_seq: false,
322 from: None,
323 to: None,
324 }
325 }
326
327 pub fn hash_seq() -> Self {
329 Self {
330 raw: false,
331 hash_seq: true,
332 from: None,
333 to: None,
334 }
335 }
336}
337
338#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
340pub struct TagInfo {
341 pub name: Tag,
343 pub format: BlobFormat,
345 pub hash: Hash,
347}
348
349impl From<TagInfo> for HashAndFormat {
350 fn from(tag_info: TagInfo) -> Self {
351 HashAndFormat {
352 hash: tag_info.hash,
353 format: tag_info.format,
354 }
355 }
356}
357
358impl TagInfo {
359 pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
361 let name = name.as_ref();
362 let value = value.into();
363 Self {
364 name: Tag::from(name),
365 hash: value.hash,
366 format: value.format,
367 }
368 }
369
370 pub fn hash_and_format(&self) -> HashAndFormat {
372 HashAndFormat {
373 hash: self.hash,
374 format: self.format,
375 }
376 }
377}
378
379pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
380where
381 R: RangeBounds<E>,
382 E: AsRef<[u8]>,
383{
384 let from = match range.start_bound() {
385 Bound::Included(start) => Some(Tag::from(start.as_ref())),
386 Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
387 Bound::Unbounded => None,
388 };
389 let to = match range.end_bound() {
390 Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
391 Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
392 Bound::Unbounded => None,
393 };
394 (from, to)
395}
396
397#[derive(Debug, Serialize, Deserialize)]
399pub struct CreateTempTagRequest {
400 pub scope: Scope,
401 pub value: HashAndFormat,
402}
403
404#[derive(Debug, Serialize, Deserialize)]
406pub struct ListTempTagsRequest;
407
408#[derive(Debug, Serialize, Deserialize)]
410pub struct RenameTagRequest {
411 pub from: Tag,
413 pub to: Tag,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
419pub struct DeleteTagsRequest {
420 pub from: Option<Tag>,
422 pub to: Option<Tag>,
424}
425
426impl DeleteTagsRequest {
427 pub fn single(name: &[u8]) -> Self {
429 let name = Tag::from(name);
430 Self {
431 to: Some(name.successor()),
432 from: Some(name),
433 }
434 }
435
436 pub fn range<R, E>(range: R) -> Self
438 where
439 R: RangeBounds<E>,
440 E: AsRef<[u8]>,
441 {
442 let (from, to) = tags_from_range(range);
443 Self { from, to }
444 }
445
446 pub fn prefix(prefix: &[u8]) -> Self {
448 let from = Tag::from(prefix);
449 let to = from.next_prefix();
450 Self {
451 from: Some(from),
452 to,
453 }
454 }
455}
456
457#[derive(Debug, Serialize, Deserialize)]
459pub struct SetTagRequest {
460 pub name: Tag,
461 pub value: HashAndFormat,
462}
463
464#[derive(Debug, Serialize, Deserialize)]
466pub struct CreateTagRequest {
467 pub value: HashAndFormat,
468}
469
470#[derive(Debug, Serialize, Deserialize)]
472pub struct ProcessExitRequest {
473 pub code: i32,
474}
475
476#[derive(Debug, Serialize, Deserialize)]
490pub enum AddProgressItem {
491 CopyProgress(u64),
501 Size(u64),
510 CopyDone,
515 OutboardProgress(u64),
520 Done(TempTag),
529 Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
537}
538
539impl From<io::Error> for AddProgressItem {
540 fn from(e: io::Error) -> Self {
541 Self::Error(e)
542 }
543}
544
545#[derive(Debug, Serialize, Deserialize)]
546pub enum ExportRangesItem {
547 Size(u64),
552 Data(Leaf),
554 Error(super::Error),
556}
557
558impl From<super::Error> for ExportRangesItem {
559 fn from(e: super::Error) -> Self {
560 Self::Error(e)
561 }
562}
563
564impl From<Leaf> for ExportRangesItem {
565 fn from(leaf: Leaf) -> Self {
566 Self::Data(leaf)
567 }
568}
569
570#[derive(Debug, Serialize, Deserialize)]
579pub enum ExportProgressItem {
580 Size(u64),
585 CopyProgress(u64),
593 Done,
601 Error(super::Error),
609}
610
611impl From<super::Error> for ExportProgressItem {
612 fn from(e: super::Error) -> Self {
613 Self::Error(e)
614 }
615}
616
617#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
624pub enum ImportMode {
625 #[default]
630 Copy,
631 TryReference,
639}
640
641#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
648pub enum ExportMode {
649 #[default]
654 Copy,
655 TryReference,
664}
665
666#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
668pub enum BlobStatus {
669 NotFound,
671 Partial {
673 size: Option<u64>,
675 },
676 Complete {
678 size: u64,
680 },
681}
682
683#[derive(
685 Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
686)]
687pub struct Scope(pub(crate) u64);
688
689impl Scope {
690 pub const GLOBAL: Self = Self(0);
691}
692
693impl std::fmt::Debug for Scope {
694 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
695 if self.0 == 0 {
696 write!(f, "Global")
697 } else {
698 f.debug_tuple("Scope").field(&self.0).finish()
699 }
700 }
701}