use std::{
    collections::{HashMap, HashSet},
    fmt, fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use bao_tree::{
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::{list_blobs, Snapshot};
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{Id, JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    store::{
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        Hash,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
        ChunkRangesExt,
    },
};

mod bao_file;
use bao_file::{BaoFileHandle, BaoFileHandleWeak};
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use super::HashAndFormat;
use crate::api::{
    self,
    blobs::{AddProgressItem, ExportMode, ExportProgressItem},
    Store,
};

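/// Create a 16-byte random value, used to build unique temp file names.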
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

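/// A fresh, hex-encoded name for a temp file.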
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

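/// Commands sent from spawned tasks back to the main actor.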
#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

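/// Request to clear all temp tags of a scope, e.g. when a batch is dropped.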
#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

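/// Context shared between the main actor and all tasks spawned by it.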
#[derive(Debug)]
struct TaskContext {
    // Store options such as paths and inline thresholds.
    pub options: Arc<Options>,
    // Handle to the metadata database actor.
    pub db: meta::Db,
    // Sender for internal commands back to the main actor.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    // Shared handle for the empty hash.
    pub empty: BaoFileHandle,
    // Handle to protect files from deletion by gc.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

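/// The main actor of the file system store. It processes API commands and
/// internal commands sequentially and spawns a task per request.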
#[derive(Debug)]
struct Actor {
    // Context shared with all spawned tasks.
    context: Arc<TaskContext>,
    // Receiver for external API commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for internal commands sent by tasks.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks spawned by this actor.
    tasks: JoinSet<()>,
    // Ids of the currently running tasks.
    running: HashSet<Id>,
    // File handle slots, one per hash.
    handles: HashMap<Hash, Slot>,
    // Temp tags, grouped by scope.
    temp_tags: TempTags,
    // Owned runtime, kept alive for as long as the actor lives.
    _rt: RtWrapper,
}

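/// Context for commands that operate on a single hash.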
struct HashContext {
    slot: Slot,
    ctx: Arc<TaskContext>,
}

impl HashContext {
    pub fn db(&self) -> &meta::Db {
        &self.ctx.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.ctx.options
    }

    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
        self.slot.0.lock().await
    }

    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.ctx.protect.protect(hash, parts);
    }

    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Update {
                    hash,
                    state,
                    tx: Some(tx),
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Get {
                    hash,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .ok();
        let res = rx.await.map_err(io::Error::other)?;
        Ok(res.state?)
    }

    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Set {
                    hash,
                    state,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .map_err(io::Error::other)?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
        if create {
            self.get_or_create(hash).await
        } else {
            self.get(hash).await
        }
    }

    pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        let (res, _) = res?;
        Ok(res)
    }

    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Ok(BaoFileHandle::new_partial_mem(
                        hash,
                        self.ctx.options.clone(),
                    )),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        trace!("{res:?}");
        let (res, _) = res?;
        Ok(res)
    }
}

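/// Open a bao file handle from the persisted entry state, opening the data and
/// outboard files as needed.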
async fn open_bao_file(
    hash: &Hash,
    state: EntryState<Bytes>,
    ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
    let options = &ctx.options;
    Ok(match state {
        EntryState::Complete {
            data_location,
            outboard_location,
        } => {
            let data = match data_location {
                DataLocation::Inline(data) => MemOrFile::Mem(data),
                DataLocation::Owned(size) => {
                    let path = options.path.data_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
                DataLocation::External(paths, size) => {
                    let Some(path) = paths.into_iter().next() else {
                        return Err(io::Error::other("no external data path"));
                    };
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
            };
            let outboard = match outboard_location {
                OutboardLocation::NotNeeded => MemOrFile::empty(),
                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
                OutboardLocation::Owned => {
                    let path = options.path.outboard_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(file)
                }
            };
            BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
        }
        EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
    })
}

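/// A slot for a single hash: a mutex around a weak reference to the file
/// handle, so handles can be dropped when unused and recreated on demand.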
#[derive(Debug, Clone, Default)]
pub(crate) struct Slot(Arc<tokio::sync::Mutex<Option<BaoFileHandleWeak>>>);

impl Slot {
    pub async fn is_live(&self) -> bool {
        let slot = self.0.lock().await;
        slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false)
    }

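    /// Get the handle if it exists and is alive, otherwise create it with `make`.
    ///
    /// The mutex is held across `make`, so two concurrent callers for the same
    /// hash cannot create two different handles.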
    pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
        T: Default,
    {
        let mut slot = self.0.lock().await;
        if let Some(weak) = &*slot {
            if let Some(handle) = weak.upgrade() {
                return Ok((handle, Default::default()));
            }
        }
        let handle = make().await;
        if let Ok((handle, _)) = &handle {
            *slot = Some(handle.downgrade());
        }
        handle
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }

    fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
        match res {
            Ok((id, _)) => {
                self.running.remove(&id);
            }
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn clear_dead_handles(&mut self) {
        let mut to_remove = Vec::new();
        for (hash, slot) in &self.handles {
            if !slot.is_live().await {
                to_remove.push(*hash);
            }
        }
        for hash in to_remove {
            if let Some(slot) = self.handles.remove(&hash) {
                // re-check under the lock; the handle may have been revived in the meantime
                let guard = slot.0.lock().await;
                let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
                if is_live {
                    drop(guard);
                    self.handles.insert(hash, slot);
                }
            }
        }
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.clear_dead_handles().await;
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                let (tx, rx) = tokio::sync::oneshot::channel();
                self.db()
                    .send(
                        Snapshot {
                            tx,
                            span: cmd.span.clone(),
                        }
                        .into(),
                    )
                    .await
                    .ok();
                if let Ok(snapshot) = rx.await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_path(cmd, ctx));
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_bao(cmd, ctx));
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_ranges(cmd, ctx));
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(import_bao(cmd, ctx));
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(observe(cmd, ctx));
            }
        }
    }

    fn hash_context(&mut self, hash: Hash) -> HashContext {
        HashContext {
            slot: self.handles.entry(hash).or_default().clone(),
            ctx: self.context.clone(),
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    let ctx = self.hash_context(cmd.hash);
                    self.spawn(finish_import(cmd, tt, ctx));
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
                    self.log_task_result(res);
                }
            }
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            empty: BaoFileHandle::new_complete(
                Hash::EMPTY,
                MemOrFile::empty(),
                MemOrFile::empty(),
                options,
            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context,
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            running: HashSet::new(),
            handles: Default::default(),
            temp_tags: Default::default(),
            _rt: rt,
        })
    }
}

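/// Wrapper for an owned tokio runtime, so it can be dropped from within async
/// context via `block_in_place`.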
struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

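/// Run the batch protocol for a scope, then clear the scope when the batch ends.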
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
    let res = match finish_import_impl(cmd.inner, ctx).await {
        Ok(()) => {
            // for remote (rpc) callers the temp tag cannot be held by the
            // caller, so leak it to keep the data protected
            if cmd.tx.is_rpc() {
                trace!("leaking temp tag {}", tt.hash_and_format());
                tt.leak();
            }
            AddProgressItem::Done(tt)
        }
        Err(cause) => AddProgressItem::Error(cause),
    };
    cmd.tx.send(res).await.ok();
}

async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> {
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    let guard = ctx.lock().await;
    let handle = guard.as_ref().and_then(|x| x.upgrade());
    // protect the parts we are about to write from deletion by gc
    ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // rename the temp file to its final location; errors are logged
            // but not fatal
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    if let Some(handle) = handle {
        // there is a live handle for this hash, so complete it in place
        let data = match &data_location {
            DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            DataLocation::Owned(size) => {
                let path = ctx.options().path.data_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
            DataLocation::External(paths, size) => {
                let Some(path) = paths.iter().next() else {
                    return Err(io::Error::other("no external data path"));
                };
                let file = fs::File::open(path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
        };
        let outboard = match &outboard_location {
            OutboardLocation::NotNeeded => MemOrFile::empty(),
            OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            OutboardLocation::Owned => {
                let path = ctx.options().path.outboard_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(file)
            }
        };
        handle.complete(data, outboard);
    }
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update(hash, state).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) {
    trace!("{cmd:?}");
    let ImportBaoMsg {
        inner: ImportBaoRequest { size, hash },
        rx,
        tx,
        ..
    } = cmd;
    let res = match ctx.get_or_create(hash).await {
        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
        Err(cause) => Err(cause),
    };
    trace!("{res:?}");
    tx.send(res).await.ok();
}

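/// The chunk range covered by a single leaf, computed from its byte offset and length.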
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
    handle: BaoFileHandle,
    ctx: HashContext,
) -> api::Result<()> {
    trace!(
        "importing bao: {} {} bytes",
        handle.hash().fmt_short(),
        size
    );
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // a parent after a leaf marks the start of the next chunk group, so
        // write out the current batch before continuing
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn observe(cmd: ObserveMsg, ctx: HashContext) {
    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
        return;
    };
    handle.subscribe().forward(cmd.tx).await.ok();
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
    match ctx.get(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(ExportRangesItem::Error(cause.into()))
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
        }
    }
}

async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    handle: BaoFileHandle,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        handle.current_size()?
    );
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let bitfield = handle.bitfield()?;
    let data = handle.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            tx.send(ExportRangesItem::Data(Leaf {
                offset,
                data: data.read_bytes_at(offset, size)?,
            }))
            .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
    match ctx.get_maybe_create(cmd.hash, false).await {
        Ok(handle) => {
            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            let crate::api::Error::Io(cause) = cause;
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }
}

async fn export_bao_impl(
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
    handle: BaoFileHandle,
) -> anyhow::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let outboard = handle.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // a tree size of 0 for a non-empty hash means nothing has been
        // written yet, so there is nothing to export
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}",);
    let data = handle.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_path(cmd: ExportPathMsg, ctx: HashContext) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await {
        tx.send(cause.into()).await.ok();
    }
}

async fn export_path_impl(
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
    ctx: HashContext,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let _guard = ctx.lock().await;
    let state = ctx.get_entry_state(cmd.hash).await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let data = match data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data),
        DataLocation::Owned(size) => {
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size))
        }
        DataLocation::External(paths, size) => MemOrFile::File((
            paths
                .into_iter()
                .next()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no external data path"))?,
            size,
        )),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let source = fs::File::open(&source_path)?;
                let mut target = fs::File::create(&target)?;
                copy_with_progress(&source, size, &mut target, tx).await?
            }
            ExportMode::TryReference => {
                match std::fs::rename(&source_path, &target) {
                    Ok(()) => {}
                    Err(cause) => {
                        // EXDEV: source and target are on different file
                        // systems, so fall back to a copy
                        const ERR_CROSS: i32 = 18;
                        if cause.raw_os_error() == Some(ERR_CROSS) {
                            let source = fs::File::open(&source_path)?;
                            let mut target = fs::File::create(&target)?;
                            copy_with_progress(&source, size, &mut target, tx).await?;
                        } else {
                            return Err(cause.into());
                        }
                    }
                }
                ctx.set(
                    cmd.hash,
                    EntryState::Complete {
                        data_location: DataLocation::External(vec![target], size),
                        outboard_location,
                    },
                )
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

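/// Copy `size` bytes from a reader to a writer in 1 MiB chunks, reporting
/// progress on `tx`.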
async fn copy_with_progress(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other(""))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
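    /// Load or create a new store at the given root directory.
    ///
    /// The metadata database is stored as `blobs.db` inside `root`.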
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

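    /// Load or create a new store with custom options, using `db_path` for the
    /// metadata database. This creates an owned tokio runtime for the store.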
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iroh-blob-store")
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

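/// A file system based store.
///
/// The store keeps a redb metadata database next to the blob data and a temp
/// directory for ongoing imports. A minimal usage sketch (doc-test style,
/// marked `ignore`; the exact import path depends on how this module is
/// re-exported):
///
/// ```ignore
/// let store = FsStore::load("/path/to/blob-store").await?;
/// let tag = store.add_bytes(b"hello".to_vec()).await?;
/// let data = store.get_bytes(tag.hash).await?;
/// assert_eq!(&data[..], b"hello");
/// ```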
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Command, proto::StoreService>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

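    /// Dump the contents of the metadata database, for debugging.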
    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            HashAndFormat, IROH_BLOCK_SIZE,
        },
    };

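    /// Sizes that exercise the interesting code paths: empty, a single byte,
    /// exactly one chunk, around the chunk group boundary at 16 KiB, and sizes
    /// big enough to need a non-inline data file and outboard.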
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,
        1,
        1024,
        1024 * 16 - 1,
        1024 * 16,
        1024 * 16 + 1,
        1024 * 1024,
        1024 * 1024 * 8,
    ];

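    /// Create a size-prefixed n0 bao encoding of `data` restricted to `ranges`,
    /// returning the root hash and the encoded bytes.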
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

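    /// Create test data of size `n`: each 1024-byte block is filled with a
    /// distinct ascii letter (A..Z, cycling), so offsets are easy to recognize.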
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        for i in 0..n {
            let block_num = i / 1024;
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(tt1.hash_and_format()));
        assert!(tts.contains(tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(!tts.contains(tt1.hash_and_format()));
        assert!(!tts.contains(tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(*tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = *tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(*hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[hash].to_vec());
            let bao = store
                .export_bao(*hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(*hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        for (_hash, _bao_tree) in bao_by_hash {}
        Ok(())
    }

    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        println!("Deleting: {}", path.display());
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len();
            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent} ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent} ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent} ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent} bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent} bitfield: error: {cause}");
                        }
                    }
                } else {
                    continue;
                };
            }
        }
        Ok(())
    }

    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            // ANSI escape codes for rendering two bits per character
            let white_fg = "\x1b[97m";
            let reset = "\x1b[0m";
            let gray_bg = "\x1b[100m";
            let black_bg = "\x1b[40m";

            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset),
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset),
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset),
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset),
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset),
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset),
            };

            result.push_str(&colored_char);
        }

        // reset terminal state at the end
        result.push_str("\x1b[0m");
        result
    }

    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}