iroh_blobs/store/fs.rs

//! # File based blob store.
//!
//! A file based blob store needs a writeable directory to work with.
//!
//! General design:
//!
//! The file store consists of two actors.
//!
//! # The main actor
//!
//! The purpose of the main actor is to handle user commands and own a map of
//! handles for hashes that are currently being worked on.
//!
//! It also owns tasks for ongoing import and export operations, as well as the
//! database actor.
//!
//! Handling a command almost always involves either forwarding it to the
//! database actor or creating a hash context and spawning a task.
//!
//! # The database actor
//!
//! The database actor is responsible for storing metadata about each hash,
//! as well as inlined data and outboard data for small files.
//!
//! In addition to the metadata, the database actor also stores tags.
//!
//! # Tasks
//!
//! Tasks do not return a result. They are responsible for sending an error
//! to the requester if possible. Otherwise, just dropping the sender will
//! also fail the receiver, but without a descriptive error message.
//!
//! Tasks are usually implemented as an impl fn (named `..._impl`) that does
//! return a result, and a wrapper that just forwards the error, if any.
//!
//! That way you can use `?` syntax in the task implementation. The impl fns
//! are also easier to test.
//!
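//! A sketch of the pattern, with hypothetical names (`FrobnicateMsg` does not
//! exist in this file; real pairs are e.g. `import_bao` / `import_bao_impl`):
//!
//! ```ignore
//! async fn frobnicate_impl(ctx: HashContext) -> io::Result<()> {
//!     // real work goes here, free to use `?`
//!     Ok(())
//! }
//!
//! async fn frobnicate(cmd: FrobnicateMsg, ctx: HashContext) {
//!     let res = frobnicate_impl(ctx).await;
//!     // forward the result; if the receiver is gone there is nothing to do
//!     cmd.tx.send(res).await.ok();
//! }
//! ```
//!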
//! # Context
//!
//! The main actor holds a TaskContext that is needed for almost all tasks,
//! such as the config and a way to interact with the database.
//!
//! For tasks that are specific to a hash, a HashContext combines the task
//! context with a slot from the table of the main actor that can be used
//! to obtain a unique handle for the hash.
//!
//! # Runtime
//!
//! The fs store owns and manages its own tokio runtime. Dropping the store
//! will clean up the database and shut down the runtime. However, some parts
//! of the persistent state won't make it to disk, so operations involving large
//! partial blobs will have a large initial delay on the next startup.
//!
//! It is also not guaranteed that all write operations will make it to disk.
//! The on-disk store will be in a consistent state, but might miss some writes
//! in the last seconds before shutdown.
//!
//! To avoid this, you can use the [`crate::api::Store::shutdown`] method to
//! cleanly shut down the store and save ephemeral state to disk.
//!
//! Note that if you use the store inside a [`iroh::protocol::Router`] and shut
//! down the router using [`iroh::protocol::Router::shutdown`], the store will be
//! safely shut down as well. Any store refs you are holding will be inoperable
//! after this.
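//!
//! # Example
//!
//! A minimal usage sketch (the exact import path is an assumption about how
//! the crate exports this module): load a store, add some bytes, and shut it
//! down cleanly.
//!
//! ```no_run
//! # async fn example() -> anyhow::Result<()> {
//! use iroh_blobs::store::fs::FsStore;
//!
//! let store = FsStore::load("blobs").await?;
//! let tag = store.add_bytes(b"hello world".to_vec()).await?;
//! println!("added {}", tag.hash);
//! // cleanly shut down, flushing ephemeral state to disk
//! store.shutdown().await?;
//! # Ok(())
//! # }
//! ```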
use std::{
    collections::{HashMap, HashSet},
    fmt, fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use bao_tree::{
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::{list_blobs, Snapshot};
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{Id, JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    store::{
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        Hash,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
        ChunkRangesExt,
    },
};
mod bao_file;
use bao_file::{BaoFileHandle, BaoFileHandleWeak};
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use super::HashAndFormat;
use crate::api::{
    self,
    blobs::{AddProgressItem, ExportMode, ExportProgressItem},
    Store,
};
/// Create a 16 byte unique ID.
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

/// Create a temp file name based on a 16 byte UUID.
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

/// Context needed by most tasks.
#[derive(Debug)]
struct TaskContext {
    /// Store options such as paths and inline thresholds, in an Arc to cheaply share with tasks.
    pub options: Arc<Options>,
    /// Metadata database, basically a mpsc sender with some extra functionality.
    pub db: meta::Db,
    /// Handle to send internal commands.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// The file handle for the empty hash.
    pub empty: BaoFileHandle,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

#[derive(Debug)]
struct Actor {
    // Context that can be cheaply shared with tasks.
    context: Arc<TaskContext>,
    // Receiver for incoming user commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for incoming file store specific commands.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks for import and export operations.
    tasks: JoinSet<()>,
    // Ids of the currently running tasks.
    running: HashSet<Id>,
    // Handles for hashes that are currently being worked on.
    handles: HashMap<Hash, Slot>,
    // Temp tags.
    temp_tags: TempTags,
    // Our private tokio runtime. It has to live somewhere.
    _rt: RtWrapper,
}

/// Wraps a slot and the task context.
///
/// This contains everything a hash-specific task should need.
struct HashContext {
    slot: Slot,
    ctx: Arc<TaskContext>,
}

impl HashContext {
    pub fn db(&self) -> &meta::Db {
        &self.ctx.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.ctx.options
    }

    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
        self.slot.0.lock().await
    }

    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.ctx.protect.protect(hash, parts);
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Update {
                    hash,
                    state,
                    tx: Some(tx),
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Get {
                    hash,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .ok();
        let res = rx.await.map_err(io::Error::other)?;
        Ok(res.state?)
    }

    /// Set the entry state in the database, and wait for completion.
    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Set {
                    hash,
                    state,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .map_err(io::Error::other)?;
        rx.await.map_err(|_e| io::Error::other(""))??;
        Ok(())
    }

    pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
        if create {
            self.get_or_create(hash).await
        } else {
            self.get(hash).await
        }
    }

    pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        let (res, _) = res?;
        Ok(res)
    }

    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Ok(BaoFileHandle::new_partial_mem(
                        hash,
                        self.ctx.options.clone(),
                    )),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        trace!("{res:?}");
        let (res, _) = res?;
        Ok(res)
    }
}

async fn open_bao_file(
    hash: &Hash,
    state: EntryState<Bytes>,
    ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
    let options = &ctx.options;
    Ok(match state {
        EntryState::Complete {
            data_location,
            outboard_location,
        } => {
            let data = match data_location {
                DataLocation::Inline(data) => MemOrFile::Mem(data),
                DataLocation::Owned(size) => {
                    let path = options.path.data_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
                DataLocation::External(paths, size) => {
                    let Some(path) = paths.into_iter().next() else {
                        return Err(io::Error::other("no external data path"));
                    };
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
            };
            let outboard = match outboard_location {
                OutboardLocation::NotNeeded => MemOrFile::empty(),
                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
                OutboardLocation::Owned => {
                    let path = options.path.outboard_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(file)
                }
            };
            BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
        }
        EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
    })
}

/// An entry for each hash, containing a weak reference to a BaoFileHandle
/// wrapped in a tokio mutex so handle creation is sequential.
#[derive(Debug, Clone, Default)]
pub(crate) struct Slot(Arc<tokio::sync::Mutex<Option<BaoFileHandleWeak>>>);

impl Slot {
    pub async fn is_live(&self) -> bool {
        let slot = self.0.lock().await;
        slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false)
    }

    /// Get the handle if it exists and is still alive, otherwise load it from the database.
    /// If there is nothing in the database, create a new in-memory handle.
    ///
    /// `make` will be called if a live handle does not exist.
    pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
        T: Default,
    {
        let mut slot = self.0.lock().await;
        if let Some(weak) = &*slot {
            if let Some(handle) = weak.upgrade() {
                return Ok((handle, Default::default()));
            }
        }
        let handle = make().await;
        if let Ok((handle, _)) = &handle {
            *slot = Some(handle.downgrade());
        }
        handle
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }

    fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
        match res {
            Ok((id, _)) => {
                // println!("task {id} finished");
                self.running.remove(&id);
                // println!("{:?}", self.running);
            }
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn clear_dead_handles(&mut self) {
        let mut to_remove = Vec::new();
        for (hash, slot) in &self.handles {
            if !slot.is_live().await {
                to_remove.push(*hash);
            }
        }
        for hash in to_remove {
            if let Some(slot) = self.handles.remove(&hash) {
                // do a quick check if the handle has become alive in the meantime, and reinsert it
                let guard = slot.0.lock().await;
                let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
                if is_live {
                    drop(guard);
                    self.handles.insert(hash, slot);
                }
            }
        }
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.clear_dead_handles().await;
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                let (tx, rx) = tokio::sync::oneshot::channel();
                self.db()
                    .send(
                        Snapshot {
                            tx,
                            span: cmd.span.clone(),
                        }
                        .into(),
                    )
                    .await
                    .ok();
                if let Ok(snapshot) = rx.await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_path(cmd, ctx));
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_bao(cmd, ctx));
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_ranges(cmd, ctx));
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(import_bao(cmd, ctx));
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(observe(cmd, ctx));
            }
        }
    }

    /// Create a hash context for a given hash.
    fn hash_context(&mut self, hash: Hash) -> HashContext {
        HashContext {
            slot: self.handles.entry(hash).or_default().clone(),
            ctx: self.context.clone(),
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    let ctx = self.hash_context(cmd.hash);
                    self.spawn(finish_import(cmd, tt, ctx));
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
                    self.log_task_result(res);
                }
            }
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
690            "creating parent directory for db file{}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            empty: BaoFileHandle::new_complete(
                Hash::EMPTY,
                MemOrFile::empty(),
                MemOrFile::empty(),
                options,
            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context,
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            running: HashSet::new(),
            handles: Default::default(),
            temp_tags: Default::default(),
            _rt: rt,
        })
    }
}

struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
    let res = match finish_import_impl(cmd.inner, ctx).await {
        Ok(()) => {
            // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
            // it will be cleaned up when either the process exits or scope ends
            if cmd.tx.is_rpc() {
                trace!("leaking temp tag {}", tt.hash_and_format());
                tt.leak();
            }
            AddProgressItem::Done(tt)
        }
        Err(cause) => AddProgressItem::Error(cause),
    };
    cmd.tx.send(res).await.ok();
}

async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> {
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    let guard = ctx.lock().await;
    let handle = guard.as_ref().and_then(|x| x.upgrade());
    // if I do have an existing handle, I have to possibly deal with observers.
    // if I don't have an existing handle, there are 2 cases:
    //   the entry exists in the db, but we don't have a handle
    //   the entry does not exist at all.
    // convert the import source to a data location and drop the open files
    ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // this will always work on any unix, but on windows there might be an issue if the target file is open!
            // possibly open with FILE_SHARE_DELETE on windows?
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // the same caveat as above applies here
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    if let Some(handle) = handle {
        let data = match &data_location {
            DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            DataLocation::Owned(size) => {
                let path = ctx.options().path.data_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
            DataLocation::External(paths, size) => {
                let Some(path) = paths.iter().next() else {
                    return Err(io::Error::other("no external data path"));
                };
                let file = fs::File::open(path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
        };
        let outboard = match &outboard_location {
            OutboardLocation::NotNeeded => MemOrFile::empty(),
            OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            OutboardLocation::Owned => {
                let path = ctx.options().path.outboard_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(file)
            }
        };
        handle.complete(data, outboard);
    }
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update(hash, state).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) {
    trace!("{cmd:?}");
    let ImportBaoMsg {
        inner: ImportBaoRequest { size, hash },
        rx,
        tx,
        ..
    } = cmd;
    let res = match ctx.get_or_create(hash).await {
        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
        Err(cause) => Err(cause),
    };
    trace!("{res:?}");
    tx.send(res).await.ok();
}

fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
    handle: BaoFileHandle,
    ctx: HashContext,
) -> api::Result<()> {
    trace!(
        "importing bao: {} {} bytes",
        handle.hash().fmt_short(),
        size
    );
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // if the batch is not empty, the last item is a leaf, and the current item is a parent, write the batch
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn observe(cmd: ObserveMsg, ctx: HashContext) {
    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
        return;
    };
    handle.subscribe().forward(cmd.tx).await.ok();
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
    match ctx.get(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(ExportRangesItem::Error(cause.into()))
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
        }
    }
}

async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    handle: BaoFileHandle,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        handle.current_size()?
    );
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let bitfield = handle.bitfield()?;
    let data = handle.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
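        // stream the requested range as raw leaves of up to 1 KiB each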
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            tx.send(ExportRangesItem::Data(Leaf {
                offset,
                data: data.read_bytes_at(offset, size)?,
            }))
            .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
    match ctx.get_maybe_create(cmd.hash, false).await {
        Ok(handle) => {
            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            let crate::api::Error::Io(cause) = cause;
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }
}

async fn export_bao_impl(
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
    handle: BaoFileHandle,
) -> anyhow::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let outboard = handle.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // we have no data whatsoever, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}",);
    let data = handle.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_path(cmd: ExportPathMsg, ctx: HashContext) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await {
        tx.send(cause.into()).await.ok();
    }
}

async fn export_path_impl(
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
    ctx: HashContext,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let _guard = ctx.lock().await;
    let state = ctx.get_entry_state(cmd.hash).await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let data = match data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data),
        DataLocation::Owned(size) => {
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size))
        }
        DataLocation::External(paths, size) => MemOrFile::File((
            paths
                .into_iter()
                .next()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no external data path"))?,
            size,
        )),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let source = fs::File::open(&source_path)?;
                let mut target = fs::File::create(&target)?;
                copy_with_progress(&source, size, &mut target, tx).await?
            }
            ExportMode::TryReference => {
                match std::fs::rename(&source_path, &target) {
                    Ok(()) => {}
                    Err(cause) => {
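                        // 18 is EXDEV on Linux: source and target are on
                        // different file systems, so fall back to copying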
                        const ERR_CROSS: i32 = 18;
                        if cause.raw_os_error() == Some(ERR_CROSS) {
                            let source = fs::File::open(&source_path)?;
                            let mut target = fs::File::create(&target)?;
                            copy_with_progress(&source, size, &mut target, tx).await?;
                        } else {
                            return Err(cause.into());
                        }
                    }
                }
                ctx.set(
                    cmd.hash,
                    EntryState::Complete {
                        data_location: DataLocation::External(vec![target], size),
                        outboard_location,
                    },
                )
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

async fn copy_with_progress(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other(""))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
    /// Load or create a new store.
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    /// Load or create a new store with custom options and a custom location for the database file.
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iroh-blob-store")
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

/// A file based store.
///
/// A store can be created using [`load`](FsStore::load) or [`load_with_opts`](FsStore::load_with_opts).
/// Load will use the default options and create the required directories, while load_with_opts allows
/// you to customize the options and the location of the database. Both variants will create the database
/// if it does not exist, and load an existing database if one is found at the configured location.
///
/// In addition to implementing the [`Store`](`crate::api::Store`) API via [`Deref`](`std::ops::Deref`),
/// there are a few additional methods that are specific to file based stores, such as [`dump`](FsStore::dump).
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Command, proto::StoreService>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

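    /// Dump the internal state of the metadata database, for debugging.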
    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            HashAndFormat, IROH_BLOCK_SIZE,
        },
    };

    /// Interesting sizes for testing.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,               // annoying corner case - always present, handled by the api
        1,               // less than 1 chunk, data inline, outboard not needed
        1024,            // exactly 1 chunk, data inline, outboard not needed
        1024 * 16 - 1,   // less than 1 chunk group, data inline, outboard not needed
        1024 * 16,       // exactly 1 chunk group, data inline, outboard not needed
        1024 * 16 + 1,   // data file, outboard inline (just 1 hash pair)
        1024 * 1024,     // data file, outboard inline (many hash pairs)
        1024 * 1024 * 8, // data file, outboard file
    ];

    /// Create n0 flavoured bao. Note that this can be used to request ranges below a chunk group size,
    /// which cannot be exported via bao because we don't store hashes below the chunk group level.
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    // #[traced_test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

    /// Generate test data for size n.
    ///
    /// We don't really care about the content, since we assume blake3 works.
    /// The only thing it should not be is all zeros, since that is what you
    /// will get for a gap.
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Map to uppercase A-Z range (65-90)
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }
    // import data via add_stream, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    // import data via add_bytes, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_bytes() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    // import data via add_bytes, check that small blobs round-trip as the same Bytes instance
    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            // we must at some point see completion, otherwise the test will hang
            // keep the handle alive by observing until the end, otherwise the handle
            // will change and the bytes won't be the same instance anymore
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    // import data via add_path, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    // add data via add_bytes, then export it to a file and compare the file contents
    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={size}");
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, import the rest
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // check that the data is complete
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

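    /// Ranges covering just the last chunk of a blob.
    ///
    /// Importing these gets a validated size while transferring a minimal
    /// amount of data.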
    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, check that the observed bitfields match the imported ranges
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        // stage 2, check that the bitfields are recovered after the bitfield
        // files were deleted above
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

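    /// Shared batch test body: temp tags created in a batch are listed while
    /// the batch is alive, and are gone once the batch is dropped and the db
    /// is synced.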
    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(tt1.hash_and_format()));
        assert!(tts.contains(tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        // the batch went out of scope, so its temp tags are no longer listed
        assert!(!tts.contains(tt1.hash_and_format()));
        assert!(!tts.contains(tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(*tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = *tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(*hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[hash].to_vec());
            let bao = store
                .export_bao(*hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(*hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        Ok(())
    }

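    /// Recursively delete all files with the given extension (case-insensitive)
    /// below `root_dir`.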
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        // Remove leading dot if present, so we have just the extension
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        println!("Deleting: {}", path.display());
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

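    /// Print a listing of `path`, sorted by path, with file sizes.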
    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth); // Two spaces per level
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len(); // Size in bytes

            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

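    /// Like [`dump_dir`], but also dumps the content of store files: which
    /// chunks of the data, outboard and sizes files are non-zero, and the
    /// decoded bitfield files.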
    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                // Dump depending on file type
                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent}  ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent}  ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent}  ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent}  bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent}  bitfield: error: {cause}");
                        }
                    }
                } else {
                    continue; // Skip content dump for other files
                }
            }
        }
        Ok(())
    }

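    /// Print a one-line ANSI rendering of which `chunk_size` sized chunks of
    /// the file contain non-zero bytes.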
    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

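    /// For each `chunk_size` sized chunk of the file, return whether it
    /// contains any non-zero bytes.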
    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

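    /// Render bits as ASCII, e.g. `[true, false, true]` becomes `#_#`.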
    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

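    /// Render bits as half-block characters, packing two bits per output
    /// character and using ANSI colors to distinguish data from padding.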
    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        // ANSI color codes
        let white_fg = "\x1b[97m"; // bright white foreground
        let reset = "\x1b[0m"; // reset all attributes
        let gray_bg = "\x1b[100m"; // bright black (gray) background
        let black_bg = "\x1b[40m"; // black background

        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset), // 11 - solid white on default background
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset), // 10 - left half white on gray background
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset), // 01 - right half white on gray background
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset), // 00 - space with gray background
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset), // 1 (pad 0) - left half white on black background
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset), // 0 (pad 0) - space with black background
            };

            result.push_str(&colored_char);
        }

        // Ensure we end with a reset code to prevent color bleeding
        result.push_str("\x1b[0m");
        result
    }

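    /// Turn a `Bytes` into a stream of chunks of at most `chunk_size` bytes.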
    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}