1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2use std::{
17 fmt::{self, Debug},
18 io,
19 num::NonZeroU64,
20 ops::{Bound, RangeBounds},
21 path::PathBuf,
22 pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27 io::{mixed::EncodedItem, BaoContentItem, Leaf},
28 ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32 channel::{mpsc, oneshot},
33 rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
43pub(crate) trait HashSpecific {
44 fn hash(&self) -> Hash;
45
46 fn hash_short(&self) -> ArrayString<10> {
47 self.hash().fmt_short()
48 }
49}
50
51impl HashSpecific for ImportBaoMsg {
52 fn hash(&self) -> crate::Hash {
53 self.inner.hash
54 }
55}
56
57impl HashSpecific for ObserveMsg {
58 fn hash(&self) -> crate::Hash {
59 self.inner.hash
60 }
61}
62
63impl HashSpecific for ExportBaoMsg {
64 fn hash(&self) -> crate::Hash {
65 self.inner.hash
66 }
67}
68
69impl HashSpecific for ExportRangesMsg {
70 fn hash(&self) -> crate::Hash {
71 self.inner.hash
72 }
73}
74
75impl HashSpecific for ExportPathMsg {
76 fn hash(&self) -> crate::Hash {
77 self.inner.hash
78 }
79}
80
81pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
82
83impl HashSpecific for CreateTagMsg {
84 fn hash(&self) -> crate::Hash {
85 self.inner.value.hash
86 }
87}
88
89#[derive(Debug, Clone)]
90pub struct StoreService;
91impl irpc::Service for StoreService {}
92
93#[rpc_requests(StoreService, message = Command, alias = "Msg")]
94#[derive(Debug, Serialize, Deserialize)]
95pub enum Request {
96 #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
97 ListBlobs(ListRequest),
98 #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
99 Batch(BatchRequest),
100 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
101 DeleteBlobs(BlobDeleteRequest),
102 #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
103 ImportBao(ImportBaoRequest),
104 #[rpc(tx = mpsc::Sender<EncodedItem>)]
105 ExportBao(ExportBaoRequest),
106 #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
107 ExportRanges(ExportRangesRequest),
108 #[rpc(tx = mpsc::Sender<Bitfield>)]
109 Observe(ObserveRequest),
110 #[rpc(tx = oneshot::Sender<BlobStatus>)]
111 BlobStatus(BlobStatusRequest),
112 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
113 ImportBytes(ImportBytesRequest),
114 #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
115 ImportByteStream(ImportByteStreamRequest),
116 #[rpc(tx = mpsc::Sender<AddProgressItem>)]
117 ImportPath(ImportPathRequest),
118 #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
119 ExportPath(ExportPathRequest),
120 #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
121 ListTags(ListTagsRequest),
122 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
123 SetTag(SetTagRequest),
124 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
125 DeleteTags(DeleteTagsRequest),
126 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
127 RenameTag(RenameTagRequest),
128 #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
129 CreateTag(CreateTagRequest),
130 #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
131 ListTempTags(ListTempTagsRequest),
132 #[rpc(tx = oneshot::Sender<TempTag>)]
133 CreateTempTag(CreateTempTagRequest),
134 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
135 SyncDb(SyncDbRequest),
136 #[rpc(tx = oneshot::Sender<()>)]
137 Shutdown(ShutdownRequest),
138 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
139 ClearProtected(ClearProtectedRequest),
140}
141
142#[derive(Debug, Serialize, Deserialize)]
143pub struct SyncDbRequest;
144
145#[derive(Debug, Serialize, Deserialize)]
146pub struct ShutdownRequest;
147
148#[derive(Debug, Serialize, Deserialize)]
149pub struct ClearProtectedRequest;
150
151#[derive(Debug, Serialize, Deserialize)]
152pub struct BlobStatusRequest {
153 pub hash: Hash,
154}
155
156#[derive(Debug, Serialize, Deserialize)]
157pub struct ListRequest;
158
159#[derive(Debug, Serialize, Deserialize)]
160pub struct BatchRequest;
161
162#[derive(Debug, Serialize, Deserialize)]
163pub enum BatchResponse {
164 Drop(HashAndFormat),
165 Ping,
166}
167
168#[derive(Debug, Serialize, Deserialize)]
170pub struct BlobDeleteRequest {
171 pub hashes: Vec<Hash>,
172 pub force: bool,
173}
174
175#[derive(Serialize, Deserialize)]
177pub struct ImportBytesRequest {
178 pub data: Bytes,
179 pub format: BlobFormat,
180 pub scope: Scope,
181}
182
183impl fmt::Debug for ImportBytesRequest {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 f.debug_struct("ImportBytes")
186 .field("data", &self.data.len())
187 .field("format", &self.format)
188 .field("scope", &self.scope)
189 .finish()
190 }
191}
192
193#[derive(Debug, Serialize, Deserialize)]
194pub struct ImportPathRequest {
195 pub path: PathBuf,
196 pub mode: ImportMode,
197 pub format: BlobFormat,
198 pub scope: Scope,
199}
200
201#[derive(Debug, Serialize, Deserialize)]
207pub struct ImportBaoRequest {
208 pub hash: Hash,
209 pub size: NonZeroU64,
210}
211
212#[derive(Debug, Serialize, Deserialize)]
214pub struct ObserveRequest {
215 pub hash: Hash,
216}
217
218#[derive(Debug, Serialize, Deserialize)]
222pub struct ExportBaoRequest {
223 pub hash: Hash,
224 pub ranges: ChunkRanges,
225}
226
227#[derive(Debug, Serialize, Deserialize)]
229pub struct ExportRangesRequest {
230 pub hash: Hash,
231 pub ranges: RangeSet2<u64>,
232}
233
234#[derive(Debug, Serialize, Deserialize)]
241pub struct ExportPathRequest {
242 pub hash: Hash,
243 pub mode: ExportMode,
244 pub target: PathBuf,
245}
246
247#[derive(Debug, Serialize, Deserialize)]
248pub struct ImportByteStreamRequest {
249 pub format: BlobFormat,
250 pub scope: Scope,
251}
252
253#[derive(Debug, Serialize, Deserialize)]
254pub enum ImportByteStreamUpdate {
255 Bytes(Bytes),
256 Done,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ListTagsRequest {
262 pub hash_seq: bool,
264 pub raw: bool,
266 pub from: Option<Tag>,
268 pub to: Option<Tag>,
270}
271
272impl ListTagsRequest {
273 pub fn range<R, E>(range: R) -> Self
275 where
276 R: RangeBounds<E>,
277 E: AsRef<[u8]>,
278 {
279 let (from, to) = tags_from_range(range);
280 Self {
281 from,
282 to,
283 raw: true,
284 hash_seq: true,
285 }
286 }
287
288 pub fn prefix(prefix: &[u8]) -> Self {
290 let from = Tag::from(prefix);
291 let to = from.next_prefix();
292 Self {
293 raw: true,
294 hash_seq: true,
295 from: Some(from),
296 to,
297 }
298 }
299
300 pub fn single(name: &[u8]) -> Self {
302 let from = Tag::from(name);
303 Self {
304 to: Some(from.successor()),
305 from: Some(from),
306 raw: true,
307 hash_seq: true,
308 }
309 }
310
311 pub fn all() -> Self {
313 Self {
314 raw: true,
315 hash_seq: true,
316 from: None,
317 to: None,
318 }
319 }
320
321 pub fn raw() -> Self {
323 Self {
324 raw: true,
325 hash_seq: false,
326 from: None,
327 to: None,
328 }
329 }
330
331 pub fn hash_seq() -> Self {
333 Self {
334 raw: false,
335 hash_seq: true,
336 from: None,
337 to: None,
338 }
339 }
340}
341
342#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
344pub struct TagInfo {
345 pub name: Tag,
347 pub format: BlobFormat,
349 pub hash: Hash,
351}
352
353impl From<TagInfo> for HashAndFormat {
354 fn from(tag_info: TagInfo) -> Self {
355 HashAndFormat {
356 hash: tag_info.hash,
357 format: tag_info.format,
358 }
359 }
360}
361
362impl TagInfo {
363 pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
365 let name = name.as_ref();
366 let value = value.into();
367 Self {
368 name: Tag::from(name),
369 hash: value.hash,
370 format: value.format,
371 }
372 }
373
374 pub fn hash_and_format(&self) -> HashAndFormat {
376 HashAndFormat {
377 hash: self.hash,
378 format: self.format,
379 }
380 }
381}
382
383pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
384where
385 R: RangeBounds<E>,
386 E: AsRef<[u8]>,
387{
388 let from = match range.start_bound() {
389 Bound::Included(start) => Some(Tag::from(start.as_ref())),
390 Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
391 Bound::Unbounded => None,
392 };
393 let to = match range.end_bound() {
394 Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
395 Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
396 Bound::Unbounded => None,
397 };
398 (from, to)
399}
400
401#[derive(Debug, Serialize, Deserialize)]
403pub struct CreateTempTagRequest {
404 pub scope: Scope,
405 pub value: HashAndFormat,
406}
407
408#[derive(Debug, Serialize, Deserialize)]
410pub struct ListTempTagsRequest;
411
412#[derive(Debug, Serialize, Deserialize)]
414pub struct RenameTagRequest {
415 pub from: Tag,
417 pub to: Tag,
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct DeleteTagsRequest {
424 pub from: Option<Tag>,
426 pub to: Option<Tag>,
428}
429
430impl DeleteTagsRequest {
431 pub fn single(name: &[u8]) -> Self {
433 let name = Tag::from(name);
434 Self {
435 to: Some(name.successor()),
436 from: Some(name),
437 }
438 }
439
440 pub fn range<R, E>(range: R) -> Self
442 where
443 R: RangeBounds<E>,
444 E: AsRef<[u8]>,
445 {
446 let (from, to) = tags_from_range(range);
447 Self { from, to }
448 }
449
450 pub fn prefix(prefix: &[u8]) -> Self {
452 let from = Tag::from(prefix);
453 let to = from.next_prefix();
454 Self {
455 from: Some(from),
456 to,
457 }
458 }
459}
460
461#[derive(Debug, Serialize, Deserialize)]
463pub struct SetTagRequest {
464 pub name: Tag,
465 pub value: HashAndFormat,
466}
467
468#[derive(Debug, Serialize, Deserialize)]
470pub struct CreateTagRequest {
471 pub value: HashAndFormat,
472}
473
474#[derive(Debug, Serialize, Deserialize)]
476pub struct ProcessExitRequest {
477 pub code: i32,
478}
479
480#[derive(Debug, Serialize, Deserialize)]
494pub enum AddProgressItem {
495 CopyProgress(u64),
505 Size(u64),
514 CopyDone,
519 OutboardProgress(u64),
524 Done(TempTag),
533 Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
541}
542
543impl From<io::Error> for AddProgressItem {
544 fn from(e: io::Error) -> Self {
545 Self::Error(e)
546 }
547}
548
549#[derive(Debug, Serialize, Deserialize)]
550pub enum ExportRangesItem {
551 Size(u64),
556 Data(Leaf),
558 Error(super::Error),
560}
561
562impl From<super::Error> for ExportRangesItem {
563 fn from(e: super::Error) -> Self {
564 Self::Error(e)
565 }
566}
567
568impl From<Leaf> for ExportRangesItem {
569 fn from(leaf: Leaf) -> Self {
570 Self::Data(leaf)
571 }
572}
573
574#[derive(Debug, Serialize, Deserialize)]
583pub enum ExportProgressItem {
584 Size(u64),
589 CopyProgress(u64),
597 Done,
605 Error(super::Error),
613}
614
615impl From<super::Error> for ExportProgressItem {
616 fn from(e: super::Error) -> Self {
617 Self::Error(e)
618 }
619}
620
621#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
628pub enum ImportMode {
629 #[default]
634 Copy,
635 TryReference,
643}
644
645#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
652pub enum ExportMode {
653 #[default]
658 Copy,
659 TryReference,
668}
669
670#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
672pub enum BlobStatus {
673 NotFound,
675 Partial {
677 size: Option<u64>,
679 },
680 Complete {
682 size: u64,
684 },
685}
686
687#[derive(
689 Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
690)]
691pub struct Scope(pub(crate) u64);
692
693impl Scope {
694 pub const GLOBAL: Self = Self(0);
695}
696
697impl std::fmt::Debug for Scope {
698 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
699 if self.0 == 0 {
700 write!(f, "Global")
701 } else {
702 f.debug_tuple("Scope").field(&self.0).finish()
703 }
704 }
705}