iroh_blobs/store/fs.rs

//! # File based blob store.
//!
//! A file based blob store needs a writable directory to work with.
//!
//! General design:
//!
//! The file store consists of two actors.
//!
//! # The main actor
//!
//! The purpose of the main actor is to handle user commands and own a map of
//! handles for hashes that are currently being worked on.
//!
//! It also owns tasks for ongoing import and export operations, as well as the
//! database actor.
//!
//! Handling a command almost always involves either forwarding it to the
//! database actor or creating a hash context and spawning a task.
//!
//! # The database actor
//!
//! The database actor is responsible for storing metadata about each hash,
//! as well as inlined data and outboard data for small files.
//!
//! In addition to the metadata, the database actor also stores tags.
//!
//! # Tasks
//!
//! Tasks do not return a result. They are responsible for sending an error
//! to the requester if possible. Otherwise, just dropping the sender will
//! also fail the receiver, but without a descriptive error message.
//!
//! Tasks are usually implemented as an inner fn (named `..._impl`) that
//! returns a result, and a wrapper that just forwards the error, if any.
//!
//! That way you can use `?` syntax in the task implementation. The impl fns
//! are also easier to test.
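//!
//! A minimal sketch of that pattern (the names are hypothetical):
//!
//! ```ignore
//! async fn frobnicate(cmd: FrobnicateMsg, ctx: HashContext) {
//!     if let Err(cause) = frobnicate_impl(cmd.inner, &ctx).await {
//!         // report the error to the requester instead of returning it
//!         cmd.tx.send(cause.into()).await.ok();
//!     }
//! }
//!
//! async fn frobnicate_impl(cmd: FrobnicateRequest, ctx: &HashContext) -> io::Result<()> {
//!     // `?` syntax is fine here
//!     Ok(())
//! }
//! ```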
//!
//! # Context
//!
//! The main actor holds a TaskContext that is needed for almost all tasks,
//! such as the config and a way to interact with the database.
//!
//! For tasks that are specific to a hash, a HashContext combines the task
//! context with a slot from the table of the main actor that can be used
//! to obtain a unique handle for the hash.
//!
//! # Runtime
//!
//! The fs store owns and manages its own tokio runtime. Dropping the store
//! will clean up the database and shut down the runtime. However, some parts
//! of the persistent state won't make it to disk, so operations involving large
//! partial blobs will have a large initial delay on the next startup.
//!
//! It is also not guaranteed that all write operations will make it to disk.
//! The on-disk store will be in a consistent state, but might miss some writes
//! in the last seconds before shutdown.
//!
//! To avoid this, you can use the [`crate::api::Store::shutdown`] method to
//! cleanly shut down the store and save ephemeral state to disk.
//!
//! Note that if you use the store inside an [`iroh::protocol::Router`] and shut
//! down the router using [`iroh::protocol::Router::shutdown`], the store will be
//! safely shut down as well. Any store refs you are holding will be inoperable
//! after this.
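//!
//! A sketch of a clean shutdown, assuming the default options and a writable
//! directory:
//!
//! ```ignore
//! # async fn example() -> anyhow::Result<()> {
//! let store = FsStore::load("/path/to/blobs").await?;
//! // ... use the store via the `Store` API it derefs to ...
//! store.shutdown().await?;
//! # Ok(())
//! # }
//! ```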
use std::{
    collections::{HashMap, HashSet},
    fmt, fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use bao_tree::{
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::{list_blobs, Snapshot};
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{Id, JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    store::{
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        Hash,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
        ChunkRangesExt,
    },
};
mod bao_file;
use bao_file::{BaoFileHandle, BaoFileHandleWeak};
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use super::HashAndFormat;
use crate::api::{
    self,
    blobs::{AddProgressItem, ExportMode, ExportProgressItem},
    Store,
};
/// Create a 16 byte unique ID.
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

/// Create temp file name based on a 16 byte UUID.
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}
/// Context needed by most tasks
#[derive(Debug)]
struct TaskContext {
    /// Store options such as paths and inline thresholds, in an Arc to cheaply share with tasks.
    pub options: Arc<Options>,
    /// Metadata database, basically a mpsc sender with some extra functionality.
    pub db: meta::Db,
    /// Handle to send internal commands.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// The file handle for the empty hash.
    pub empty: BaoFileHandle,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

#[derive(Debug)]
struct Actor {
    // Context that can be cheaply shared with tasks.
    context: Arc<TaskContext>,
    // Receiver for incoming user commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for incoming file store specific commands.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks for import and export operations.
    tasks: JoinSet<()>,
    // Running tasks
    running: HashSet<Id>,
    // handles
    handles: HashMap<Hash, Slot>,
    // temp tags
    temp_tags: TempTags,
    // our private tokio runtime. It has to live somewhere.
    _rt: RtWrapper,
}

/// Wraps a slot and the task context.
///
/// This contains everything a hash-specific task should need.
struct HashContext {
    slot: Slot,
    ctx: Arc<TaskContext>,
}

impl HashContext {
    pub fn db(&self) -> &meta::Db {
        &self.ctx.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.ctx.options
    }

    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
        self.slot.0.lock().await
    }

    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.ctx.protect.protect(hash, parts);
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Update {
                    hash,
                    state,
                    tx: Some(tx),
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await.map_err(|_e| io::Error::other("reply channel dropped"))??;
        Ok(())
    }

    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Get {
                    hash,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .ok();
        let res = rx.await.map_err(io::Error::other)?;
        Ok(res.state?)
    }

    /// Set the entry state in the database, and wait for completion.
    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db()
            .send(
                meta::Set {
                    hash,
                    state,
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await
            .map_err(io::Error::other)?;
        rx.await.map_err(|_e| io::Error::other("reply channel dropped"))??;
        Ok(())
    }

    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
        if hash == Hash::EMPTY {
            return Ok(self.ctx.empty.clone());
        }
        let res = self
            .slot
            .get_or_create(|| async {
                let res = self.db().get(hash).await.map_err(io::Error::other)?;
                let res = match res {
                    Some(state) => open_bao_file(&hash, state, &self.ctx).await,
                    None => Ok(BaoFileHandle::new_partial_mem(
                        hash,
                        self.ctx.options.clone(),
                    )),
                };
                Ok((res?, ()))
            })
            .await
            .map_err(api::Error::from);
        trace!("{res:?}");
        let (res, _) = res?;
        Ok(res)
    }
}

async fn open_bao_file(
    hash: &Hash,
    state: EntryState<Bytes>,
    ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
    let options = &ctx.options;
    Ok(match state {
        EntryState::Complete {
            data_location,
            outboard_location,
        } => {
            let data = match data_location {
                DataLocation::Inline(data) => MemOrFile::Mem(data),
                DataLocation::Owned(size) => {
                    let path = options.path.data_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
                DataLocation::External(paths, size) => {
                    let Some(path) = paths.into_iter().next() else {
                        return Err(io::Error::other("no external data path"));
                    };
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(FixedSize::new(file, size))
                }
            };
            let outboard = match outboard_location {
                OutboardLocation::NotNeeded => MemOrFile::empty(),
                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
                OutboardLocation::Owned => {
                    let path = options.path.outboard_path(hash);
                    let file = fs::File::open(&path)?;
                    MemOrFile::File(file)
                }
            };
            BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
        }
        EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
    })
}

/// An entry for each hash, containing a weak reference to a BaoFileHandle
/// wrapped in a tokio mutex so handle creation is sequential.
#[derive(Debug, Clone, Default)]
pub(crate) struct Slot(Arc<tokio::sync::Mutex<Option<BaoFileHandleWeak>>>);

impl Slot {
    pub async fn is_live(&self) -> bool {
        let slot = self.0.lock().await;
        slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false)
    }

    /// Get the handle if it exists and is still alive, otherwise load it from the database.
    /// If there is nothing in the database, create a new in-memory handle.
    ///
    /// `make` will be called if a live handle does not exist.
    pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
        T: Default,
    {
        let mut slot = self.0.lock().await;
        if let Some(weak) = &*slot {
            if let Some(handle) = weak.upgrade() {
                return Ok((handle, Default::default()));
            }
        }
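        // no live handle: create a new one and remember it via a weak reference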
        let handle = make().await;
        if let Ok((handle, _)) = &handle {
            *slot = Some(handle.downgrade());
        }
        handle
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }

    fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
        match res {
            Ok((id, _)) => {
                // println!("task {id} finished");
                self.running.remove(&id);
                // println!("{:?}", self.running);
            }
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn clear_dead_handles(&mut self) {
        let mut to_remove = Vec::new();
        for (hash, slot) in &self.handles {
            if !slot.is_live().await {
                to_remove.push(*hash);
            }
        }
        for hash in to_remove {
            if let Some(slot) = self.handles.remove(&hash) {
                // do a quick check if the handle has become alive in the meantime, and reinsert it
                let guard = slot.0.lock().await;
                let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
                if is_live {
                    drop(guard);
                    self.handles.insert(hash, slot);
                }
            }
        }
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.clear_dead_handles().await;
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                let (tx, rx) = tokio::sync::oneshot::channel();
                self.db()
                    .send(
                        Snapshot {
                            tx,
                            span: cmd.span.clone(),
                        }
                        .into(),
                    )
                    .await
                    .ok();
                if let Ok(snapshot) = rx.await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_path(cmd, ctx));
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_bao(cmd, ctx));
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(export_ranges(cmd, ctx));
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(import_bao(cmd, ctx));
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                let ctx = self.hash_context(cmd.hash);
                self.spawn(observe(cmd, ctx));
            }
        }
    }

    /// Create a hash context for a given hash.
    fn hash_context(&mut self, hash: Hash) -> HashContext {
        HashContext {
            slot: self.handles.entry(hash).or_default().clone(),
            ctx: self.context.clone(),
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    let ctx = self.hash_context(cmd.hash);
                    self.spawn(finish_import(cmd, tt, ctx));
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
                    self.log_task_result(res);
                }
            }
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file: {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            empty: BaoFileHandle::new_complete(
                Hash::EMPTY,
                MemOrFile::empty(),
                MemOrFile::empty(),
                options,
            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context,
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            running: HashSet::new(),
            handles: Default::default(),
            temp_tags: Default::default(),
            _rt: rt,
        })
    }
}

struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
    let res = match finish_import_impl(cmd.inner, ctx).await {
        Ok(()) => {
            // For a remote call we can't have the on_drop callback, so we have to leak
            // the temp tag. It will be cleaned up when the process exits or the scope ends.
            if cmd.tx.is_rpc() {
                trace!("leaking temp tag {}", tt.hash_and_format());
                tt.leak();
            }
            AddProgressItem::Done(tt)
        }
        Err(cause) => AddProgressItem::Error(cause),
    };
    cmd.tx.send(res).await.ok();
}

async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> {
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    let guard = ctx.lock().await;
    let handle = guard.as_ref().and_then(|x| x.upgrade());
    // if I do have an existing handle, I have to possibly deal with observers.
    // if I don't have an existing handle, there are 2 cases:
    //   the entry exists in the db, but we don't have a handle
    //   the entry does not exist at all.
    // convert the import source to a data location and drop the open files
    ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // this will always work on any Unix, but on Windows there might be an issue if the target file is open!
            // possibly open with FILE_SHARE_DELETE on Windows?
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // the same caveat as above applies here
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    if let Some(handle) = handle {
        let data = match &data_location {
            DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            DataLocation::Owned(size) => {
                let path = ctx.options().path.data_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
            DataLocation::External(paths, size) => {
                let Some(path) = paths.iter().next() else {
                    return Err(io::Error::other("no external data path"));
                };
                let file = fs::File::open(path)?;
                MemOrFile::File(FixedSize::new(file, *size))
            }
        };
        let outboard = match &outboard_location {
            OutboardLocation::NotNeeded => MemOrFile::empty(),
            OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
            OutboardLocation::Owned => {
                let path = ctx.options().path.outboard_path(&hash);
                let file = fs::File::open(&path)?;
                MemOrFile::File(file)
            }
        };
        handle.complete(data, outboard);
    }
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update(hash, state).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) {
    trace!("{cmd:?}");
    let ImportBaoMsg {
        inner: ImportBaoRequest { size, hash },
        rx,
        tx,
        ..
    } = cmd;
    let res = match ctx.get_or_create(hash).await {
        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
        Err(cause) => Err(cause),
    };
    trace!("{res:?}");
    tx.send(res).await.ok();
}

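/// The chunk range that a leaf covers, given its byte offset and data length.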
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
    handle: BaoFileHandle,
    ctx: HashContext,
) -> api::Result<()> {
    trace!(
        "importing bao: {} {} bytes",
        handle.hash().fmt_short(),
        size
    );
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // If the batch is not empty, the last item is a leaf, and the current item
        // is a parent, write out the batch.
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        handle.write_batch(&batch, &bitfield, &ctx.ctx).await?;
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn observe(cmd: ObserveMsg, ctx: HashContext) {
    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
        return;
    };
    handle.subscribe().forward(cmd.tx).await.ok();
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
    match ctx.get_or_create(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(ExportRangesItem::Error(cause.into()))
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
        }
    }
}

async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    handle: BaoFileHandle,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        handle.current_size()?
    );
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let bitfield = handle.bitfield()?;
    let data = handle.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
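        // stream the requested range in pieces of `bs` bytes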
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            tx.send(ExportRangesItem::Data(Leaf {
                offset,
                data: data.read_bytes_at(offset, size)?,
            }))
            .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
    match ctx.get_or_create(cmd.hash).await {
        Ok(handle) => {
            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
                cmd.tx
                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                    .await
                    .ok();
            }
        }
        Err(cause) => {
            let cause = anyhow::anyhow!("failed to open file: {cause}");
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
                .await
                .ok();
        }
    }
}

async fn export_bao_impl(
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
    handle: BaoFileHandle,
) -> anyhow::Result<()> {
    let ExportBaoRequest { ranges, hash } = cmd;
    debug_assert!(handle.hash() == hash, "hash mismatch");
    let outboard = handle.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // we have no data whatsoever, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = handle.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
async fn export_path(cmd: ExportPathMsg, ctx: HashContext) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await {
        tx.send(cause.into()).await.ok();
    }
}

async fn export_path_impl(
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
    ctx: HashContext,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let _guard = ctx.lock().await;
    let state = ctx.get_entry_state(cmd.hash).await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let data = match data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data),
        DataLocation::Owned(size) => {
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size))
        }
        DataLocation::External(paths, size) => MemOrFile::File((
            paths
                .into_iter()
                .next()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no external data path"))?,
            size,
        )),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let source = fs::File::open(&source_path)?;
                let mut target = fs::File::create(&target)?;
                copy_with_progress(&source, size, &mut target, tx).await?
            }
            ExportMode::TryReference => {
                match std::fs::rename(&source_path, &target) {
                    Ok(()) => {}
                    Err(cause) => {
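                        // 18 is EXDEV ("cross-device link") on Unix: the rename failed
                        // because source and target are on different filesystems, so
                        // fall back to copying.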
                        const ERR_CROSS: i32 = 18;
                        if cause.raw_os_error() == Some(ERR_CROSS) {
                            let source = fs::File::open(&source_path)?;
                            let mut target = fs::File::create(&target)?;
                            copy_with_progress(&source, size, &mut target, tx).await?;
                        } else {
                            return Err(cause.into());
                        }
                    }
                }
                ctx.set(
                    cmd.hash,
                    EntryState::Complete {
                        data_location: DataLocation::External(vec![target], size),
                        outboard_location,
                    },
                )
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

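/// Copy `size` bytes from `file` to `target` in 1 MiB chunks, sending progress
/// updates and yielding between chunks.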
async fn copy_with_progress(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other("failed to send progress"))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
    /// Load or create a new store.
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    /// Load or create a new store with custom options and a custom database path.
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iroh-blob-store")
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

/// A file based store.
///
/// A store can be created using [`load`](FsStore::load) or [`load_with_opts`](FsStore::load_with_opts).
/// Load will use the default options and create the required directories, while load_with_opts allows
/// you to customize the options and the location of the database. Both variants will create the database
/// if it does not exist, and load an existing database if one is found at the configured location.
///
/// In addition to implementing the [`Store`](`crate::api::Store`) API via [`Deref`](`std::ops::Deref`),
/// there are a few additional methods that are specific to file based stores, such as [`dump`](FsStore::dump).
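///
/// Example (a sketch, assuming a writable `root` directory):
///
/// ```ignore
/// # async fn example(root: std::path::PathBuf) -> anyhow::Result<()> {
/// let options = Options::new(&root);
/// let store = FsStore::load_with_opts(root.join("blobs.db"), options).await?;
/// store.dump().await?;
/// store.shutdown().await?;
/// # Ok(())
/// # }
/// ```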
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Command, proto::StoreService>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

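    /// Dump the contents of the metadata database, for debugging.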
    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            HashAndFormat, IROH_BLOCK_SIZE,
        },
    };

    /// Interesting sizes for testing.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,               // annoying corner case - always present, handled by the api
        1,               // less than 1 chunk, data inline, outboard not needed
        1024,            // exactly 1 chunk, data inline, outboard not needed
        1024 * 16 - 1,   // less than 1 chunk group, data inline, outboard not needed
        1024 * 16,       // exactly 1 chunk group, data inline, outboard not needed
        1024 * 16 + 1,   // data file, outboard inline (just 1 hash pair)
        1024 * 1024,     // data file, outboard inline (many hash pairs)
        1024 * 1024 * 8, // data file, outboard file
    ];

    /// Create n0 flavoured bao. Note that this can be used to request ranges below a chunk group
    /// size, which cannot be exported via bao because we don't store hashes below the chunk group
    /// level.
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

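    /// Round a request up to chunk group granularity. A non-empty request that is
    /// entirely past the end of the data is remapped to the last chunk, so it
    /// still produces a non-empty response.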
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    // #[traced_test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

    /// Generate test data for size n.
    ///
    /// We don't really care about the content, since we assume blake3 works.
    /// The only thing it should not be is all zeros, since that is what you
    /// will get for a gap.
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Map to uppercase A-Z range (65-90)
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    // import data via add_stream, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    // import data via import_bytes, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_bytes() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    // import data via add_bytes, check that small blobs round-trip as the same
    // bytes instance while the handle is kept alive
    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            // we must at some point see completion, otherwise the test will hang
            // keep the handle alive by observing until the end, otherwise the handle
            // will change and the bytes won't be the same instance anymore
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    // import data via add_path, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    // import data via add_bytes, then export it to a file and check the contents
    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

1500    #[tokio::test]
1501    async fn test_import_bao_ranges() -> TestResult<()> {
1502        tracing_subscriber::fmt::try_init().ok();
1503        let testdir = tempfile::tempdir()?;
1504        let db_dir = testdir.path().join("db");
1505        {
1506            let store = FsStore::load(&db_dir).await?;
1507            let data = test_data(100000);
1508            let ranges = ChunkRanges::chunks(16..32);
1509            let (hash, bao) = create_n0_bao(&data, &ranges)?;
1510            store
1511                .import_bao_bytes(hash, ranges.clone(), bao.clone())
1512                .await?;
1513            let bitfield = store.observe(hash).await?;
1514            assert_eq!(bitfield.ranges, ranges);
1515            assert_eq!(bitfield.size(), data.len() as u64);
1516            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
1517            assert_eq!(export, bao);
1518        }
1519        Ok(())
1520    }
1521
1522    #[tokio::test]
1523    async fn test_import_bao_minimal() -> TestResult<()> {
1524        tracing_subscriber::fmt::try_init().ok();
1525        let testdir = tempfile::tempdir()?;
1526        let sizes = [1];
1527        let db_dir = testdir.path().join("db");
1528        {
1529            let store = FsStore::load(&db_dir).await?;
1530            for size in sizes {
1531                let data = vec![0u8; size];
1532                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1533                let data = Bytes::from(encoded);
1534                store
1535                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1536                    .await?;
1537            }
1538            store.shutdown().await?;
1539        }
1540        Ok(())
1541    }
1542
1543    #[tokio::test]
1544    async fn test_import_bao_simple() -> TestResult<()> {
1545        tracing_subscriber::fmt::try_init().ok();
1546        let testdir = tempfile::tempdir()?;
1547        let sizes = [1048576];
1548        let db_dir = testdir.path().join("db");
1549        {
1550            let store = FsStore::load(&db_dir).await?;
1551            for size in sizes {
1552                let data = vec![0u8; size];
1553                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1554                let data = Bytes::from(encoded);
1555                trace!("importing size={}", size);
1556                store
1557                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1558                    .await?;
1559            }
1560            store.shutdown().await?;
1561        }
1562        Ok(())
1563    }
1564
1565    #[tokio::test]
1566    async fn test_import_bao_persistence_full() -> TestResult<()> {
1567        tracing_subscriber::fmt::try_init().ok();
1568        let testdir = tempfile::tempdir()?;
1569        let sizes = INTERESTING_SIZES;
1570        let db_dir = testdir.path().join("db");
1571        {
1572            let store = FsStore::load(&db_dir).await?;
1573            for size in sizes {
1574                let data = vec![0u8; size];
1575                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
1576                let data = Bytes::from(encoded);
1577                store
1578                    .import_bao_bytes(hash, ChunkRanges::all(), data)
1579                    .await?;
1580            }
1581            store.shutdown().await?;
1582        }
1583        {
1584            let store = FsStore::load(&db_dir).await?;
1585            for size in sizes {
1586                let expected = vec![0u8; size];
1587                let hash = Hash::new(&expected);
1588                let actual = store
1589                    .export_bao(hash, ChunkRanges::all())
1590                    .data_to_vec()
1591                    .await?;
1592                assert_eq!(&expected, &actual);
1593            }
1594            store.shutdown().await?;
1595        }
1596        Ok(())
1597    }
1598
1599    #[tokio::test]
1600    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
1601        tracing_subscriber::fmt::try_init().ok();
1602        let testdir = tempfile::tempdir()?;
1603        let sizes = INTERESTING_SIZES;
1604        let db_dir = testdir.path().join("db");
1605        let just_size = just_size();
1606        {
1607            let store = FsStore::load(&db_dir).await?;
1608            for size in sizes {
1609                let data = test_data(size);
1610                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1611                let data = Bytes::from(encoded);
1612                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1613                    panic!("failed to import size={size}: {cause}");
1614                }
1615            }
1616            store.dump().await?;
1617            store.shutdown().await?;
1618        }
1619        {
1620            let store = FsStore::load(&db_dir).await?;
1621            store.dump().await?;
1622            for size in sizes {
1623                let data = test_data(size);
1624                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
1625                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
1626                    Ok(actual) => actual,
1627                    Err(cause) => panic!("failed to export size={size}: {cause}"),
1628                };
1629                assert_eq!(&expected, &actual);
1630            }
1631            store.shutdown().await?;
1632        }
1633        dump_dir_full(testdir.path())?;
1634        Ok(())
1635    }
1636
1637    #[tokio::test]
1638    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
1639        tracing_subscriber::fmt::try_init().ok();
1640        let testdir = tempfile::tempdir()?;
1641        let sizes = INTERESTING_SIZES;
1642        let db_dir = testdir.path().join("db");
1643        let just_size = just_size();
1644        // stage 1, import just the last full chunk group to get a validated size
1645        {
1646            let store = FsStore::load(&db_dir).await?;
1647            for size in sizes {
1648                let data = test_data(size);
1649                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1650                let data = Bytes::from(encoded);
1651                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1652                    panic!("failed to import size={size}: {cause}");
1653                }
1654            }
1655            store.dump().await?;
1656            store.shutdown().await?;
1657        }
1658        dump_dir_full(testdir.path())?;
1659        // stage 2, import the rest
1660        {
1661            let store = FsStore::load(&db_dir).await?;
1662            for size in sizes {
1663                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
1664                if remaining.is_empty() {
1665                    continue;
1666                }
1667                let data = test_data(size);
1668                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
1669                let data = Bytes::from(encoded);
1670                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1671                    panic!("failed to import size={size}: {cause}");
1672                }
1673            }
1674            store.dump().await?;
1675            store.shutdown().await?;
1676        }
1677        // check if the data is complete
1678        {
1679            let store = FsStore::load(&db_dir).await?;
1680            store.dump().await?;
1681            for size in sizes {
1682                let data = test_data(size);
1683                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
1684                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
1685                    Ok(actual) => actual,
1686                    Err(cause) => panic!("failed to export size={size}: {cause}"),
1687                };
1688                assert_eq!(&expected, &actual);
1689            }
1690            store.dump().await?;
1691            store.shutdown().await?;
1692        }
1693        dump_dir_full(testdir.path())?;
1694        Ok(())
1695    }
1696
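    /// The ranges for a "just the size" request: the last chunk of a blob is
    /// enough to learn and validate its size.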
1697    fn just_size() -> ChunkRanges {
1698        ChunkRanges::last_chunk()
1699    }
1700
1701    #[tokio::test]
1702    async fn test_import_bao_persistence_observe() -> TestResult<()> {
1703        tracing_subscriber::fmt::try_init().ok();
1704        let testdir = tempfile::tempdir()?;
1705        let sizes = INTERESTING_SIZES;
1706        let db_dir = testdir.path().join("db");
1707        let just_size = just_size();
1708        // stage 1, import just the last full chunk group to get a validated size
1709        {
1710            let store = FsStore::load(&db_dir).await?;
1711            for size in sizes {
1712                let data = test_data(size);
1713                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1714                let data = Bytes::from(encoded);
1715                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1716                    panic!("failed to import size={size}: {cause}");
1717                }
1718            }
1719            store.dump().await?;
1720            store.shutdown().await?;
1721        }
1722        dump_dir_full(testdir.path())?;
1723        // stage 2, import the rest
1724        {
1725            let store = FsStore::load(&db_dir).await?;
1726            for size in sizes {
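                // the store tracks completeness at chunk group granularity, so the
                // observed ranges are the request rounded up to chunk group boundaries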
1727                let expected_ranges = round_up_request(size as u64, &just_size);
1728                let data = test_data(size);
1729                let hash = Hash::new(&data);
1730                let bitfield = store.observe(hash).await?;
1731                assert_eq!(bitfield.ranges, expected_ranges);
1732            }
1733            store.dump().await?;
1734            store.shutdown().await?;
1735        }
1736        Ok(())
1737    }
1738
1739    #[tokio::test]
1740    async fn test_import_bao_persistence_recover() -> TestResult<()> {
1741        tracing_subscriber::fmt::try_init().ok();
1742        let testdir = tempfile::tempdir()?;
1743        let sizes = INTERESTING_SIZES;
1744        let db_dir = testdir.path().join("db");
1745        let options = Options::new(&db_dir);
1746        let just_size = just_size();
1747        // stage 1, import just the last full chunk group to get a validated size
1748        {
1749            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
1750            for size in sizes {
1751                let data = test_data(size);
1752                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
1753                let data = Bytes::from(encoded);
1754                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
1755                    panic!("failed to import size={size}: {cause}");
1756                }
1757            }
1758            store.dump().await?;
1759            store.shutdown().await?;
1760        }
1761        delete_rec(testdir.path(), "bitfield")?;
1762        dump_dir_full(testdir.path())?;
1763        // stage 2, import the rest
1764        {
1765            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
1766            for size in sizes {
1767                let expected_ranges = round_up_request(size as u64, &just_size);
1768                let data = test_data(size);
1769                let hash = Hash::new(&data);
1770                let bitfield = store.observe(hash).await?;
1771                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
1772            }
1773            store.dump().await?;
1774            store.shutdown().await?;
1775        }
1776        Ok(())
1777    }
1778
1779    #[tokio::test]
1780    async fn test_import_bytes_persistence_full() -> TestResult<()> {
1781        tracing_subscriber::fmt::try_init().ok();
1782        let testdir = tempfile::tempdir()?;
1783        let sizes = INTERESTING_SIZES;
1784        let db_dir = testdir.path().join("db");
1785        {
1786            let store = FsStore::load(&db_dir).await?;
1787            let mut tts = Vec::new();
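            // keep the returned temp tags alive so the new blobs stay protected until shutdown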
1788            for size in sizes {
1789                let data = test_data(size);
1790                tts.push(store.add_bytes(data).await?);
1792            }
1793            store.dump().await?;
1794            store.shutdown().await?;
1795        }
1796        {
1797            let store = FsStore::load(&db_dir).await?;
1798            store.dump().await?;
1799            for size in sizes {
1800                let expected = test_data(size);
1801                let hash = Hash::new(&expected);
1802                let Ok(actual) = store
1803                    .export_bao(hash, ChunkRanges::all())
1804                    .data_to_vec()
1805                    .await
1806                else {
1807                    panic!("failed to export size={size}");
1808                };
1809                assert_eq!(&expected, &actual, "size={size}");
1810            }
1811            store.shutdown().await?;
1812        }
1813        Ok(())
1814    }
1815
1816    async fn test_batch(store: &Store) -> TestResult<()> {
1817        let batch = store.blobs().batch().await?;
1818        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
1819        let tt2 = batch.add_slice("boo").await?;
1820        let tts = store
1821            .tags()
1822            .list_temp_tags()
1823            .await?
1824            .collect::<HashSet<_>>()
1825            .await;
1826        assert!(tts.contains(tt1.hash_and_format()));
1827        assert!(tts.contains(tt2.hash_and_format()));
1828        drop(batch);
1829        store.sync_db().await?;
1830        let tts = store
1831            .tags()
1832            .list_temp_tags()
1833            .await?
1834            .collect::<HashSet<_>>()
1835            .await;
1836        // the batch was dropped, so its temp tags are no longer tracked, even though the handles still exist
1837        assert!(!tts.contains(tt1.hash_and_format()));
1838        assert!(!tts.contains(tt2.hash_and_format()));
1839        drop(tt1);
1840        drop(tt2);
1841        Ok(())
1842    }
1843
1844    #[tokio::test]
1845    async fn test_batch_fs() -> TestResult<()> {
1846        tracing_subscriber::fmt::try_init().ok();
1847        let testdir = tempfile::tempdir()?;
1848        let db_dir = testdir.path().join("db");
1849        let store = FsStore::load(db_dir).await?;
1850        test_batch(&store).await
1851    }
1852
1853    #[tokio::test]
1854    async fn smoke() -> TestResult<()> {
1855        tracing_subscriber::fmt::try_init().ok();
1856        let testdir = tempfile::tempdir()?;
1857        let db_dir = testdir.path().join("db");
1858        let store = FsStore::load(db_dir).await?;
1859        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
1860        store.tags().set(Tag::from("test"), haf).await?;
1861        store.tags().set(Tag::from("boo"), haf).await?;
1862        store.tags().set(Tag::from("bar"), haf).await?;
1863        let sizes = INTERESTING_SIZES;
1864        let mut tts = Vec::new();
1865        let mut data_by_hash = HashMap::new();
1866        let mut bao_by_hash = HashMap::new();
1867        for size in sizes {
1868            let data = vec![0u8; size];
1869            let data = Bytes::from(data);
1870            let tt = store.add_bytes(data.clone()).temp_tag().await?;
1871            data_by_hash.insert(*tt.hash(), data);
1872            tts.push(tt);
1873        }
1874        store.sync_db().await?;
1875        for tt in &tts {
1876            let hash = *tt.hash();
1877            let path = testdir.path().join(format!("{hash}.txt"));
1878            store.export(hash, path).await?;
1879        }
1880        for tt in &tts {
1881            let hash = tt.hash();
1882            let data = store
1883                .export_bao(*hash, ChunkRanges::all())
1884                .data_to_vec()
1885                .await
1886                .unwrap();
1887            assert_eq!(data, data_by_hash[hash].to_vec());
1888            let bao = store
1889                .export_bao(*hash, ChunkRanges::all())
1890                .bao_to_vec()
1891                .await
1892                .unwrap();
1893            bao_by_hash.insert(*hash, bao);
1894        }
1895        store.dump().await?;
1896
1897        for size in sizes {
1898            let data = test_data(size);
1899            let ranges = ChunkRanges::all();
1900            let (hash, bao) = create_n0_bao(&data, &ranges)?;
1901            store.import_bao_bytes(hash, ranges, bao).await?;
1902        }
1903
1904        // re-import the bao encodings exported above. this replaces the
1905        // commented-out sketch that was here; it assumes that importing a
1906        // blob that is already complete succeeds and leaves the store unchanged
1907        for (hash, bao) in bao_by_hash {
1908            let bao = Bytes::from(bao);
1909            store.import_bao_bytes(hash, ChunkRanges::all(), bao).await?;
1910        }
1915        Ok(())
1916    }
1917
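    /// Recursively delete all files with the given extension below `root_dir`.
    ///
    /// Used by the persistence tests to simulate the loss of ephemeral state
    /// such as `.bitfield` files.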
1918    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
1919        // Remove leading dot if present, so we have just the extension
1920        let ext = extension.trim_start_matches('.').to_lowercase();
1921
1922        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
1923            let path = entry.path();
1924
1925            if path.is_file() {
1926                if let Some(file_ext) = path.extension() {
1927                    if file_ext.to_string_lossy().to_lowercase() == ext {
1928                        println!("Deleting: {}", path.display());
1929                        fs::remove_file(path)?;
1930                    }
1931                }
1932            }
1933        }
1934
1935        Ok(())
1936    }
1937
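    /// Print a sorted tree listing of a directory, with file sizes.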
1938    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
1939        let mut entries: Vec<_> = WalkDir::new(&path)
1940            .into_iter()
1941            .filter_map(Result::ok) // Skip errors
1942            .collect();
1943
1944        // Sort by path (name at each depth)
1945        entries.sort_by(|a, b| a.path().cmp(b.path()));
1946
1947        for entry in entries {
1948            let depth = entry.depth();
1949            let indent = "  ".repeat(depth); // Two spaces per level
1950            let name = entry.file_name().to_string_lossy();
1951            let size = entry.metadata()?.len(); // Size in bytes
1952
1953            if entry.file_type().is_file() {
1954                println!("{indent}{name} ({size} bytes)");
1955            } else if entry.file_type().is_dir() {
1956                println!("{indent}{name}/");
1957            }
1958        }
1959        Ok(())
1960    }
1961
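    /// Like [`dump_dir`], but additionally dump file contents: occupancy maps
    /// for `.data`, `.obao4` and `.sizes4` files, and debug output for
    /// `.bitfield` files.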
1962    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
1963        let mut entries: Vec<_> = WalkDir::new(&path)
1964            .into_iter()
1965            .filter_map(Result::ok) // Skip errors
1966            .collect();
1967
1968        // Sort by path (name at each depth)
1969        entries.sort_by(|a, b| a.path().cmp(b.path()));
1970
1971        for entry in entries {
1972            let depth = entry.depth();
1973            let indent = "  ".repeat(depth);
1974            let name = entry.file_name().to_string_lossy();
1975
1976            if entry.file_type().is_dir() {
1977                println!("{indent}{name}/");
1978            } else if entry.file_type().is_file() {
1979                let size = entry.metadata()?.len();
1980                println!("{indent}{name} ({size} bytes)");
1981
1982                // Dump depending on file type
1983                let path = entry.path();
1984                if name.ends_with(".data") {
1985                    print!("{indent}  ");
1986                    dump_file(path, 1024 * 16)?;
1987                } else if name.ends_with(".obao4") {
1988                    print!("{indent}  ");
1989                    dump_file(path, 64)?;
1990                } else if name.ends_with(".sizes4") {
1991                    print!("{indent}  ");
1992                    dump_file(path, 8)?;
1993                } else if name.ends_with(".bitfield") {
1994                    match read_checksummed::<Bitfield>(path) {
1995                        Ok(bitfield) => {
1996                            println!("{indent}  bitfield: {bitfield:?}");
1997                        }
1998                        Err(cause) => {
1999                            println!("{indent}  bitfield: error: {cause}");
2000                        }
2001                    }
2002                } else {
2003                    // no content dump for other file types
2004                }
2005            }
2006        }
2007        Ok(())
2008    }
2009
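    /// Print an occupancy map of a file, one character per `chunk_size` bytes.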
2010    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
2011        let bits = file_bits(path, chunk_size)?;
2012        println!("{}", print_bitfield_ansi(bits));
2013        Ok(())
2014    }
2015
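    /// Return one bool per `chunk_size` byte chunk of the file, true if the
    /// chunk contains any non-zero byte.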
2016    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
2017        let file = fs::File::open(&path)?;
2018        let file_size = file.metadata()?.len();
2019        let mut buffer = vec![0u8; chunk_size as usize];
2020        let mut bits = Vec::new();
2021
2022        let mut offset = 0u64;
2023        while offset < file_size {
2024            let remaining = file_size - offset;
2025            let current_chunk_size = chunk_size.min(remaining);
2026
2027            let chunk = &mut buffer[..current_chunk_size as usize];
2028            file.read_exact_at(offset, chunk)?;
2029
2030            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
2031            bits.push(has_non_zero);
2032
2033            offset += current_chunk_size;
2034        }
2035
2036        Ok(bits)
2037    }
2038
2039    #[allow(dead_code)]
2040    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
2041        bits.into_iter()
2042            .map(|bit| if bit { '#' } else { '_' })
2043            .collect()
2044    }
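
    /// Set bits render as '#', cleared bits as '_'.
    #[test]
    fn print_bitfield_example() {
        assert_eq!(print_bitfield([true, false, true, true]), "#_##");
    }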
2045
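    /// Render a bitfield using half-block glyphs, packing two bits per
    /// character so long bitfields stay compact in terminal output.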
2046    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
2047        let mut result = String::new();
2048        let mut iter = bits.into_iter();
2049
2050        // ANSI color codes
2051        let white_fg = "\x1b[97m"; // bright white foreground
2052        let reset = "\x1b[0m"; // reset all attributes
2053        let gray_bg = "\x1b[100m"; // bright black (gray) background
2054        let black_bg = "\x1b[40m"; // black background
2055
2056        while let Some(b1) = iter.next() {
2057            let b2 = iter.next();
2058
2059            let colored_char = match (b1, b2) {
2060                (true, Some(true)) => format!("{white_fg}█{reset}"), // 11 - solid white on default background
2061                (true, Some(false)) => format!("{gray_bg}{white_fg}▌{reset}"), // 10 - left half white on gray background
2062                (false, Some(true)) => format!("{gray_bg}{white_fg}▐{reset}"), // 01 - right half white on gray background
2063                (false, Some(false)) => format!("{gray_bg}{white_fg} {reset}"), // 00 - space with gray background
2064                (true, None) => format!("{black_bg}{white_fg}▌{reset}"), // 1 (pad 0) - left half white on black background
2065                (false, None) => format!("{black_bg}{white_fg} {reset}"), // 0 (pad 0) - space with black background
2066            };
2067
2068            result.push_str(&colored_char);
2069        }
2070
2071        // Ensure we end with a reset code to prevent color bleeding
2072        result.push_str("\x1b[0m");
2073        result
2074    }
2075
2076    fn bytes_to_stream(
2077        bytes: Bytes,
2078        chunk_size: usize,
2079    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
2080        assert!(chunk_size > 0, "Chunk size must be greater than 0");
2081        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
2082            if offset >= bytes.len() {
2083                None
2084            } else {
2085                let chunk_len = chunk_size.min(bytes.len() - offset);
2086                let chunk = bytes.slice(offset..offset + chunk_len);
2087                Some((Ok(chunk), (bytes, offset + chunk_len)))
2088            }
2089        })
2090    }
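
    // Hypothetical usage sketch (assumes a `StreamExt`-style `next` in scope):
    //
    //   let mut s = std::pin::pin!(bytes_to_stream(Bytes::from_static(b"hello"), 2));
    //   while let Some(chunk) = s.next().await {
    //       // yields b"he", b"ll", then b"o"
    //   }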
2091}