use std::{
    collections::{HashMap, HashSet},
    fmt, fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use bao_tree::{
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::{list_blobs, Snapshot};
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{Id, JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    store::{
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        Hash,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
        ChunkRangesExt,
    },
};
mod bao_file;
use bao_file::{BaoFileHandle, BaoFileHandleWeak};
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use super::HashAndFormat;
use crate::api::{
    self,
    blobs::{AddProgressItem, ExportMode, ExportProgressItem},
    Store,
};

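/// Generate a random 16-byte id, used to give temp files unique names.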
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

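/// Commands that tasks spawned by the actor send back to it, bypassing the
/// public API.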
#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

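/// State shared between the actor and the tasks it spawns: the options, the
/// metadata db handle, the channel for internal commands, a handle for the
/// empty blob, and the handle that protects files from deletion.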
#[derive(Debug)]
struct TaskContext {
    pub options: Arc<Options>,
    pub db: meta::Db,
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    pub empty: BaoFileHandle,
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

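/// The main fs store actor. Owns the map from hashes to slots as well as the
/// temp tags, and spawns a task per hash-specific command.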
#[derive(Debug)]
struct Actor {
    context: Arc<TaskContext>,
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    tasks: JoinSet<()>,
    running: HashSet<Id>,
    handles: HashMap<Hash, Slot>,
    temp_tags: TempTags,
    _rt: RtWrapper,
}

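/// Context for a task that operates on a single hash: the slot holding the
/// (weak) bao file handle for that hash, plus the shared task context.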
struct HashContext {
    slot: Slot,
    ctx: Arc<TaskContext>,
}

impl HashContext {
    pub fn db(&self) -> &meta::Db {
        &self.ctx.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.ctx.options
    }

    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
        self.slot.0.lock().await
    }

    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.ctx.protect.protect(hash, parts);
    }

    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Update {
                    hash,
                    state,
                    tx: Some(tx),
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Get {
                    hash,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .ok();
        let res = rx.await.map_err(io::Error::other)?;
        Ok(res.state?)
    }

    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Set {
                    hash,
                    state,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .map_err(io::Error::other)?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Ok(BaoFileHandle::new_partial_mem(
                        hash,
                        self.ctx.options.clone(),
                    )),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        trace!("{res:?}");
        let (res, _) = res?;
        Ok(res)
    }
}

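/// Open a bao file handle for an entry whose state is already known, reading
/// data and outboard from memory or from the owned/external files.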
async fn open_bao_file(
    hash: &Hash,
    state: EntryState<Bytes>,
    ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
    let options = &ctx.options;
    Ok(match state {
        EntryState::Complete {
            data_location,
            outboard_location,
        } => {
            let data = match data_location {
                DataLocation::Inline(data) => MemOrFile::Mem(data),
                DataLocation::Owned(size) => {
                    let path = options.path.data_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
                DataLocation::External(paths, size) => {
                    let Some(path) = paths.into_iter().next() else {
                        return Err(io::Error::other("no external data path"));
                    };
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
            };
            let outboard = match outboard_location {
                OutboardLocation::NotNeeded => MemOrFile::empty(),
                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
                OutboardLocation::Owned => {
                    let path = options.path.outboard_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(file)
                }
            };
            BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
        }
        EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
    })
}

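/// A slot for a bao file handle: a mutex around an optional weak handle, so
/// that concurrent tasks for the same hash share a single live handle.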
#[derive(Debug, Clone, Default)]
pub(crate) struct Slot(Arc<tokio::sync::Mutex<Option<BaoFileHandleWeak>>>);

impl Slot {
    pub async fn is_live(&self) -> bool {
        let slot = self.0.lock().await;
        slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false)
    }

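    /// Get the live handle if there is one, otherwise create it with the
    /// `make` closure while holding the slot lock.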
    pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
        T: Default,
    {
        let mut slot = self.0.lock().await;
        if let Some(weak) = &*slot {
            if let Some(handle) = weak.upgrade() {
                return Ok((handle, Default::default()));
            }
        }
        let handle = make().await;
        if let Ok((handle, _)) = &handle {
            *slot = Some(handle.downgrade());
        }
        handle
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }

    fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
        match res {
            Ok((id, _)) => {
                self.running.remove(&id);
            }
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn clear_dead_handles(&mut self) {
        let mut to_remove = Vec::new();
        for (hash, slot) in &self.handles {
            if !slot.is_live().await {
                to_remove.push(*hash);
            }
        }
        for hash in to_remove {
            if let Some(slot) = self.handles.remove(&hash) {
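                // re-check with the lock held: a task may have revived the
                // handle between the liveness check and the removal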
                let guard = slot.0.lock().await;
                let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
                if is_live {
                    drop(guard);
                    self.handles.insert(hash, slot);
                }
            }
        }
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.clear_dead_handles().await;
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                let (tx, rx) = tokio::sync::oneshot::channel();
                self.db()
                    .send(
                        Snapshot {
                            tx,
                            span: cmd.span.clone(),
                        }
                        .into(),
                    )
                    .await
                    .ok();
                if let Ok(snapshot) = rx.await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_path(cmd, ctx));
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_bao(cmd, ctx));
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_ranges(cmd, ctx));
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(import_bao(cmd, ctx));
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(observe(cmd, ctx));
            }
        }
    }

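    /// Get or create the hash context for a hash, creating the slot on demand.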
    fn hash_context(&mut self, hash: Hash) -> HashContext {
        HashContext {
            slot: self.handles.entry(hash).or_default().clone(),
            ctx: self.context.clone(),
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    let ctx = self.hash_context(cmd.hash);
                    self.spawn(finish_import(cmd, tt, ctx));
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
                    self.log_task_result(res);
                }
            }
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file: {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            empty: BaoFileHandle::new_complete(
                Hash::EMPTY,
                MemOrFile::empty(),
                MemOrFile::empty(),
                options,
            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context,
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            running: HashSet::new(),
            handles: Default::default(),
            temp_tags: Default::default(),
            _rt: rt,
        })
    }
}

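/// Wrapper around the store's private tokio runtime. Dropping it shuts the
/// runtime down via `block_in_place`, so the store can be dropped from within
/// an async context.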
struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

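/// Run a batch scope: report the scope id to the client, then keep the scope
/// alive until the client side of the channel is closed.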
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

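/// Finish an import by persisting the entry, then report the temp tag (or the
/// error) back over the progress channel.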
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
    let res = match finish_import_impl(cmd.inner, ctx).await {
        Ok(()) => {
            if cmd.tx.is_rpc() {
                trace!("leaking temp tag {}", tt.hash_and_format());
                tt.leak();
            }
            AddProgressItem::Done(tt)
        }
        Err(cause) => AddProgressItem::Error(cause),
    };
    cmd.tx.send(res).await.ok();
}

async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> {
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    let guard = ctx.lock().await;
    let handle = guard.as_ref().and_then(|x| x.upgrade());
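    // protect the data and outboard files from deletion while the entry is
    // being moved into place and registered in the metadata db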
787 ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]);
793 let data_location = match source {
794 ImportSource::Memory(data) => DataLocation::Inline(data),
795 ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
796 ImportSource::TempFile(path, _file, size) => {
797 let target = ctx.options().path.data_path(&hash);
800 trace!(
801 "moving temp file to owned data location: {} -> {}",
802 path.display(),
803 target.display()
804 );
805 if let Err(cause) = fs::rename(&path, &target) {
806 error!(
807 "failed to move temp file {} to owned data location {}: {cause}",
808 path.display(),
809 target.display()
810 );
811 }
812 DataLocation::Owned(size)
813 }
814 };
815 let outboard_location = match outboard {
816 MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
817 MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
818 MemOrFile::File(path) => {
819 let target = ctx.options().path.outboard_path(&hash);
821 trace!(
822 "moving temp file to owned outboard location: {} -> {}",
823 path.display(),
824 target.display()
825 );
826 if let Err(cause) = fs::rename(&path, &target) {
827 error!(
828 "failed to move temp file {} to owned outboard location {}: {cause}",
829 path.display(),
830 target.display()
831 );
832 }
833 OutboardLocation::Owned
834 }
835 };
836 if let Some(handle) = handle {
837 let data = match &data_location {
838 DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
839 DataLocation::Owned(size) => {
840 let path = ctx.options().path.data_path(&hash);
841 let file = fs::File::open(&path)?;
842 MemOrFile::File(FixedSize::new(file, *size))
843 }
844 DataLocation::External(paths, size) => {
845 let Some(path) = paths.iter().next() else {
846 return Err(io::Error::other("no external data path"));
847 };
848 let file = fs::File::open(path)?;
849 MemOrFile::File(FixedSize::new(file, *size))
850 }
851 };
852 let outboard = match &outboard_location {
853 OutboardLocation::NotNeeded => MemOrFile::empty(),
854 OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
855 OutboardLocation::Owned => {
856 let path = ctx.options().path.outboard_path(&hash);
857 let file = fs::File::open(&path)?;
858 MemOrFile::File(file)
859 }
860 };
861 handle.complete(data, outboard);
862 }
863 let state = EntryState::Complete {
864 data_location,
865 outboard_location,
866 };
867 ctx.update(hash, state).await?;
868 Ok(())
869}
870
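/// Import a bao stream of parents and leaves for a hash, writing it to the
/// store in batches.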
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) {
    trace!("{cmd:?}");
    let ImportBaoMsg {
        inner: ImportBaoRequest { size, hash },
        rx,
        tx,
        ..
    } = cmd;
    let res = match ctx.get_or_create(hash).await {
        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
        Err(cause) => Err(cause),
    };
    trace!("{res:?}");
    tx.send(res).await.ok();
}

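/// The chunk range covered by a single leaf.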
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
    handle: BaoFileHandle,
    ctx: HashContext,
) -> api::Result<()> {
    trace!(
        "importing bao: {} {} bytes",
        handle.hash().fmt_short(),
        size
    );
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
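            // a parent coming after leaves marks a batch boundary: flush what
            // we have accumulated so far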
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
    }
    Ok(())
}

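/// Observe the bitfield of a hash, forwarding all updates to the client.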
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn observe(cmd: ObserveMsg, ctx: HashContext) {
    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
        return;
    };
    handle.subscribe().forward(cmd.tx).await.ok();
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
    match ctx.get_or_create(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(ExportRangesItem::Error(cause.into()))
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
        }
    }
}

async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    handle: BaoFileHandle,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        handle.current_size()?
    );
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let bitfield = handle.bitfield()?;
    let data = handle.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            tx.send(ExportRangesItem::Data(Leaf {
                offset,
                data: data.read_bytes_at(offset, size)?,
            }))
            .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

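/// Export the bao encoding of a hash for the given ranges.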
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
    match ctx.get_or_create(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            let cause = anyhow::anyhow!("failed to open file: {cause}");
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                .await
                .ok();
        }
    }
}

async fn export_bao_impl(
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
    handle: BaoFileHandle,
) -> anyhow::Result<()> {
    let ExportBaoRequest { ranges, hash } = cmd;
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let outboard = handle.outboard()?;
    let size = outboard.tree.size();
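    // a size of 0 for a non-empty hash means there is no data at all yet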
    if size == 0 && hash != Hash::EMPTY {
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = handle.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

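/// Export the data of a hash to a path on the local file system.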
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_path(cmd: ExportPathMsg, ctx: HashContext) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await {
        tx.send(cause.into()).await.ok();
    }
}

async fn export_path_impl(
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
    ctx: HashContext,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let _guard = ctx.lock().await;
    let state = ctx.get_entry_state(cmd.hash).await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let data = match data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data),
        DataLocation::Owned(size) => {
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size))
        }
        DataLocation::External(paths, size) => MemOrFile::File((
            paths
                .into_iter()
                .next()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no external data path"))?,
            size,
        )),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let source = fs::File::open(&source_path)?;
                let mut target = fs::File::create(&target)?;
                copy_with_progress(&source, size, &mut target, tx).await?
            }
            ExportMode::TryReference => {
                match std::fs::rename(&source_path, &target) {
                    Ok(()) => {}
                    Err(cause) => {
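                        // EXDEV: rename across file systems is not possible,
                        // fall back to a copy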
                        const ERR_CROSS: i32 = 18;
                        if cause.raw_os_error() == Some(ERR_CROSS) {
                            let source = fs::File::open(&source_path)?;
                            let mut target = fs::File::create(&target)?;
                            copy_with_progress(&source, size, &mut target, tx).await?;
                        } else {
                            return Err(cause.into());
                        }
                    }
                }
                ctx.set(
                    cmd.hash,
                    EntryState::Complete {
                        data_location: DataLocation::External(vec![target], size),
                        outboard_location,
                    },
                )
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

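/// Copy from a reader to a writer in 1 MiB chunks, sending best-effort
/// progress updates and yielding between chunks.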
async fn copy_with_progress(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other(""))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
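    /// Load or create a store at the given root directory, using default
    /// options. The metadata database lives in `blobs.db` below the root.
    ///
    /// A usage sketch (paths and sizes are illustrative; `add_bytes` and
    /// `get_bytes` come from the [`Store`] API this type derefs to):
    ///
    /// ```ignore
    /// let store = FsStore::load("/tmp/blobs").await?;
    /// let tt = store.add_bytes(vec![0u8; 1024]).await?;
    /// let data = store.get_bytes(tt.hash).await?;
    /// assert_eq!(data.len(), 1024);
    /// ```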
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iroh-blob-store")
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

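/// A file system backed store. This is a cheaply cloneable handle; the actual
/// work happens in the actor that owns the other end of the command channels.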
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Command, proto::StoreService>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            HashAndFormat, IROH_BLOCK_SIZE,
        },
    };

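    /// Sizes that exercise the interesting cases: empty, a single byte, and
    /// values around the 16 KiB chunk group boundary up to multi-group blobs.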
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,
        1,
        1024,
        1024 * 16 - 1,
        1024 * 16,
        1024 * 16 + 1,
        1024 * 1024,
        1024 * 1024 * 8,
    ];

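    /// Create a simple n0 bao encoding: the little-endian size followed by
    /// the encoded ranges.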
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

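    /// Round a request up to full chunk groups; a request entirely past the
    /// end of the data is clamped to the last chunk.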
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

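    /// Create test data that can be checked by eye: 1 KiB blocks of ascending
    /// ASCII letters, wrapping from Z back to A.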
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        for i in 0..n {
            let block_num = i / 1024;
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

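    /// Check that temp tags are kept alive while a batch is open and released
    /// once it is dropped.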
    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(tt1.hash_and_format()));
        assert!(tts.contains(tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(!tts.contains(tt1.hash_and_format()));
        assert!(!tts.contains(tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(*tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = *tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(*hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[hash].to_vec());
            let bao = store
                .export_bao(*hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(*hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        for (_hash, _bao_tree) in bao_by_hash {}
        Ok(())
    }

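    /// Recursively delete all files with the given extension below `root_dir`
    /// (case-insensitive).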
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        println!("Deleting: {}", path.display());
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

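    /// Print a tree listing of a directory, with file sizes.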
    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len();
            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

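    /// Like `dump_dir`, but also visualizes file contents: data, outboard and
    /// size files as occupancy bitmaps, bitfield files decoded.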
    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent} ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent} ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent} ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent} bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent} bitfield: error: {cause}");
                        }
                    }
                } else {
                    continue;
                };
            }
        }
        Ok(())
    }

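    /// Print, chunk by chunk, which `chunk_size` sized chunks of a file
    /// contain non-zero bytes.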
    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

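    /// Render two bits per character using half-block glyphs and ANSI colors.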
    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            let white_fg = "\x1b[97m";
            let reset = "\x1b[0m";
            let gray_bg = "\x1b[100m";
            let black_bg = "\x1b[40m";

            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset),
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset),
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset),
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset),
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset),
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset),
            };

            result.push_str(&colored_char);
        }

        result.push_str("\x1b[0m");
        result
    }

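    /// Turn a `Bytes` into a stream of chunks of at most `chunk_size` bytes.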
    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}