iroh_blobs/store/fs.rs

//! # File-based blob store.
//!
//! A file-based blob store needs a writable directory to work with.
//!
//! General design:
//!
//! The file store consists of two actors.
//!
//! # The main actor
//!
//! The purpose of the main actor is to handle user commands and own a map of
//! handles for hashes that are currently being worked on.
//!
//! It also owns tasks for ongoing import and export operations, as well as the
//! database actor.
//!
//! Handling a command almost always involves either forwarding it to the
//! database actor or creating a hash context and spawning a task.
//!
//! # The database actor
//!
//! The database actor is responsible for storing metadata about each hash,
//! as well as inlined data and outboard data for small files.
//!
//! In addition to the metadata, the database actor also stores tags.
//!
//! # Tasks
//!
//! Tasks do not return a result. They are responsible for sending an error
//! to the requester if possible. Otherwise, just dropping the sender will
//! also fail the receiver, but without a descriptive error message.
//!
//! Tasks are usually implemented as an impl fn that does return a result,
//! and a wrapper (named `..._task`) that just forwards the error, if any.
//!
//! That way you can use `?` syntax in the task implementation. The impl fns
//! are also easier to test.
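//!
//! A minimal sketch of the pattern (the names are illustrative, not actual
//! items in this module):
//!
//! ```rust,ignore
//! async fn frobnicate_impl(ctx: &HashContext) -> io::Result<()> {
//!     // `?` syntax works here, since we return a result
//!     Ok(())
//! }
//!
//! async fn frobnicate_task(ctx: HashContext, tx: mpsc::Sender<io::Result<()>>) {
//!     // forward the error to the requester, if any
//!     if let Err(e) = frobnicate_impl(&ctx).await {
//!         tx.send(Err(e)).await.ok();
//!     }
//! }
//! ```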
//!
//! # Context
//!
//! The main actor holds a `TaskContext` that is needed for almost all tasks,
//! such as the config and a way to interact with the database.
//!
//! For tasks that are specific to a hash, a `HashContext` combines the task
//! context with a slot from the table of the main actor that can be used
//! to obtain a unique handle for the hash.
//!
//! # Runtime
//!
//! The fs store owns and manages its own tokio runtime. Dropping the store
//! will clean up the database and shut down the runtime. However, some parts
//! of the persistent state won't make it to disk, so operations involving large
//! partial blobs will have a large initial delay on the next startup.
//!
//! It is also not guaranteed that all write operations will make it to disk.
//! The on-disk store will be in a consistent state, but might miss some writes
//! in the last seconds before shutdown.
//!
//! To avoid this, you can use the [`crate::api::Store::shutdown`] method to
//! cleanly shut down the store and save ephemeral state to disk.
//!
//! Note that if you use the store inside an [`iroh::protocol::Router`] and shut
//! down the router using [`iroh::protocol::Router::shutdown`], the store will be
//! safely shut down as well. Any store refs you are holding will be inoperable
//! after this.
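//!
//! A minimal sketch of a clean shutdown (the path and error handling are
//! illustrative):
//!
//! ```rust,ignore
//! let store = FsStore::load("/path/to/blobs").await?;
//! // ... add and export blobs ...
//! store.shutdown().await?; // flush ephemeral state to disk before exit
//! ```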
use std::{
    fmt::{self, Debug},
    fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use bao_tree::{
    blake3,
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        outboard::PreOrderOutboard,
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entity_manager::{EntityManagerState, SpawnArg};
use entry_state::{DataLocation, OutboardLocation};
use import::{ImportEntry, ImportSource};
use irpc::{channel::mpsc, RpcMessage};
use meta::list_blobs;
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    protocol::ChunkRangesExt,
    store::{
        fs::{
            bao_file::{
                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
                OutboardReader,
            },
            util::entity_manager::{self, ActiveEntityState},
        },
        gc::run_gc,
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        IROH_BLOCK_SIZE,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
    },
    Hash,
};
mod bao_file;
use bao_file::BaoFileHandle;
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;

use crate::{
    api::{
        self,
        blobs::{AddProgressItem, ExportMode, ExportProgressItem},
        Store,
    },
    HashAndFormat,
};

/// Maximum number of external paths we track per blob.
const MAX_EXTERNAL_PATHS: usize = 8;

/// Create a 16 byte unique ID.
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

/// Create temp file name based on a 16 byte UUID.
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

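/// Commands internal to the file store, not part of the public API.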
#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

/// Context needed by most tasks.
#[derive(Debug)]
struct TaskContext {
    /// Store options such as paths and inline thresholds, in an Arc to cheaply share with tasks.
    pub options: Arc<Options>,
    /// Metadata database, basically an mpsc sender with some extra functionality.
    pub db: meta::Db,
    /// Handle to send internal commands.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

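/// Parameters for the entity manager: entities are identified by hash, share
/// the `TaskContext` as global state, and each holds a `BaoFileHandle`.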
#[derive(Debug)]
struct EmParams;

impl entity_manager::Params for EmParams {
    type EntityId = Hash;

    type GlobalState = Arc<TaskContext>;

    type EntityState = BaoFileHandle;

    async fn on_shutdown(
        state: entity_manager::ActiveEntityState<Self>,
        cause: entity_manager::ShutdownCause,
    ) {
        trace!("persist {:?} due to {cause:?}", state.id);
        state.persist().await;
    }
}

#[derive(Debug)]
struct Actor {
    // Context that can be cheaply shared with tasks.
    context: Arc<TaskContext>,
    // Receiver for incoming user commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for incoming file store specific commands.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks for import and export operations.
    tasks: JoinSet<()>,
    // Entity manager that handles concurrency for entities.
    handles: EntityManagerState<EmParams>,
    // Temp tags.
    temp_tags: TempTags,
    // Waiters for idle state.
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    // Our private tokio runtime. It has to live somewhere.
    _rt: RtWrapper,
}

type HashContext = ActiveEntityState<EmParams>;

impl SyncEntityApi for HashContext {
    /// Load the state from the database.
    ///
    /// If the state is Initial, this will start the load.
    /// If it is Loading, it will wait until loading is done.
    /// If it is any other state, it will be a no-op.
    async fn load(&self) {
        enum Action {
            Load,
            Wait,
            None,
        }
        let mut action = Action::None;
        self.state.send_if_modified(|guard| match guard.deref() {
            BaoFileStorage::Initial => {
                *guard = BaoFileStorage::Loading;
                action = Action::Load;
                true
            }
            BaoFileStorage::Loading => {
                action = Action::Wait;
                false
            }
            _ => false,
        });
        match action {
            Action::Load => {
                let state = if self.id == Hash::EMPTY {
                    BaoFileStorage::Complete(CompleteStorage {
                        data: MemOrFile::Mem(Bytes::new()),
                        outboard: MemOrFile::empty(),
                    })
                } else {
                    // we must assign a new state even in the error case, otherwise
                    // tasks waiting for loading would stall!
                    match self.global.db.get(self.id).await {
                        Ok(state) => match BaoFileStorage::open(state, self).await {
                            Ok(handle) => handle,
                            Err(_) => BaoFileStorage::Poisoned,
                        },
                        Err(_) => BaoFileStorage::Poisoned,
                    }
                };
                self.state.send_replace(state);
            }
            Action::Wait => {
                // we are in state loading already, so we just need to wait for the
                // other task to complete loading.
                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
                    self.state.0.subscribe().changed().await.ok();
                }
            }
            Action::None => {}
        }
    }

    /// Write a batch and notify the db.
    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
        let mut res = Ok(None);
        self.state.send_if_modified(|state| {
            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
                res = Err(io::Error::other("write batch failed"));
                return false;
            };
            res = Ok(update);
            *state = state1;
            true
        });
        if let Some(update) = res? {
            self.global.db.update(self.id, update).await?;
        }
        Ok(())
    }

    /// An AsyncSliceReader for the data file.
    ///
    /// Caution: this is a reader for the unvalidated data file. Reading this
    /// can produce data that does not match the hash.
    #[allow(refining_impl_trait_internal)]
    fn data_reader(&self) -> DataReader {
        DataReader(self.state.clone())
    }

    /// An AsyncSliceReader for the outboard file.
    ///
    /// The outboard file is used to validate the data file. It is not guaranteed
    /// to be complete.
    #[allow(refining_impl_trait_internal)]
    fn outboard_reader(&self) -> OutboardReader {
        OutboardReader(self.state.clone())
    }

    /// The most precise known total size of the data file.
    fn current_size(&self) -> io::Result<u64> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.size()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
            BaoFileStorage::Partial(file) => file.current_size(),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }

    /// The most precise known bitfield of the entry.
    fn bitfield(&self) -> io::Result<Bitfield> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }
}

impl HashContext {
    /// The outboard for the file.
    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
        let outboard = self.outboard_reader();
        Ok(PreOrderOutboard {
            root: blake3::Hash::from(self.id),
            tree,
            data: outboard,
        })
    }

    fn db(&self) -> &meta::Db {
        &self.global.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.global.options
    }

    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.global.protect.protect(self.id, parts);
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().update_await(self.id, state).await?;
        Ok(())
    }

    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
        let hash = self.id;
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        };
        self.db().get(hash).await
    }

    /// Set the entry state in the database, and wait for completion.
    pub async fn set(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().set(self.id, state).await
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        self.tasks.spawn(fut.instrument(span));
    }

    fn log_task_result(res: Result<(), JoinError>) {
        match res {
            Ok(_) => {}
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::WaitIdle(cmd) => {
                trace!("{cmd:?}");
                if self.tasks.is_empty() {
                    // we are currently idle
                    cmd.tx.send(()).await.ok();
                } else {
                    // wait for idle state
                    self.idle_waiters.push(cmd.tx);
                }
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await;
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                task = self.handles.tick() => {
                    if let Some(task) = task {
                        self.spawn(task);
                    }
                }
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    Self::log_task_result(res);
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                }
            }
        }
        self.handles.shutdown().await;
        while let Some(res) = self.tasks.join_next().await {
            Self::log_task_result(res);
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file: {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context.clone(),
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
            temp_tags: Default::default(),
            idle_waiters: Vec::new(),
            _rt: rt,
        })
    }
}

trait HashSpecificCommand: HashSpecific + Send + 'static {
    /// Handle the command on success by spawning a task into the per-hash context.
    fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;

    /// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
    /// or dead (e.g. panic in one of the running tasks).
    fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;

    async fn spawn(
        self,
        manager: &mut entity_manager::EntityManagerState<EmParams>,
        tasks: &mut JoinSet<()>,
    ) where
        Self: Sized,
    {
        let span = tracing::Span::current();
        let task = manager
            .spawn(self.hash(), |arg| {
                async move {
                    match arg {
                        SpawnArg::Active(state) => {
                            self.handle(state).await;
                        }
                        SpawnArg::Busy => {
                            self.on_error(arg).await;
                        }
                        SpawnArg::Dead => {
                            self.on_error(arg).await;
                        }
                    }
                }
                .instrument(span)
            })
            .await;
        if let Some(task) = task {
            tasks.spawn(task);
        }
    }
}

impl HashSpecificCommand for ObserveMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.observe(self).await
    }
    async fn on_error(self, _arg: SpawnArg<EmParams>) {}
}
impl HashSpecificCommand for ExportPathMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_path(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportProgressItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportRangesMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_ranges(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportRangesItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ImportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.import_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx.send(Err(api::Error::Io(err))).await.ok();
    }
}
impl HashSpecific for (TempTag, ImportEntryMsg) {
    fn hash(&self) -> Hash {
        self.1.hash()
    }
}
impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
    async fn handle(self, ctx: HashContext) {
        let (tt, cmd) = self;
        ctx.finish_import(cmd, tt).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.1.tx.send(AddProgressItem::Error(err)).await.ok();
    }
}

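/// Newtype around the store's private tokio runtime, so that dropping the
/// store can tear the runtime down from within an async context via
/// `block_in_place`.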
struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

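/// Handle a batch command: create a temp tag scope, forward batch events to
/// it, and clear the scope once the batch ends or fails.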
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

/// The minimal API an entity must implement for the store to work.
trait EntityApi {
    /// Import from a stream of n0 bao encoded data.
    async fn import_bao(&self, cmd: ImportBaoMsg);
    /// Finish an import from a local file or memory.
    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
    /// Observe the bitfield of the entry.
    async fn observe(&self, cmd: ObserveMsg);
    /// Export byte ranges of the entry as data.
    async fn export_ranges(&self, cmd: ExportRangesMsg);
    /// Export chunk ranges of the entry as an n0 bao encoded stream.
    async fn export_bao(&self, cmd: ExportBaoMsg);
    /// Export the entry to a local file.
    async fn export_path(&self, cmd: ExportPathMsg);
    /// Persist the entry at the end of its lifecycle.
    async fn persist(&self);
}

/// A more opinionated API that can be used as a helper to save implementation
/// effort when implementing the `EntityApi` trait.
trait SyncEntityApi: EntityApi {
    /// Load the entry state from the database. This must make sure that it is
    /// not run concurrently, so if load is called multiple times, all but one
    /// must wait. You can use a `tokio::sync::OnceCell` or similar to achieve this.
    async fn load(&self);

    /// Get a synchronous reader for the data file.
    fn data_reader(&self) -> impl ReadBytesAt;

    /// Get a synchronous reader for the outboard file.
    fn outboard_reader(&self) -> impl ReadAt;

    /// Get the best known size of the data file.
    fn current_size(&self) -> io::Result<u64>;

    /// Get the bitfield of the entry.
    fn bitfield(&self) -> io::Result<Bitfield>;

    /// Write a batch of content items to the entry.
    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
}

/// The high level entry point per entry.
impl EntityApi for HashContext {
    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn import_bao(&self, cmd: ImportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ImportBaoMsg {
            inner: ImportBaoRequest { size, .. },
            rx,
            tx,
            ..
        } = cmd;
        let res = import_bao_impl(self, size, rx).await;
        trace!("{res:?}");
        tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn observe(&self, cmd: ObserveMsg) {
        trace!("{cmd:?}");
        self.load().await;
        BaoFileStorageSubscriber::new(self.state.subscribe())
            .forward(cmd.tx)
            .await
            .ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_ranges(&self, mut cmd: ExportRangesMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(ExportRangesItem::Error(cause.into()))
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_bao(&self, mut cmd: ExportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await {
            // if the entry is in state NonExisting, this will be an io error with
            // kind NotFound. So we must not wrap this somehow but pass it on directly.
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_path(&self, cmd: ExportPathMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ExportPathMsg { inner, mut tx, .. } = cmd;
        if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
            tx.send(cause.into()).await.ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
        trace!("{cmd:?}");
        self.load().await;
        let res = match finish_import_impl(self, cmd.inner).await {
            Ok(()) => {
                // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
                // it will be cleaned up when either the process exits or scope ends
                if cmd.tx.is_rpc() {
                    trace!("leaking temp tag {}", tt.hash_and_format());
                    tt.leak();
                }
                AddProgressItem::Done(tt)
            }
            Err(cause) => AddProgressItem::Error(cause),
        };
        cmd.tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
    async fn persist(&self) {
        self.state.send_if_modified(|guard| {
            let hash = &self.id;
            let BaoFileStorage::Partial(fs) = guard.take() else {
                return false;
            };
            let path = self.global.options.path.bitfield_path(hash);
            trace!("writing bitfield for hash {} to {}", hash, path.display());
            if let Err(cause) = fs.sync_all(&path) {
                error!(
                    "failed to write bitfield for {} at {}: {:?}",
                    hash,
                    path.display(),
                    cause
                );
            }
            false
        });
    }
}

async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {
    if ctx.id == Hash::EMPTY {
        return Ok(()); // nothing to do for the empty hash
    }
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    ctx.load().await;
    let handle = &ctx.state;
    // if I do have an existing handle, I have to possibly deal with observers.
    // if I don't have an existing handle, there are 2 cases:
    //   the entry exists in the db, but we don't have a handle
    //   the entry does not exist at all.
    // convert the import source to a data location and drop the open files
    ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // this will always work on any unix, but on windows there might be an issue if the target file is open!
            // possibly open with FILE_SHARE_DELETE on windows?
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // the same caveat as above applies here
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    let data = match &data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        DataLocation::Owned(size) => {
            let path = ctx.options().path.data_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
        DataLocation::External(paths, size) => {
            let Some(path) = paths.iter().next() else {
                return Err(io::Error::other("no external data path"));
            };
            let file = fs::File::open(path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
    };
    let outboard = match &outboard_location {
        OutboardLocation::NotNeeded => MemOrFile::empty(),
        OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        OutboardLocation::Owned => {
            let path = ctx.options().path.outboard_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(file)
        }
    };
    handle.complete(data, outboard);
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update_await(state).await?;
    Ok(())
}

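/// The chunk range covered by a leaf, computed from its byte offset and length.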
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    ctx: &HashContext,
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
) -> api::Result<()> {
    trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // if the batch is not empty, the last item is a leaf, and the current
        // item is a parent, write out the batch
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            ctx.write_batch(&batch, &bitfield).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        ctx.write_batch(&batch, &bitfield).await?;
    }
    Ok(())
}

async fn export_ranges_impl(
    ctx: &HashContext,
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        ctx.current_size()?
    );
    let bitfield = ctx.bitfield()?;
    let data = ctx.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            let res = data.read_bytes_at(offset, size);
            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
                .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

async fn export_bao_impl(
    ctx: &HashContext,
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
) -> io::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    let outboard = ctx.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && cmd.hash != Hash::EMPTY {
        // we have no data whatsoever, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = ctx.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

async fn export_path_impl(
    ctx: &HashContext,
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let state = ctx.get_entry_state().await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let (data, mut external) = match data_location {
        DataLocation::Inline(data) => (MemOrFile::Mem(data), vec![]),
        DataLocation::Owned(size) => (
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size)),
            vec![],
        ),
        DataLocation::External(paths, size) => (
            MemOrFile::File((
                paths.first().cloned().ok_or_else(|| {
                    io::Error::new(io::ErrorKind::NotFound, "no external data path")
                })?,
                size,
            )),
            paths,
        ),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                trace!(
                    "exported {} to {}, {res:?}",
                    source_path.display(),
                    target.display()
                );
            }
            ExportMode::TryReference => {
                if !external.is_empty() {
                    // the file already exists externally, so we need to copy it.
                    // if the OS supports reflink, we might as well use that.
                    let res =
                        reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                    trace!(
                        "exported {} also to {}, {res:?}",
                        source_path.display(),
                        target.display()
                    );
                    external.push(target);
                    external.sort();
                    external.dedup();
                    external.truncate(MAX_EXTERNAL_PATHS);
                } else {
                    // the file was previously owned, so we can just move it.
                    // if that fails with ERR_CROSS, we fall back to copy.
                    match std::fs::rename(&source_path, &target) {
                        Ok(()) => {}
                        Err(cause) => {
                            const ERR_CROSS: i32 = 18;
                            if cause.raw_os_error() == Some(ERR_CROSS) {
                                reflink_or_copy_with_progress(&source_path, &target, size, tx)
                                    .await?;
                            } else {
                                return Err(cause.into());
                            }
                        }
                    }
                    external.push(target);
                };
                // setting the new entry state will also take care of deleting the owned data file!
                ctx.set(EntryState::Complete {
                    data_location: DataLocation::External(external, size),
                    outboard_location,
                })
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

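/// A progress message that can be constructed from a byte offset, so the copy
/// helpers below can report progress for both export and add operations.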
trait CopyProgress: RpcMessage {
    fn from_offset(offset: u64) -> Self;
}

impl CopyProgress for ExportProgressItem {
    fn from_offset(offset: u64) -> Self {
        ExportProgressItem::CopyProgress(offset)
    }
}

impl CopyProgress for AddProgressItem {
    fn from_offset(offset: u64) -> Self {
        AddProgressItem::CopyProgress(offset)
    }
}

#[derive(Debug)]
enum CopyResult {
    Reflinked,
    Copied,
}

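/// Try to reflink (copy-on-write clone) `from` to `to`; if the OS or
/// filesystem does not support it, fall back to a buffered copy with progress.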
async fn reflink_or_copy_with_progress(
    from: impl AsRef<Path>,
    to: impl AsRef<Path>,
    size: u64,
    tx: &mut mpsc::Sender<impl CopyProgress>,
) -> io::Result<CopyResult> {
    let from = from.as_ref();
    let to = to.as_ref();
    if reflink_copy::reflink(from, to).is_ok() {
        return Ok(CopyResult::Reflinked);
    }
    let source = fs::File::open(from)?;
    let mut target = fs::File::create(to)?;
    copy_with_progress(source, size, &mut target, tx).await?;
    Ok(CopyResult::Copied)
}

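/// Copy `size` bytes from `file` into `target` in 1 MiB chunks, sending a
/// progress message per chunk and yielding between chunks so the task stays
/// responsive.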
1367async fn copy_with_progress<T: CopyProgress>(
1368    file: impl ReadAt,
1369    size: u64,
1370    target: &mut impl Write,
1371    tx: &mut mpsc::Sender<T>,
1372) -> io::Result<()> {
1373    let mut offset = 0;
1374    let mut buf = vec![0u8; 1024 * 1024];
1375    while offset < size {
1376        let remaining = buf.len().min((size - offset) as usize);
1377        let buf: &mut [u8] = &mut buf[..remaining];
1378        file.read_exact_at(offset, buf)?;
1379        target.write_all(buf)?;
1380        tx.try_send(T::from_offset(offset))
1381            .await
1382            .map_err(|_e| io::Error::other(""))?;
1383        yield_now().await;
1384        offset += buf.len() as u64;
1385    }
1386    Ok(())
1387}
1388
1389impl FsStore {
1390    /// Load or create a new store.
1391    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
1392        let path = root.as_ref();
1393        let db_path = path.join("blobs.db");
1394        let options = Options::new(path);
1395        Self::load_with_opts(db_path, options).await
1396    }
1397
1398    /// Load or create a new store with custom options, returning an additional sender for file store specific commands.
1399    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
1400        static THREAD_NR: AtomicU64 = AtomicU64::new(0);
1401        let rt = tokio::runtime::Builder::new_multi_thread()
1402            .thread_name_fn(|| {
1403                format!(
1404                    "iroh-blob-store-{}",
1405                    THREAD_NR.fetch_add(1, Ordering::Relaxed)
1406                )
1407            })
1408            .enable_time()
1409            .build()?;
1410        let handle = rt.handle().clone();
1411        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
1412        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
1413        let gc_config = options.gc.clone();
1414        let actor = handle
1415            .spawn(Actor::new(
1416                db_path,
1417                rt.into(),
1418                commands_rx,
1419                fs_commands_rx,
1420                fs_commands_tx.clone(),
1421                Arc::new(options),
1422            ))
1423            .await??;
1424        handle.spawn(actor.run());
1425        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
1426        if let Some(config) = gc_config {
1427            handle.spawn(run_gc(store.deref().clone(), config));
1428        }
1429        Ok(store)
1430    }
1431}
1432
1433/// A file based store.
1434///
1435/// A store can be created using [`load`](FsStore::load) or [`load_with_opts`](FsStore::load_with_opts).
1436/// Load will use the default options and create the required directories, while load_with_opts allows
1437/// you to customize the options and the location of the database. Both variants will create the database
1438/// if it does not exist, and load an existing database if one is found at the configured location.
1439///
1440/// In addition to implementing the [`Store`](`crate::api::Store`) API via [`Deref`](`std::ops::Deref`),
1441/// there are a few additional methods that are specific to file based stores, such as [`dump`](FsStore::dump).
1442#[derive(Debug, Clone)]
1443pub struct FsStore {
1444    sender: ApiClient,
1445    db: tokio::sync::mpsc::Sender<InternalCommand>,
1446}
1447
1448impl From<FsStore> for Store {
1449    fn from(value: FsStore) -> Self {
1450        Store::from_sender(value.sender)
1451    }
1452}
1453
1454impl Deref for FsStore {
1455    type Target = Store;
1456
1457    fn deref(&self) -> &Self::Target {
1458        Store::ref_from_sender(&self.sender)
1459    }
1460}
1461
1462impl AsRef<Store> for FsStore {
1463    fn as_ref(&self) -> &Store {
1464        self.deref()
1465    }
1466}
1467
1468impl FsStore {
1469    fn new(
1470        sender: irpc::LocalSender<proto::Request>,
1471        db: tokio::sync::mpsc::Sender<InternalCommand>,
1472    ) -> Self {
1473        Self {
1474            sender: sender.into(),
1475            db,
1476        }
1477    }
1478
1479    pub async fn dump(&self) -> anyhow::Result<()> {
1480        let (tx, rx) = oneshot::channel();
1481        self.db
1482            .send(
1483                meta::Dump {
1484                    tx,
1485                    span: tracing::Span::current(),
1486                }
1487                .into(),
1488            )
1489            .await?;
1490        rx.await??;
1491        Ok(())
1492    }
1493}
1494
1495#[cfg(test)]
1496pub mod tests {
1497    use core::panic;
1498    use std::collections::{HashMap, HashSet};
1499
1500    use bao_tree::{io::round_up_to_chunks_groups, ChunkRanges};
1501    use n0_future::{stream, Stream, StreamExt};
1502    use testresult::TestResult;
1503    use walkdir::WalkDir;
1504
1505    use super::*;
1506    use crate::{
1507        api::blobs::Bitfield,
1508        store::{
1509            util::{read_checksummed, tests::create_n0_bao, SliceInfoExt, Tag},
1510            IROH_BLOCK_SIZE,
1511        },
1512    };
1513
    /// Interesting sizes for testing.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,               // annoying corner case - always present, handled by the api
        1,               // less than 1 chunk, data inline, outboard not needed
        1024,            // exactly 1 chunk, data inline, outboard not needed
        1024 * 16 - 1,   // less than 1 chunk group, data inline, outboard not needed
        1024 * 16,       // exactly 1 chunk group, data inline, outboard not needed
        1024 * 16 + 1,   // data file, outboard inline (just 1 hash pair)
        1024 * 1024,     // data file, outboard inline (many hash pairs)
        1024 * 1024 * 8, // data file, outboard file
    ];

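    /// Round a request up to what the store will actually provide.
    ///
    /// If the requested ranges lie entirely beyond the end of the data (and are
    /// not empty), fall back to the last chunk so the validated size can still
    /// be obtained; then round everything up to full chunk groups.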
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

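    /// Like [`create_n0_bao`], but also returns the ranges after rounding them
    /// up via [`round_up_request`].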
    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

    /// Generate test data for size n.
    ///
    /// We don't really care about the content, since we assume blake3 works.
    /// The only thing it should not be is all zeros, since that is what you
    /// will get for a gap.
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Map to uppercase A-Z range (65-90)
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    // import data via add_stream, check that we can observe completion and that the data is there
    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, tt.hash());
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    // import data via add_bytes, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_bytes_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    // check that small blobs round-trip as the same Bytes instance (by address)
    // as long as the handle is kept alive
    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            // we must at some point see completion, otherwise the test will hang.
            // keep the handle alive by observing until the end, otherwise the handle
            // will change and the bytes won't be the same instance anymore
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    // import data from a file via add_path, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    // export data to a file via export, check that the file contents match
    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, import the rest
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // check if the data is complete
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

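    /// Ranges covering just the last chunk, enough to get a validated size.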
    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, check that observe sees the imported ranges after a restart
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // delete the bitfield files, so the store has to recover them from the data
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        // stage 2, check that the bitfields are recovered and observe sees the imported ranges
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(&tt1.hash_and_format()));
        assert!(tts.contains(&tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        store.wait_idle().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        // the batch was dropped, so its temp tags are no longer listed
        assert!(!tts.contains(&tt1.hash_and_format()));
        assert!(!tts.contains(&tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut tts = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(tt.hash(), data);
            tts.push(tt);
        }
        store.sync_db().await?;
        for tt in &tts {
            let hash = tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &tts {
            let hash = tt.hash();
            let data = store
                .export_bao(hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[&hash].to_vec());
            let bao = store
                .export_bao(hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        // TODO: feed the exported bao encodings in `bao_by_hash` back into a
        // fresh store and verify that they round-trip
        drop(bao_by_hash);
        Ok(())
    }

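    /// Recursively delete all files with the given extension below `root_dir`.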
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        // Remove leading dot if present, so we have just the extension
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

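    /// Print a sorted listing of the directory tree, with file sizes.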
    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth); // Two spaces per level
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len(); // Size in bytes

            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

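    /// Like [`dump_dir`], but also prints a summary of the content of known
    /// file types (data, outboard, sizes and bitfield files).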
    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                // Dump depending on file type
                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent}  ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent}  ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent}  ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent}  bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent}  bitfield: error: {cause}");
                        }
                    }
                } else {
                    continue; // Skip content dump for other files
                }
            }
        }
        Ok(())
    }

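    /// Print a one-line visualization of which `chunk_size` blocks of the file
    /// contain non-zero data.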
    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

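    /// For each `chunk_size` sized block of the file, return whether the block
    /// contains any non-zero bytes.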
    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            // read_exact_at is the positioned read from the ReadAt trait
            // imported at the top of this file
            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

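    /// Render a bitfield as a string of '#' (set) and '_' (unset).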
    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

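    /// Render a bitfield two bits per character, using half blocks and ANSI colors.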
    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        // ANSI color codes
        let white_fg = "\x1b[97m"; // bright white foreground
        let reset = "\x1b[0m"; // reset all attributes
        let gray_bg = "\x1b[100m"; // bright black (gray) background
        let black_bg = "\x1b[40m"; // black background

        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset), // 11 - solid white on default background
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset), // 10 - left half white on gray background
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset), // 01 - right half white on gray background
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset), // 00 - space on gray background
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset), // 1 (padded) - left half white on black background
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset), // 0 (padded) - space on black background
            };

            result.push_str(&colored_char);
        }

        // Ensure we end with a reset code to prevent color bleeding
        result.push_str("\x1b[0m");
        result
    }

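    /// Chop `bytes` into a stream of chunks of at most `chunk_size` bytes.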
    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}