1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2use std::{
17 fmt::{self, Debug},
18 io,
19 num::NonZeroU64,
20 ops::{Bound, RangeBounds},
21 path::PathBuf,
22 pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27 io::{mixed::EncodedItem, BaoContentItem, Leaf},
28 ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32 channel::{mpsc, oneshot},
33 rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
43#[allow(dead_code)]
44pub(crate) trait HashSpecific {
45 fn hash(&self) -> Hash;
46
47 fn hash_short(&self) -> ArrayString<10> {
48 self.hash().fmt_short()
49 }
50}
51
52impl HashSpecific for ImportBaoMsg {
53 fn hash(&self) -> crate::Hash {
54 self.inner.hash
55 }
56}
57
58impl HashSpecific for ObserveMsg {
59 fn hash(&self) -> crate::Hash {
60 self.inner.hash
61 }
62}
63
64impl HashSpecific for ExportBaoMsg {
65 fn hash(&self) -> crate::Hash {
66 self.inner.hash
67 }
68}
69
70impl HashSpecific for ExportRangesMsg {
71 fn hash(&self) -> crate::Hash {
72 self.inner.hash
73 }
74}
75
76impl HashSpecific for ExportPathMsg {
77 fn hash(&self) -> crate::Hash {
78 self.inner.hash
79 }
80}
81
82pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
83
84impl HashSpecific for CreateTagMsg {
85 fn hash(&self) -> crate::Hash {
86 self.inner.value.hash
87 }
88}
89
90#[rpc_requests(message = Command, alias = "Msg")]
91#[derive(Debug, Serialize, Deserialize)]
92pub enum Request {
93 #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
94 ListBlobs(ListRequest),
95 #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
96 Batch(BatchRequest),
97 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
98 DeleteBlobs(BlobDeleteRequest),
99 #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
100 ImportBao(ImportBaoRequest),
101 #[rpc(tx = mpsc::Sender<EncodedItem>)]
102 ExportBao(ExportBaoRequest),
103 #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
104 ExportRanges(ExportRangesRequest),
105 #[rpc(tx = mpsc::Sender<Bitfield>)]
106 Observe(ObserveRequest),
107 #[rpc(tx = oneshot::Sender<BlobStatus>)]
108 BlobStatus(BlobStatusRequest),
109 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
110 ImportBytes(ImportBytesRequest),
111 #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
112 ImportByteStream(ImportByteStreamRequest),
113 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
114 ImportPath(ImportPathRequest),
115 #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
116 ExportPath(ExportPathRequest),
117 #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
118 ListTags(ListTagsRequest),
119 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
120 SetTag(SetTagRequest),
121 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
122 DeleteTags(DeleteTagsRequest),
123 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
124 RenameTag(RenameTagRequest),
125 #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
126 CreateTag(CreateTagRequest),
127 #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
128 ListTempTags(ListTempTagsRequest),
129 #[rpc(tx = oneshot::Sender<TempTag>)]
130 CreateTempTag(CreateTempTagRequest),
131 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
132 SyncDb(SyncDbRequest),
133 #[rpc(tx = oneshot::Sender<()>)]
134 WaitIdle(WaitIdleRequest),
135 #[rpc(tx = oneshot::Sender<()>)]
136 Shutdown(ShutdownRequest),
137 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
138 ClearProtected(ClearProtectedRequest),
139}
140
141#[derive(Debug, Serialize, Deserialize)]
142pub struct WaitIdleRequest;
143
144#[derive(Debug, Serialize, Deserialize)]
145pub struct SyncDbRequest;
146
147#[derive(Debug, Serialize, Deserialize)]
148pub struct ShutdownRequest;
149
150#[derive(Debug, Serialize, Deserialize)]
151pub struct ClearProtectedRequest;
152
153#[derive(Debug, Serialize, Deserialize)]
154pub struct BlobStatusRequest {
155 pub hash: Hash,
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159pub struct ListRequest;
160
161#[derive(Debug, Serialize, Deserialize)]
162pub struct BatchRequest;
163
164#[derive(Debug, Serialize, Deserialize)]
165pub enum BatchResponse {
166 Drop(HashAndFormat),
167 Ping,
168}
169
170#[derive(Debug, Serialize, Deserialize)]
172pub struct BlobDeleteRequest {
173 pub hashes: Vec<Hash>,
174 pub force: bool,
175}
176
177#[derive(Serialize, Deserialize)]
179pub struct ImportBytesRequest {
180 pub data: Bytes,
181 pub format: BlobFormat,
182 pub scope: Scope,
183}
184
185impl fmt::Debug for ImportBytesRequest {
186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 f.debug_struct("ImportBytes")
188 .field("data", &self.data.len())
189 .field("format", &self.format)
190 .field("scope", &self.scope)
191 .finish()
192 }
193}
194
195#[derive(Debug, Serialize, Deserialize)]
196pub struct ImportPathRequest {
197 pub path: PathBuf,
198 pub mode: ImportMode,
199 pub format: BlobFormat,
200 pub scope: Scope,
201}
202
203#[derive(Debug, Serialize, Deserialize)]
209pub struct ImportBaoRequest {
210 pub hash: Hash,
211 pub size: NonZeroU64,
212}
213
214#[derive(Debug, Serialize, Deserialize)]
216pub struct ObserveRequest {
217 pub hash: Hash,
218}
219
220#[derive(Debug, Serialize, Deserialize)]
224pub struct ExportBaoRequest {
225 pub hash: Hash,
226 pub ranges: ChunkRanges,
227}
228
229#[derive(Debug, Serialize, Deserialize)]
231pub struct ExportRangesRequest {
232 pub hash: Hash,
233 pub ranges: RangeSet2<u64>,
234}
235
236#[derive(Debug, Serialize, Deserialize)]
243pub struct ExportPathRequest {
244 pub hash: Hash,
245 pub mode: ExportMode,
246 pub target: PathBuf,
247}
248
249#[derive(Debug, Serialize, Deserialize)]
250pub struct ImportByteStreamRequest {
251 pub format: BlobFormat,
252 pub scope: Scope,
253}
254
255#[derive(Debug, Serialize, Deserialize)]
256pub enum ImportByteStreamUpdate {
257 Bytes(Bytes),
258 Done,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct ListTagsRequest {
264 pub hash_seq: bool,
266 pub raw: bool,
268 pub from: Option<Tag>,
270 pub to: Option<Tag>,
272}
273
274impl ListTagsRequest {
275 pub fn range<R, E>(range: R) -> Self
277 where
278 R: RangeBounds<E>,
279 E: AsRef<[u8]>,
280 {
281 let (from, to) = tags_from_range(range);
282 Self {
283 from,
284 to,
285 raw: true,
286 hash_seq: true,
287 }
288 }
289
290 pub fn prefix(prefix: &[u8]) -> Self {
292 let from = Tag::from(prefix);
293 let to = from.next_prefix();
294 Self {
295 raw: true,
296 hash_seq: true,
297 from: Some(from),
298 to,
299 }
300 }
301
302 pub fn single(name: &[u8]) -> Self {
304 let from = Tag::from(name);
305 Self {
306 to: Some(from.successor()),
307 from: Some(from),
308 raw: true,
309 hash_seq: true,
310 }
311 }
312
313 pub fn all() -> Self {
315 Self {
316 raw: true,
317 hash_seq: true,
318 from: None,
319 to: None,
320 }
321 }
322
323 pub fn raw() -> Self {
325 Self {
326 raw: true,
327 hash_seq: false,
328 from: None,
329 to: None,
330 }
331 }
332
333 pub fn hash_seq() -> Self {
335 Self {
336 raw: false,
337 hash_seq: true,
338 from: None,
339 to: None,
340 }
341 }
342}
343
344#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
346pub struct TagInfo {
347 pub name: Tag,
349 pub format: BlobFormat,
351 pub hash: Hash,
353}
354
355impl From<TagInfo> for HashAndFormat {
356 fn from(tag_info: TagInfo) -> Self {
357 HashAndFormat {
358 hash: tag_info.hash,
359 format: tag_info.format,
360 }
361 }
362}
363
364impl TagInfo {
365 pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
367 let name = name.as_ref();
368 let value = value.into();
369 Self {
370 name: Tag::from(name),
371 hash: value.hash,
372 format: value.format,
373 }
374 }
375
376 pub fn hash_and_format(&self) -> HashAndFormat {
378 HashAndFormat {
379 hash: self.hash,
380 format: self.format,
381 }
382 }
383}
384
385pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
386where
387 R: RangeBounds<E>,
388 E: AsRef<[u8]>,
389{
390 let from = match range.start_bound() {
391 Bound::Included(start) => Some(Tag::from(start.as_ref())),
392 Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
393 Bound::Unbounded => None,
394 };
395 let to = match range.end_bound() {
396 Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
397 Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
398 Bound::Unbounded => None,
399 };
400 (from, to)
401}
402
403#[derive(Debug, Serialize, Deserialize)]
405pub struct CreateTempTagRequest {
406 pub scope: Scope,
407 pub value: HashAndFormat,
408}
409
410#[derive(Debug, Serialize, Deserialize)]
412pub struct ListTempTagsRequest;
413
414#[derive(Debug, Serialize, Deserialize)]
416pub struct RenameTagRequest {
417 pub from: Tag,
419 pub to: Tag,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
425pub struct DeleteTagsRequest {
426 pub from: Option<Tag>,
428 pub to: Option<Tag>,
430}
431
432impl DeleteTagsRequest {
433 pub fn single(name: &[u8]) -> Self {
435 let name = Tag::from(name);
436 Self {
437 to: Some(name.successor()),
438 from: Some(name),
439 }
440 }
441
442 pub fn range<R, E>(range: R) -> Self
444 where
445 R: RangeBounds<E>,
446 E: AsRef<[u8]>,
447 {
448 let (from, to) = tags_from_range(range);
449 Self { from, to }
450 }
451
452 pub fn prefix(prefix: &[u8]) -> Self {
454 let from = Tag::from(prefix);
455 let to = from.next_prefix();
456 Self {
457 from: Some(from),
458 to,
459 }
460 }
461}
462
463#[derive(Debug, Serialize, Deserialize)]
465pub struct SetTagRequest {
466 pub name: Tag,
467 pub value: HashAndFormat,
468}
469
470#[derive(Debug, Serialize, Deserialize)]
472pub struct CreateTagRequest {
473 pub value: HashAndFormat,
474}
475
476#[derive(Debug, Serialize, Deserialize)]
478pub struct ProcessExitRequest {
479 pub code: i32,
480}
481
482#[derive(Debug, Serialize, Deserialize)]
496pub enum AddProgressItem {
497 CopyProgress(u64),
507 Size(u64),
516 CopyDone,
521 OutboardProgress(u64),
526 Done(TempTag),
535 Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
543}
544
545impl From<io::Error> for AddProgressItem {
546 fn from(e: io::Error) -> Self {
547 Self::Error(e)
548 }
549}
550
551#[derive(Debug, Serialize, Deserialize)]
552pub enum ExportRangesItem {
553 Size(u64),
558 Data(Leaf),
560 Error(super::Error),
562}
563
564impl From<super::Error> for ExportRangesItem {
565 fn from(e: super::Error) -> Self {
566 Self::Error(e)
567 }
568}
569
570impl From<Leaf> for ExportRangesItem {
571 fn from(leaf: Leaf) -> Self {
572 Self::Data(leaf)
573 }
574}
575
576#[derive(Debug, Serialize, Deserialize)]
585pub enum ExportProgressItem {
586 Size(u64),
591 CopyProgress(u64),
599 Done,
607 Error(super::Error),
615}
616
617impl From<super::Error> for ExportProgressItem {
618 fn from(e: super::Error) -> Self {
619 Self::Error(e)
620 }
621}
622
623#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
630pub enum ImportMode {
631 #[default]
636 Copy,
637 TryReference,
645}
646
647#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
654pub enum ExportMode {
655 #[default]
660 Copy,
661 TryReference,
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
674pub enum BlobStatus {
675 NotFound,
677 Partial {
679 size: Option<u64>,
681 },
682 Complete {
684 size: u64,
686 },
687}
688
689#[derive(
691 Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
692)]
693pub struct Scope(pub(crate) u64);
694
695impl Scope {
696 pub const GLOBAL: Self = Self(0);
697}
698
699impl std::fmt::Debug for Scope {
700 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
701 if self.0 == 0 {
702 write!(f, "Global")
703 } else {
704 f.debug_tuple("Scope").field(&self.0).finish()
705 }
706 }
707}