iroh_blobs/store/fs.rs

//! # File based blob store.
//!
//! A file based blob store needs a writable directory to work with.
//!
//! General design:
//!
//! The file store consists of two actors.
//!
//! # The main actor
//!
//! The purpose of the main actor is to handle user commands and own a map of
//! handles for hashes that are currently being worked on.
//!
//! It also owns tasks for ongoing import and export operations, as well as the
//! database actor.
//!
//! Handling a command almost always involves either forwarding it to the
//! database actor or creating a hash context and spawning a task.
//!
//! # The database actor
//!
//! The database actor is responsible for storing metadata about each hash,
//! as well as inlined data and outboard data for small files.
//!
//! In addition to the metadata, the database actor also stores tags.
//!
//! # Tasks
//!
//! Tasks do not return a result. They are responsible for sending an error
//! to the requester if possible. Otherwise, just dropping the sender will
//! also fail the receiver, but without a descriptive error message.
//!
//! Tasks are usually implemented as an inner fn (named `..._impl`) that
//! returns a result, and a wrapper that just forwards the error, if any
//! (see the sketch below).
//!
//! That way you can use `?` syntax in the task implementation. The impl fns
//! are also easier to test.
//!
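//! A minimal sketch of the pattern, with hypothetical names:
//!
//! ```ignore
//! async fn frobnicate(cmd: FrobnicateMsg, ctx: HashContext) {
//!     // the wrapper forwards the error, if any, to the requester
//!     if let Err(cause) = frobnicate_impl(cmd.inner, ctx).await {
//!         cmd.tx.send(cause.into()).await.ok();
//!     }
//! }
//!
//! async fn frobnicate_impl(cmd: FrobnicateRequest, ctx: HashContext) -> io::Result<()> {
//!     // `?` syntax works here, and this fn can be tested in isolation
//!     Ok(())
//! }
//! ```
//!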
//! # Context
//!
//! The main actor holds a TaskContext that is needed for almost all tasks,
//! such as the config and a way to interact with the database.
//!
//! For tasks that are specific to a hash, a HashContext combines the task
//! context with a slot from the table of the main actor that can be used
//! to obtain a unique handle for the hash.
//!
//! # Runtime
//!
//! The fs store owns and manages its own tokio runtime. Dropping the store
//! will clean up the database and shut down the runtime. However, some parts
//! of the persistent state won't make it to disk, so operations involving large
//! partial blobs will have a large initial delay on the next startup.
//!
//! It is also not guaranteed that all write operations will make it to disk.
//! The on-disk store will be in a consistent state, but might miss some writes
//! in the last seconds before shutdown.
//!
//! To avoid this, you can use the [`crate::api::Store::shutdown`] method to
//! cleanly shut down the store and save ephemeral state to disk.
//!
//! Note that if you use the store inside an [`iroh::protocol::Router`] and shut
//! down the router using [`iroh::protocol::Router::shutdown`], the store will be
//! safely shut down as well. Any store refs you are holding will be inoperable
//! after this.
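//!
//! # Example
//!
//! A minimal sketch of typical use, from creation to clean shutdown (paths
//! and data are illustrative):
//!
//! ```ignore
//! let store = FsStore::load("/some/data/dir").await?;
//! let tt = store.add_bytes(b"hello world".to_vec()).await?;
//! println!("added blob with hash {}", tt.hash);
//! // flush all state to disk before exiting
//! store.shutdown().await?;
//! ```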
use std::{
    collections::{HashMap, HashSet},
    fmt, fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use bao_tree::{
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::{list_blobs, Snapshot};
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{Id, JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    store::{
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        Hash,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
        ChunkRangesExt,
    },
};
mod bao_file;
use bao_file::{BaoFileHandle, BaoFileHandleWeak};
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use super::HashAndFormat;
use crate::api::{
    self,
    blobs::{AddProgressItem, ExportMode, ExportProgressItem},
    Store,
};

/// Create a 16 byte unique ID.
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

/// Create a temp file name based on a 16 byte UUID.
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

/// Context needed by most tasks.
#[derive(Debug)]
struct TaskContext {
    /// Store options such as paths and inline thresholds, in an Arc to cheaply share with tasks.
    pub options: Arc<Options>,
    /// Metadata database, basically an mpsc sender with some extra functionality.
    pub db: meta::Db,
    /// Handle to send internal commands.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// The file handle for the empty hash.
    pub empty: BaoFileHandle,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

#[derive(Debug)]
struct Actor {
    // Context that can be cheaply shared with tasks.
    context: Arc<TaskContext>,
    // Receiver for incoming user commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for incoming file store specific commands.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks for import and export operations.
    tasks: JoinSet<()>,
    // Ids of currently running tasks.
    running: HashSet<Id>,
    // Slots for live handles, one per hash.
    handles: HashMap<Hash, Slot>,
    // Temp tags, grouped by scope.
    temp_tags: TempTags,
    // Our private tokio runtime. It has to live somewhere.
    _rt: RtWrapper,
}

/// Wraps a slot and the task context.
///
/// This contains everything a hash-specific task should need.
struct HashContext {
    slot: Slot,
    ctx: Arc<TaskContext>,
}

impl HashContext {
    pub fn db(&self) -> &meta::Db {
        &self.ctx.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.ctx.options
    }

    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
        self.slot.0.lock().await
    }

    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.ctx.protect.protect(hash, parts);
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Update {
                    hash,
                    state,
                    tx: Some(tx),
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await.map_err(|_e| io::Error::other("update response channel closed"))??;
        Ok(())
    }

    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Get {
                    hash,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .ok();
        let res = rx.await.map_err(io::Error::other)?;
        Ok(res.state?)
    }

    /// Set the entry state in the database, and wait for completion.
    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Set {
                    hash,
                    state,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .map_err(io::Error::other)?;
        rx.await.map_err(|_e| io::Error::other("set response channel closed"))??;
        Ok(())
    }

    pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
        if create {
            self.get_or_create(hash).await
        } else {
            self.get(hash).await
        }
    }

    pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        let (res, _) = res?;
        Ok(res)
    }

    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Ok(BaoFileHandle::new_partial_mem(
                        hash,
                        self.ctx.options.clone(),
                    )),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        trace!("{res:?}");
        let (res, _) = res?;
        Ok(res)
    }
}

async fn open_bao_file(
    hash: &Hash,
    state: EntryState<Bytes>,
    ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
    let options = &ctx.options;
    Ok(match state {
        EntryState::Complete {
            data_location,
            outboard_location,
        } => {
            let data = match data_location {
                DataLocation::Inline(data) => MemOrFile::Mem(data),
                DataLocation::Owned(size) => {
                    let path = options.path.data_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
                DataLocation::External(paths, size) => {
                    let Some(path) = paths.into_iter().next() else {
                        return Err(io::Error::other("no external data path"));
                    };
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
            };
            let outboard = match outboard_location {
                OutboardLocation::NotNeeded => MemOrFile::empty(),
                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
                OutboardLocation::Owned => {
                    let path = options.path.outboard_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(file)
                }
            };
            BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
        }
        EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
    })
}

/// An entry for each hash, containing a weak reference to a BaoFileHandle
/// wrapped in a tokio mutex so handle creation is sequential.
#[derive(Debug, Clone, Default)]
pub(crate) struct Slot(Arc<tokio::sync::Mutex<Option<BaoFileHandleWeak>>>);

impl Slot {
    pub async fn is_live(&self) -> bool {
        let slot = self.0.lock().await;
        slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false)
    }

    /// Get the handle if it exists and is still alive, otherwise load it from the database.
    /// If there is nothing in the database, create a new in-memory handle.
    ///
    /// `make` will be called if a live handle does not exist.
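    ///
    /// A sketch of a typical call, mirroring [`HashContext::get_or_create`]
    /// (`hash` and `options` are assumed to be in scope):
    ///
    /// ```ignore
    /// let (handle, _) = slot
    ///     .get_or_create(|| async {
    ///         // load from the db here, or fall back to a new in-memory handle
    ///         Ok((BaoFileHandle::new_partial_mem(hash, options.clone()), ()))
    ///     })
    ///     .await?;
    /// ```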
    pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
        T: Default,
    {
        let mut slot = self.0.lock().await;
        if let Some(weak) = &*slot {
            if let Some(handle) = weak.upgrade() {
                return Ok((handle, Default::default()));
            }
        }
        let handle = make().await;
        if let Ok((handle, _)) = &handle {
            *slot = Some(handle.downgrade());
        }
        handle
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }

    fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
        match res {
            Ok((id, _)) => {
                self.running.remove(&id);
            }
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn clear_dead_handles(&mut self) {
        let mut to_remove = Vec::new();
        for (hash, slot) in &self.handles {
            if !slot.is_live().await {
                to_remove.push(*hash);
            }
        }
        for hash in to_remove {
            if let Some(slot) = self.handles.remove(&hash) {
                // do a quick check if the handle has become alive in the meantime, and reinsert it
                let guard = slot.0.lock().await;
                let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
                if is_live {
                    drop(guard);
                    self.handles.insert(hash, slot);
                }
            }
        }
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.clear_dead_handles().await;
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                let (tx, rx) = tokio::sync::oneshot::channel();
                self.db()
                    .send(
                        Snapshot {
                            tx,
                            span: cmd.span.clone(),
                        }
                        .into(),
                    )
                    .await
                    .ok();
                if let Ok(snapshot) = rx.await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_path(cmd, ctx));
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_bao(cmd, ctx));
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_ranges(cmd, ctx));
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(import_bao(cmd, ctx));
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(observe(cmd, ctx));
            }
        }
    }

    /// Create a hash context for a given hash.
    fn hash_context(&mut self, hash: Hash) -> HashContext {
        HashContext {
            slot: self.handles.entry(hash).or_default().clone(),
            ctx: self.context.clone(),
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    let ctx = self.hash_context(cmd.hash);
                    self.spawn(finish_import(cmd, tt, ctx));
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
                    self.log_task_result(res);
                }
            }
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            empty: BaoFileHandle::new_complete(
                Hash::EMPTY,
                MemOrFile::empty(),
                MemOrFile::empty(),
                options,
            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context,
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            running: HashSet::new(),
            handles: Default::default(),
            temp_tags: Default::default(),
            _rt: rt,
        })
    }
}

struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
    let res = match finish_import_impl(cmd.inner, ctx).await {
        Ok(()) => {
            // For a remote call, we can't have the on_drop callback, so we have to
            // leak the temp tag. It will be cleaned up when either the process exits
            // or the scope ends.
            if cmd.tx.is_rpc() {
                trace!("leaking temp tag {}", tt.hash_and_format());
                tt.leak();
            }
            AddProgressItem::Done(tt)
        }
        Err(cause) => AddProgressItem::Error(cause),
    };
    cmd.tx.send(res).await.ok();
}

async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> {
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    let guard = ctx.lock().await;
    let handle = guard.as_ref().and_then(|x| x.upgrade());
    // If we have an existing handle, we possibly have to deal with observers.
    // If we don't have an existing handle, there are 2 cases:
    //   the entry exists in the db, but we don't have a handle,
    //   or the entry does not exist at all.
    // Convert the import source to a data location and drop the open files.
    ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // this will always work on any unix, but on windows there might be an issue if the target file is open!
            // possibly open with FILE_SHARE_DELETE on windows?
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // the same caveat as above applies here
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    if let Some(handle) = handle {
        let data = match &data_location {
            DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            DataLocation::Owned(size) => {
                let path = ctx.options().path.data_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
            DataLocation::External(paths, size) => {
                let Some(path) = paths.iter().next() else {
                    return Err(io::Error::other("no external data path"));
                };
                let file = fs::File::open(path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
        };
        let outboard = match &outboard_location {
            OutboardLocation::NotNeeded => MemOrFile::empty(),
            OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            OutboardLocation::Owned => {
                let path = ctx.options().path.outboard_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(file)
            }
        };
        handle.complete(data, outboard);
    }
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update(hash, state).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) {
    trace!("{cmd:?}");
    let ImportBaoMsg {
        inner: ImportBaoRequest { size, hash },
        rx,
        tx,
        ..
    } = cmd;
    let res = match ctx.get_or_create(hash).await {
        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
        Err(cause) => Err(cause),
    };
    trace!("{res:?}");
    tx.send(res).await.ok();
}

/// Compute the range of chunks covered by a leaf, e.g. a leaf with 2048 bytes
/// of data at offset 0 covers chunks 0..2.
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
    handle: BaoFileHandle,
    ctx: HashContext,
) -> api::Result<()> {
    trace!(
        "importing bao: {} {} bytes",
        handle.hash().fmt_short(),
        size
    );
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // if the batch is not empty, the last item is a leaf, and the current
        // item is a parent, write the batch
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn observe(cmd: ObserveMsg, ctx: HashContext) {
    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
        return;
    };
    handle.subscribe().forward(cmd.tx).await.ok();
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
    match ctx.get(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(ExportRangesItem::Error(cause.into()))
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
        }
    }
}

async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    handle: BaoFileHandle,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "export_ranges: exporting ranges: {hash} {ranges:?} size={}",
        handle.current_size()?
    );
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let bitfield = handle.bitfield()?;
    let data = handle.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            let res = data.read_bytes_at(offset, size);
            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
                .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
    match ctx.get_maybe_create(cmd.hash, false).await {
        Ok(handle) => {
            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            let crate::api::Error::Io(cause) = cause;
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }
}

async fn export_bao_impl(
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
    handle: BaoFileHandle,
) -> anyhow::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let outboard = handle.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // we have no data whatsoever, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = handle.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_path(cmd: ExportPathMsg, ctx: HashContext) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await {
        tx.send(cause.into()).await.ok();
    }
}

async fn export_path_impl(
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
    ctx: HashContext,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let _guard = ctx.lock().await;
    let state = ctx.get_entry_state(cmd.hash).await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let data = match data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data),
        DataLocation::Owned(size) => {
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size))
        }
        DataLocation::External(paths, size) => MemOrFile::File((
            paths
                .into_iter()
                .next()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no external data path"))?,
            size,
        )),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let source = fs::File::open(&source_path)?;
                let mut target = fs::File::create(&target)?;
                copy_with_progress(&source, size, &mut target, tx).await?
            }
            ExportMode::TryReference => {
                match std::fs::rename(&source_path, &target) {
                    Ok(()) => {}
                    Err(cause) => {
                        // 18 is EXDEV: source and target are on different
                        // file systems, so fall back to copying
                        const ERR_CROSS: i32 = 18;
                        if cause.raw_os_error() == Some(ERR_CROSS) {
                            let source = fs::File::open(&source_path)?;
                            let mut target = fs::File::create(&target)?;
                            copy_with_progress(&source, size, &mut target, tx).await?;
                        } else {
                            return Err(cause.into());
                        }
                    }
                }
                ctx.set(
                    cmd.hash,
                    EntryState::Complete {
                        data_location: DataLocation::External(vec![target], size),
                        outboard_location,
                    },
                )
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

async fn copy_with_progress(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other("progress receiver dropped"))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
    /// Load or create a new store.
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    /// Load or create a new store with custom options and a custom location for the database file.
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iroh-blob-store")
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

/// A file based store.
///
/// A store can be created using [`load`](FsStore::load) or [`load_with_opts`](FsStore::load_with_opts).
/// `load` will use the default options and create the required directories, while `load_with_opts`
/// allows you to customize the options and the location of the database. Both variants will create the
/// database if it does not exist, and load an existing database if one is found at the configured location.
///
/// In addition to implementing the [`Store`](`crate::api::Store`) API via [`Deref`](`std::ops::Deref`),
/// there are a few additional methods that are specific to file based stores, such as [`dump`](FsStore::dump).
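///
/// A sketch of both variants (paths are illustrative):
///
/// ```ignore
/// // with default options:
/// let store = FsStore::load("/some/data/dir").await?;
///
/// // with explicit options, mirroring what `load` does internally:
/// let options = Options::new(Path::new("/some/data/dir"));
/// let store = FsStore::load_with_opts("/some/data/dir/blobs.db".into(), options).await?;
/// ```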
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Request>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

    /// Dump the contents of the metadata database, for debugging.
    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            HashAndFormat, IROH_BLOCK_SIZE,
        },
    };

    /// Interesting sizes for testing.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,               // annoying corner case - always present, handled by the api
        1,               // less than 1 chunk, data inline, outboard not needed
        1024,            // exactly 1 chunk, data inline, outboard not needed
        1024 * 16 - 1,   // less than 1 chunk group, data inline, outboard not needed
        1024 * 16,       // exactly 1 chunk group, data inline, outboard not needed
        1024 * 16 + 1,   // data file, outboard inline (just 1 hash pair)
        1024 * 1024,     // data file, outboard inline (many hash pairs)
        1024 * 1024 * 8, // data file, outboard file
    ];

    /// Create n0 flavoured bao. Note that this can be used to request ranges below a chunk group size,
    /// which cannot be exported via bao because we don't store hashes below the chunk group level.
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

    /// Round up a request to chunk group granularity.
    ///
    /// If the request is entirely out of range, it is replaced by a request for
    /// the last chunk, which is enough to verify the size.
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

    /// Generate test data for size n.
    ///
    /// We don't really care about the content, since we assume blake3 works.
    /// The only thing it should not be is all zeros, since that is what you
    /// will get for a gap.
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Map to uppercase A-Z range (65-90)
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    // import data via a byte stream, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    // import data via add_bytes, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_bytes() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    // add data and read it back, checking that for small blobs we get back the same Bytes instance that was added
    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            // we must at some point see completion, otherwise the test will hang
            // keep the handle alive by observing until the end, otherwise the handle
            // will change and the bytes won't be the same instance anymore
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    // import data via add_path, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    // import data via add_bytes, then export it back to a file and compare
1507    #[tokio::test]
1508    async fn test_export_path() -> TestResult<()> {
1509        tracing_subscriber::fmt::try_init().ok();
1510        let testdir = tempfile::tempdir()?;
1511        let db_dir = testdir.path().join("db");
1512        let store = FsStore::load(db_dir).await?;
1513        for size in INTERESTING_SIZES {
1514            let expected = test_data(size);
1515            let expected_hash = Hash::new(&expected);
1516            let tt = store.add_bytes(expected.clone()).await?;
1517            assert_eq!(expected_hash, tt.hash);
1518            let out_path = testdir.path().join(format!("out-{size}"));
1519            store.export(expected_hash, &out_path).await?;
1520            let actual = fs::read(&out_path)?;
1521            assert_eq!(expected, actual);
1522        }
1523        Ok(())
1524    }
1525
1526    #[tokio::test]
1527    async fn test_import_bao_ranges() -> TestResult<()> {
1528        tracing_subscriber::fmt::try_init().ok();
1529        let testdir = tempfile::tempdir()?;
1530        let db_dir = testdir.path().join("db");
1531        {
1532            let store = FsStore::load(&db_dir).await?;
1533            let data = test_data(100000);
1534            let ranges = ChunkRanges::chunks(16..32);
1535            let (hash, bao) = create_n0_bao(&data, &ranges)?;
1536            store
1537                .import_bao_bytes(hash, ranges.clone(), bao.clone())
1538                .await?;
1539            let bitfield = store.observe(hash).await?;
1540            assert_eq!(bitfield.ranges, ranges);
1541            assert_eq!(bitfield.size(), data.len() as u64);
1542            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
1543            assert_eq!(export, bao);
1544        }
1545        Ok(())
1546    }
1547
1548    #[tokio::test]
1549    async fn test_import_bao_minimal() -> TestResult<()> {
1550        tracing_subscriber::fmt::try_init().ok();
1551        let testdir = tempfile::tempdir()?;
1552        let sizes = [1];
1553        let db_dir = testdir.path().join("db");
1554        {
1555            let store = FsStore::load(&db_dir).await?;
1556            for size in sizes {
1557                let data = vec![0u8; size];
1558                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1559                let data = Bytes::from(encoded);
1560                store
1561                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1562                    .await?;
1563            }
1564            store.shutdown().await?;
1565        }
1566        Ok(())
1567    }
1568
1569    #[tokio::test]
1570    async fn test_import_bao_simple() -> TestResult<()> {
1571        tracing_subscriber::fmt::try_init().ok();
1572        let testdir = tempfile::tempdir()?;
1573        let sizes = [1048576];
1574        let db_dir = testdir.path().join("db");
1575        {
1576            let store = FsStore::load(&db_dir).await?;
1577            for size in sizes {
1578                let data = vec![0u8; size];
1579                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1580                let data = Bytes::from(encoded);
1581                trace!("importing size={}", size);
1582                store
1583                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1584                    .await?;
1585            }
1586            store.shutdown().await?;
1587        }
1588        Ok(())
1589    }
1590
1591    #[tokio::test]
1592    async fn test_import_bao_persistence_full() -> TestResult<()> {
1593        tracing_subscriber::fmt::try_init().ok();
1594        let testdir = tempfile::tempdir()?;
1595        let sizes = INTERESTING_SIZES;
1596        let db_dir = testdir.path().join("db");
1597        {
1598            let store = FsStore::load(&db_dir).await?;
1599            for size in sizes {
1600                let data = vec![0u8; size];
1601                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1602                let data = Bytes::from(encoded);
1603                store
1604                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1605                    .await?;
1606            }
1607            store.shutdown().await?;
1608        }
1609        {
1610            let store = FsStore::load(&db_dir).await?;
1611            for size in sizes {
1612                let expected = vec![0u8; size];
1613                let hash = Hash::new(&expected);
1614                let actual = store
1615                    .export_bao(hash, ChunkRanges::all())
1616                    .data_to_vec()
1617                    .await?;
1618                assert_eq!(&expected, &actual);
1619            }
1620            store.shutdown().await?;
1621        }
1622        Ok(())
1623    }
1624
1625    #[tokio::test]
1626    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
1627        tracing_subscriber::fmt::try_init().ok();
1628        let testdir = tempfile::tempdir()?;
1629        let sizes = INTERESTING_SIZES;
1630        let db_dir = testdir.path().join("db");
1631        let just_size = ChunkRanges::last_chunk();
1632        {
1633            let store = FsStore::load(&db_dir).await?;
1634            for size in sizes {
1635                let data = test_data(size);
1636                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1637                let data = Bytes::from(encoded);
1638                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1639                    panic!("failed to import size={size}: {cause}");
1640                }
1641            }
1642            store.dump().await?;
1643            store.shutdown().await?;
1644        }
1645        {
1646            let store = FsStore::load(&db_dir).await?;
1647            store.dump().await?;
1648            for size in sizes {
1649                let data = test_data(size);
1650                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
1651                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
1652                    Ok(actual) => actual,
1653                    Err(cause) => panic!("failed to export size={size}: {cause}"),
1654                };
1655                assert_eq!(&expected, &actual);
1656            }
1657            store.shutdown().await?;
1658        }
1659        dump_dir_full(testdir.path())?;
1660        Ok(())
1661    }
1662
1663    #[tokio::test]
1664    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
1665        tracing_subscriber::fmt::try_init().ok();
1666        let testdir = tempfile::tempdir()?;
1667        let sizes = INTERESTING_SIZES;
1668        let db_dir = testdir.path().join("db");
1669        let just_size = ChunkRanges::last_chunk();
1670        // stage 1, import just the last full chunk group to get a validated size
1671        {
1672            let store = FsStore::load(&db_dir).await?;
1673            for size in sizes {
1674                let data = test_data(size);
1675                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1676                let data = Bytes::from(encoded);
1677                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1678                    panic!("failed to import size={size}: {cause}");
1679                }
1680            }
1681            store.dump().await?;
1682            store.shutdown().await?;
1683        }
1684        dump_dir_full(testdir.path())?;
1685        // stage 2, import the rest
1686        {
1687            let store = FsStore::load(&db_dir).await?;
1688            for size in sizes {
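                // round_up_request yields the ranges that stage 1 actually
                // persisted (the request rounded up to full chunk groups), so
                // the complement is exactly the data still missing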
1689                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
1690                if remaining.is_empty() {
1691                    continue;
1692                }
1693                let data = test_data(size);
1694                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
1695                let data = Bytes::from(encoded);
1696                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1697                    panic!("failed to import size={size}: {cause}");
1698                }
1699            }
1700            store.dump().await?;
1701            store.shutdown().await?;
1702        }
        // stage 3, check that the data is now complete
1704        {
1705            let store = FsStore::load(&db_dir).await?;
1706            store.dump().await?;
1707            for size in sizes {
1708                let data = test_data(size);
1709                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
1710                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
1711                    Ok(actual) => actual,
1712                    Err(cause) => panic!("failed to export size={size}: {cause}"),
1713                };
1714                assert_eq!(&expected, &actual);
1715            }
1716            store.dump().await?;
1717            store.shutdown().await?;
1718        }
1719        dump_dir_full(testdir.path())?;
1720        Ok(())
1721    }
1722
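    /// Ranges covering just the last chunk of a blob: the smallest request
    /// that still yields a validated size.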
1723    fn just_size() -> ChunkRanges {
1724        ChunkRanges::last_chunk()
1725    }
1726
1727    #[tokio::test]
1728    async fn test_import_bao_persistence_observe() -> TestResult<()> {
1729        tracing_subscriber::fmt::try_init().ok();
1730        let testdir = tempfile::tempdir()?;
1731        let sizes = INTERESTING_SIZES;
1732        let db_dir = testdir.path().join("db");
1733        let just_size = just_size();
1734        // stage 1, import just the last full chunk group to get a validated size
1735        {
1736            let store = FsStore::load(&db_dir).await?;
1737            for size in sizes {
1738                let data = test_data(size);
1739                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1740                let data = Bytes::from(encoded);
1741                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1742                    panic!("failed to import size={size}: {cause}");
1743                }
1744            }
1745            store.dump().await?;
1746            store.shutdown().await?;
1747        }
1748        dump_dir_full(testdir.path())?;
1749        // stage 2, import the rest
1750        {
1751            let store = FsStore::load(&db_dir).await?;
1752            for size in sizes {
1753                let expected_ranges = round_up_request(size as u64, &just_size);
1754                let data = test_data(size);
1755                let hash = Hash::new(&data);
1756                let bitfield = store.observe(hash).await?;
1757                assert_eq!(bitfield.ranges, expected_ranges);
1758            }
1759            store.dump().await?;
1760            store.shutdown().await?;
1761        }
1762        Ok(())
1763    }
1764
1765    #[tokio::test]
1766    async fn test_import_bao_persistence_recover() -> TestResult<()> {
1767        tracing_subscriber::fmt::try_init().ok();
1768        let testdir = tempfile::tempdir()?;
1769        let sizes = INTERESTING_SIZES;
1770        let db_dir = testdir.path().join("db");
1771        let options = Options::new(&db_dir);
1772        let just_size = just_size();
1773        // stage 1, import just the last full chunk group to get a validated size
1774        {
1775            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
1776            for size in sizes {
1777                let data = test_data(size);
1778                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1779                let data = Bytes::from(encoded);
1780                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1781                    panic!("failed to import size={size}: {cause}");
1782                }
1783            }
1784            store.dump().await?;
1785            store.shutdown().await?;
1786        }
1787        delete_rec(testdir.path(), "bitfield")?;
1788        dump_dir_full(testdir.path())?;
1789        // stage 2, import the rest
1790        {
1791            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
1792            for size in sizes {
1793                let expected_ranges = round_up_request(size as u64, &just_size);
1794                let data = test_data(size);
1795                let hash = Hash::new(&data);
1796                let bitfield = store.observe(hash).await?;
1797                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
1798            }
1799            store.dump().await?;
1800            store.shutdown().await?;
1801        }
1802        Ok(())
1803    }
1804
1805    #[tokio::test]
1806    async fn test_import_bytes_persistence_full() -> TestResult<()> {
1807        tracing_subscriber::fmt::try_init().ok();
1808        let testdir = tempfile::tempdir()?;
1809        let sizes = INTERESTING_SIZES;
1810        let db_dir = testdir.path().join("db");
1811        {
1812            let store = FsStore::load(&db_dir).await?;
1813            let mut tts = Vec::new();
1814            for size in sizes {
1815                let data = test_data(size);
1817                tts.push(store.add_bytes(data.clone()).await?);
1818            }
1819            store.dump().await?;
1820            store.shutdown().await?;
1821        }
1822        {
1823            let store = FsStore::load(&db_dir).await?;
1824            store.dump().await?;
1825            for size in sizes {
1826                let expected = test_data(size);
1827                let hash = Hash::new(&expected);
1828                let Ok(actual) = store
1829                    .export_bao(hash, ChunkRanges::all())
1830                    .data_to_vec()
1831                    .await
1832                else {
1833                    panic!("failed to export size={size}");
1834                };
1835                assert_eq!(&expected, &actual, "size={size}");
1836            }
1837            store.shutdown().await?;
1838        }
1839        Ok(())
1840    }
1841
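    /// Check that temp tags created in a batch protect their hashes only while
    /// the batch itself is alive.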
1842    async fn test_batch(store: &Store) -> TestResult<()> {
1843        let batch = store.blobs().batch().await?;
1844        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
1845        let tt2 = batch.add_slice("boo").await?;
1846        let tts = store
1847            .tags()
1848            .list_temp_tags()
1849            .await?
1850            .collect::<HashSet<_>>()
1851            .await;
1852        assert!(tts.contains(tt1.hash_and_format()));
1853        assert!(tts.contains(tt2.hash_and_format()));
1854        drop(batch);
1855        store.sync_db().await?;
1856        let tts = store
1857            .tags()
1858            .list_temp_tags()
1859            .await?
1860            .collect::<HashSet<_>>()
1861            .await;
        // the batch was dropped, so its temp tags are no longer tracked
1863        assert!(!tts.contains(tt1.hash_and_format()));
1864        assert!(!tts.contains(tt2.hash_and_format()));
1865        drop(tt1);
1866        drop(tt2);
1867        Ok(())
1868    }
1869
1870    #[tokio::test]
1871    async fn test_batch_fs() -> TestResult<()> {
1872        tracing_subscriber::fmt::try_init().ok();
1873        let testdir = tempfile::tempdir()?;
1874        let db_dir = testdir.path().join("db");
1875        let store = FsStore::load(db_dir).await?;
1876        test_batch(&store).await
1877    }
1878
1879    #[tokio::test]
1880    async fn smoke() -> TestResult<()> {
1881        tracing_subscriber::fmt::try_init().ok();
1882        let testdir = tempfile::tempdir()?;
1883        let db_dir = testdir.path().join("db");
1884        let store = FsStore::load(db_dir).await?;
1885        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
1886        store.tags().set(Tag::from("test"), haf).await?;
1887        store.tags().set(Tag::from("boo"), haf).await?;
1888        store.tags().set(Tag::from("bar"), haf).await?;
1889        let sizes = INTERESTING_SIZES;
1890        let mut hashes = Vec::new();
1891        let mut data_by_hash = HashMap::new();
1892        let mut bao_by_hash = HashMap::new();
1893        for size in sizes {
1894            let data = vec![0u8; size];
1895            let data = Bytes::from(data);
1896            let tt = store.add_bytes(data.clone()).temp_tag().await?;
1897            data_by_hash.insert(*tt.hash(), data);
1898            hashes.push(tt);
1899        }
1900        store.sync_db().await?;
1901        for tt in &hashes {
1902            let hash = *tt.hash();
1903            let path = testdir.path().join(format!("{hash}.txt"));
1904            store.export(hash, path).await?;
1905        }
1906        for tt in &hashes {
1907            let hash = tt.hash();
1908            let data = store
1909                .export_bao(*hash, ChunkRanges::all())
1910                .data_to_vec()
1911                .await
1912                .unwrap();
1913            assert_eq!(data, data_by_hash[hash].to_vec());
1914            let bao = store
1915                .export_bao(*hash, ChunkRanges::all())
1916                .bao_to_vec()
1917                .await
1918                .unwrap();
1919            bao_by_hash.insert(*hash, bao);
1920        }
1921        store.dump().await?;
1922
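        // import another set of blobs, this time via import_bao_bytes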
1923        for size in sizes {
1924            let data = test_data(size);
1925            let ranges = ChunkRanges::all();
1926            let (hash, bao) = create_n0_bao(&data, &ranges)?;
1927            store.import_bao_bytes(hash, ranges, bao).await?;
1928        }
1929
        // placeholder: the commented-out body below would decode the exported
        // bao and re-import it
        for (_hash, _bao_tree) in bao_by_hash {
1931            // let mut reader = Cursor::new(bao_tree);
1932            // let size = reader.read_u64_le().await?;
1933            // let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
1934            // let ranges = ChunkRanges::all();
1935            // let mut decoder = DecodeResponseIter::new(hash, tree, reader, &ranges);
1936            // while let Some(item) = decoder.next() {
1937            //     let item = item?;
1938            // }
1939            // store.import_bao_bytes(hash, ChunkRanges::all(), bao_tree.into()).await?;
1940        }
1941        Ok(())
1942    }
1943
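    /// Recursively delete all files with the given extension below `root_dir`.
    ///
    /// Used to simulate loss of ephemeral state, e.g. deleting all `.bitfield`
    /// files before reopening a store.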
1944    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
1945        // Remove leading dot if present, so we have just the extension
1946        let ext = extension.trim_start_matches('.').to_lowercase();
1947
1948        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
1949            let path = entry.path();
1950
1951            if path.is_file() {
1952                if let Some(file_ext) = path.extension() {
1953                    if file_ext.to_string_lossy().to_lowercase() == ext {
1954                        println!("Deleting: {}", path.display());
1955                        fs::remove_file(path)?;
1956                    }
1957                }
1958            }
1959        }
1960
1961        Ok(())
1962    }
1963
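    /// Print the directory tree below `path`, sorted by path, with file sizes.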
1964    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
1965        let mut entries: Vec<_> = WalkDir::new(&path)
1966            .into_iter()
1967            .filter_map(Result::ok) // Skip errors
1968            .collect();
1969
1970        // Sort by path (name at each depth)
1971        entries.sort_by(|a, b| a.path().cmp(b.path()));
1972
1973        for entry in entries {
1974            let depth = entry.depth();
1975            let indent = "  ".repeat(depth); // Two spaces per level
1976            let name = entry.file_name().to_string_lossy();
1977            let size = entry.metadata()?.len(); // Size in bytes
1978
1979            if entry.file_type().is_file() {
1980                println!("{indent}{name} ({size} bytes)");
1981            } else if entry.file_type().is_dir() {
1982                println!("{indent}{name}/");
1983            }
1984        }
1985        Ok(())
1986    }
1987
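    /// Like [`dump_dir`], but also dumps file contents: `.data`, `.obao4` and
    /// `.sizes4` files as coarse occupancy bitmaps, `.bitfield` files as parsed
    /// [`Bitfield`] values.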
1988    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
1989        let mut entries: Vec<_> = WalkDir::new(&path)
1990            .into_iter()
1991            .filter_map(Result::ok) // Skip errors
1992            .collect();
1993
1994        // Sort by path (name at each depth)
1995        entries.sort_by(|a, b| a.path().cmp(b.path()));
1996
1997        for entry in entries {
1998            let depth = entry.depth();
1999            let indent = "  ".repeat(depth);
2000            let name = entry.file_name().to_string_lossy();
2001
2002            if entry.file_type().is_dir() {
2003                println!("{indent}{name}/");
2004            } else if entry.file_type().is_file() {
2005                let size = entry.metadata()?.len();
2006                println!("{indent}{name} ({size} bytes)");
2007
2008                // Dump depending on file type
2009                let path = entry.path();
2010                if name.ends_with(".data") {
2011                    print!("{indent}  ");
2012                    dump_file(path, 1024 * 16)?;
2013                } else if name.ends_with(".obao4") {
2014                    print!("{indent}  ");
2015                    dump_file(path, 64)?;
2016                } else if name.ends_with(".sizes4") {
2017                    print!("{indent}  ");
2018                    dump_file(path, 8)?;
2019                } else if name.ends_with(".bitfield") {
2020                    match read_checksummed::<Bitfield>(path) {
2021                        Ok(bitfield) => {
2022                            println!("{indent}  bitfield: {bitfield:?}");
2023                        }
2024                        Err(cause) => {
2025                            println!("{indent}  bitfield: error: {cause}");
2026                        }
2027                    }
2028                } else {
2029                    continue; // Skip content dump for other files
                }
2031            }
2032        }
2033        Ok(())
2034    }
2035
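    /// Print an occupancy bitmap for the file, one cell per `chunk_size` bytes.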
2036    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
2037        let bits = file_bits(path, chunk_size)?;
2038        println!("{}", print_bitfield_ansi(bits));
2039        Ok(())
2040    }
2041
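    /// Compute one bool per `chunk_size` region of the file: true iff the
    /// region contains any nonzero byte.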
2042    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
2043        let file = fs::File::open(&path)?;
2044        let file_size = file.metadata()?.len();
2045        let mut buffer = vec![0u8; chunk_size as usize];
2046        let mut bits = Vec::new();
2047
2048        let mut offset = 0u64;
2049        while offset < file_size {
2050            let remaining = file_size - offset;
2051            let current_chunk_size = chunk_size.min(remaining);
2052
2053            let chunk = &mut buffer[..current_chunk_size as usize];
2054            file.read_exact_at(offset, chunk)?;
2055
2056            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
2057            bits.push(has_non_zero);
2058
2059            offset += current_chunk_size;
2060        }
2061
2062        Ok(bits)
2063    }
2064
2065    #[allow(dead_code)]
2066    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
2067        bits.into_iter()
2068            .map(|bit| if bit { '#' } else { '_' })
2069            .collect()
2070    }
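
    /// Minimal check of the plain-text rendering above: '#' marks a set bit,
    /// '_' a cleared one.
    #[test]
    fn print_bitfield_smoke() {
        assert_eq!(print_bitfield([true, false, true]), "#_#");
        assert_eq!(print_bitfield([false, false]), "__");
    }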
2071
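    /// Render a bitfield using half-block glyphs, packing two bits into each
    /// output character, so the line is half as wide as [`print_bitfield`].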
2072    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
2073        let mut result = String::new();
2074        let mut iter = bits.into_iter();
2075
2076        while let Some(b1) = iter.next() {
2077            let b2 = iter.next();
2078
2079            // ANSI color codes
2080            let white_fg = "\x1b[97m"; // bright white foreground
2081            let reset = "\x1b[0m"; // reset all attributes
2082            let gray_bg = "\x1b[100m"; // bright black (gray) background
2083            let black_bg = "\x1b[40m"; // black background
2084
2085            let colored_char = match (b1, b2) {
2086                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset), // 11 - solid white on default background
2087                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset), // 10 - left half white on gray background
2088                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset), // 01 - right half white on gray background
2089                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset), // 00 - space with gray background
2090                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset), // 1 (pad 0) - left half white on black background
2091                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset), // 0 (pad 0) - space with black background
2092            };
2093
2094            result.push_str(&colored_char);
2095        }
2096
2097        // Ensure we end with a reset code to prevent color bleeding
2098        result.push_str("\x1b[0m");
2099        result
2100    }
2101
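    /// Turn a [`Bytes`] into a stream of chunks of at most `chunk_size` bytes.
    /// Chunks are zero-copy slices of the original buffer.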
2102    fn bytes_to_stream(
2103        bytes: Bytes,
2104        chunk_size: usize,
2105    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
2106        assert!(chunk_size > 0, "Chunk size must be greater than 0");
2107        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
2108            if offset >= bytes.len() {
2109                None
2110            } else {
2111                let chunk_len = chunk_size.min(bytes.len() - offset);
2112                let chunk = bytes.slice(offset..offset + chunk_len);
2113                Some((Ok(chunk), (bytes, offset + chunk_len)))
2114            }
2115        })
2116    }
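
    /// Exercise `bytes_to_stream`: 5 bytes with `chunk_size = 2` should yield
    /// chunks of 2, 2 and 1 bytes that reassemble to the input. A minimal
    /// sketch, assuming `n0_future::StreamExt` is in scope for `next()`.
    #[tokio::test]
    async fn bytes_to_stream_chunking() -> TestResult<()> {
        use n0_future::StreamExt;
        let data = Bytes::from_static(&[1, 2, 3, 4, 5]);
        let mut stream = Box::pin(bytes_to_stream(data, 2));
        let mut lens = Vec::new();
        let mut total = Vec::new();
        while let Some(chunk) = stream.next().await {
            let chunk = chunk?;
            lens.push(chunk.len());
            total.extend_from_slice(&chunk);
        }
        assert_eq!(lens, [2, 2, 1]);
        assert_eq!(total, [1, 2, 3, 4, 5]);
        Ok(())
    }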
2117}