iroh_bytes/store/
fs.rs

1//! redb backed storage
2//!
3//! Data can get into the store in two ways:
4//!
5//! 1. import from local data
6//! 2. sync from a remote
7//!
8//! These two cases are very different. In the first case, we have the data
9//! completely and don't know the hash yet. We compute the outboard and hash,
10//! and only then move/reference the data into the store.
11//!
12//! The entry for the hash comes into existence already complete.
13//!
14//! In the second case, we know the hash, but don't have the data yet. We create
15//! a partial entry, and then request the data from the remote. This is the more
16//! complex case.
17//!
18//! Partial entries always start as pure in memory entries without a database
19//! entry. Only once we receive enough data, we convert them into a persistent
20//! partial entry. This is necessary because we can't trust the size given
21//! by the remote side before receiving data. It is also an optimization,
22//! because for small blobs it is not worth it to create a partial entry.
23//!
24//! A persistent partial entry is always stored as three files in the file
25//! system: The data file, the outboard file, and a sizes file that contains
26//! the most up to date information about the size of the data.
27//!
28//! The redb database entry for a persistent partial entry does not contain
29//! any information about the size of the data until the size is exactly known.
30//!
31//! Updating this information on each write would be too costly.
32//!
33//! Marking a partial entry as complete is done from the outside. At this point
34//! the size is taken as validated. Depending on the size we decide whether to
35//! store data and outboard inline or to keep storing it in external files.
36//!
37//! Data can get out of the store in two ways:
38//!
39//! 1. the data and outboard of both partial and complete entries can be read
40//! at any time and shared over the network. Only data that is complete will
41//! be shared, everything else will lead to validation errors.
42//!
43//! 2. entries can be exported to the file system. This currently only works
44//! for complete entries.
45//!
46//! Tables:
47//!
48//! The blobs table contains a mapping from hash to rough entry state.
49//! The inline_data table contains the actual data for complete entries.
50//! The inline_outboard table contains the actual outboard for complete entries.
51//! The tags table contains a mapping from tag to hash.
52//!
53//! Design:
54//!
55//! The redb store is accessed in a single threaded way by an actor that runs
56//! on its own std thread. Communication with this actor is via a flume channel,
57//! with oneshot channels for the return values if needed.
58//!
59//! Errors:
60//!
61//! ActorError is an enum containing errors that can happen inside message
62//! handlers of the actor. This includes various redb related errors and io
63//! errors when reading or writing non-inlined data or outboard files.
64//!
65//! OuterError is an enum containing all the actor errors and in addition
66//! errors when communicating with the actor.
67use std::{
68    collections::{BTreeMap, BTreeSet},
69    io::{self, BufReader, Read},
70    path::{Path, PathBuf},
71    sync::{Arc, RwLock},
72    time::{Duration, SystemTime},
73};
74
75use bao_tree::io::{
76    fsm::Outboard,
77    outboard::PreOrderOutboard,
78    sync::{ReadAt, Size},
79};
80use bytes::Bytes;
81use futures_lite::{Stream, StreamExt};
82
83use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
84use iroh_io::AsyncSliceReader;
85use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
86use serde::{Deserialize, Serialize};
87use smallvec::SmallVec;
88use tokio::{io::AsyncWriteExt, sync::oneshot};
89use tracing::trace_span;
90
91mod import_flat_store;
92mod migrate_redb_v1_v2;
93mod tables;
94#[doc(hidden)]
95pub mod test_support;
96#[cfg(test)]
97mod tests;
98mod util;
99mod validate;
100
101use crate::{
102    store::{
103        bao_file::{BaoFileStorage, CompleteStorage},
104        fs::{
105            tables::BaoFilePart,
106            util::{overwrite_and_sync, read_and_remove, ProgressReader},
107        },
108    },
109    util::{
110        progress::{
111            BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
112            ProgressSender,
113        },
114        raw_outboard_size, LivenessTracker, MemOrFile,
115    },
116    Tag, TempTag, IROH_BLOCK_SIZE,
117};
118use tables::{ReadOnlyTables, ReadableTables, Tables};
119
120use self::{tables::DeleteSet, util::PeekableFlumeReceiver};
121
122use self::test_support::EntryData;
123
124use super::{
125    bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
126    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
127    ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
128};
129
/// Location of the data.
///
/// Data can be inlined in the database, a file conceptually owned by the store,
/// or a number of external files conceptually owned by the user.
///
/// Only complete data can be inlined.
///
/// The type parameters allow using this either as a bare discriminant
/// (`I = (), E = ()`) or with payloads, e.g. the verified size as `E`
/// (see the `DataLocation<X, u64>` impl below) or inline bytes as `I`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is in the inline_data table.
    Inline(I),
    /// Data is in the canonical location in the data directory.
    Owned(E),
    /// Data is in several external locations. This should be a non-empty list.
    External(Vec<PathBuf>, E),
}
145
impl<X> DataLocation<X, u64> {
    /// Merge two locations for the same complete entry.
    ///
    /// Two external locations are combined (paths deduplicated and sorted)
    /// and must agree on the size. For mixed pairs, the arm order below
    /// establishes a precedence: owned wins over both inline and external,
    /// and inline wins over external. Note that the order of the match arms
    /// is significant — the owned arms are tried before the inline arms, so
    /// for an owned/inline pair, owned wins.
    fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
        Ok(match (self, that) {
            (
                DataLocation::External(mut paths, a_size),
                DataLocation::External(b_paths, b_size),
            ) => {
                // both describe the same complete blob, so their sizes must agree
                if a_size != b_size {
                    return Err(ActorError::Inconsistent(format!(
                        "complete size mismatch {} {}",
                        a_size, b_size
                    )));
                }
                paths.extend(b_paths);
                paths.sort();
                paths.dedup();
                DataLocation::External(paths, a_size)
            }
            (_, b @ DataLocation::Owned(_)) => {
                // owned needs to win, since it has an associated file. Choosing
                // external would orphan the file.
                b
            }
            (a @ DataLocation::Owned(_), _) => {
                // owned needs to win, since it has an associated file. Choosing
                // external would orphan the file.
                a
            }
            (_, b @ DataLocation::Inline(_)) => {
                // inline needs to win, since it has associated data. Choosing
                // external would orphan the file.
                b
            }
            (a @ DataLocation::Inline(_), _) => {
                // inline needs to win, since it has associated data. Choosing
                // external would orphan the file.
                a
            }
        })
    }
}
187
188impl<I, E> DataLocation<I, E> {
189    fn discard_inline_data(self) -> DataLocation<(), E> {
190        match self {
191            DataLocation::Inline(_) => DataLocation::Inline(()),
192            DataLocation::Owned(x) => DataLocation::Owned(x),
193            DataLocation::External(paths, x) => DataLocation::External(paths, x),
194        }
195    }
196}
197
/// Location of the outboard.
///
/// Outboard can be inlined in the database or a file conceptually owned by the store.
/// Outboards are implementation specific to the store and as such are always owned.
///
/// Only complete outboards can be inlined.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is in the inline_outboard table.
    Inline(I),
    /// Outboard is in the canonical location in the data directory.
    Owned,
    /// Outboard is not needed.
    ///
    /// NOTE(review): presumably used for blobs small enough that the
    /// outboard is empty — confirm where entries are created.
    NotNeeded,
}
213
214impl<I> OutboardLocation<I> {
215    fn discard_extra_data(self) -> OutboardLocation<()> {
216        match self {
217            Self::Inline(_) => OutboardLocation::Inline(()),
218            Self::Owned => OutboardLocation::Owned,
219            Self::NotNeeded => OutboardLocation::NotNeeded,
220        }
221    }
222}
223
/// The information about an entry that we keep in the entry table for quick access.
///
/// The exact info to store here is TBD, so usually you should use the accessor methods.
///
/// Persisted in redb via the [`redb::Value`] impl below, using postcard
/// serialization.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// For a complete entry we always know the size. It does not make much sense
    /// to write to a complete entry, so they are much easier to share.
    Complete {
        /// Location of the data.
        data_location: DataLocation<I, u64>,
        /// Location of the outboard.
        outboard_location: OutboardLocation<I>,
    },
    /// Partial entries are entries for which we know the hash, but don't have
    /// all the data. They are created when syncing from somewhere else by hash.
    ///
    /// As such they are always owned. There is also no inline storage for them.
    /// Non short lived partial entries always live in the file system, and for
    /// short lived ones we never create a database entry in the first place.
    Partial {
        /// Once we get the last chunk of a partial entry, we have validated
        /// the size of the entry despite it still being incomplete.
        ///
        /// E.g. a giant file where we just requested the last chunk.
        size: Option<u64>,
    },
}
251
252impl Default for EntryState {
253    fn default() -> Self {
254        Self::Partial { size: None }
255    }
256}
257
258impl EntryState {
259    fn union(self, that: Self) -> ActorResult<Self> {
260        match (self, that) {
261            (
262                Self::Complete {
263                    data_location,
264                    outboard_location,
265                },
266                Self::Complete {
267                    data_location: b_data_location,
268                    ..
269                },
270            ) => Ok(Self::Complete {
271                // combine external paths if needed
272                data_location: data_location.union(b_data_location)?,
273                outboard_location,
274            }),
275            (a @ Self::Complete { .. }, Self::Partial { .. }) =>
276            // complete wins over partial
277            {
278                Ok(a)
279            }
280            (Self::Partial { .. }, b @ Self::Complete { .. }) =>
281            // complete wins over partial
282            {
283                Ok(b)
284            }
285            (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
286            // keep known size from either entry
287            {
288                let size = match (a_size, b_size) {
289                    (Some(a_size), Some(b_size)) => {
290                        // validated sizes are different. this means that at
291                        // least one validation was wrong, which would be a bug
292                        // in bao-tree.
293                        if a_size != b_size {
294                            return Err(ActorError::Inconsistent(format!(
295                                "validated size mismatch {} {}",
296                                a_size, b_size
297                            )));
298                        }
299                        Some(a_size)
300                    }
301                    (Some(a_size), None) => Some(a_size),
302                    (None, Some(b_size)) => Some(b_size),
303                    (None, None) => None,
304                };
305                Ok(Self::Partial { size })
306            }
307        }
308    }
309}
310
// Store EntryState in redb as a postcard-serialized byte string.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    // serialized states are small, so usually stay on the stack
    type AsBytes<'a> = SmallVec<[u8; 128]>;

    fn fixed_width() -> Option<usize> {
        // postcard encoding is variable length
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        // The redb trait offers no fallible decode path, so a corrupt
        // record panics here.
        postcard::from_bytes(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        // serializing into a SmallVec cannot fail for this type
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
339
/// Options for inlining small complete data or outboards.
///
/// Blobs up to the given sizes are stored directly in the redb database
/// instead of in separate files.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size to inline, in bytes.
    pub max_data_inlined: u64,
    /// Maximum outboard size to inline, in bytes.
    pub max_outboard_inlined: u64,
}
348
impl InlineOptions {
    /// Do not inline anything, ever.
    pub const NO_INLINE: Self = Self {
        max_data_inlined: 0,
        max_outboard_inlined: 0,
    };
    /// Always inline everything, regardless of size.
    pub const ALWAYS_INLINE: Self = Self {
        max_data_inlined: u64::MAX,
        max_outboard_inlined: u64::MAX,
    };
}
361
362impl Default for InlineOptions {
363    fn default() -> Self {
364        Self {
365            max_data_inlined: 1024 * 16,
366            max_outboard_inlined: 1024 * 16,
367        }
368    }
369}
370
/// Options for directories used by the file store.
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Path to the directory where data and outboard files are stored.
    pub data_path: PathBuf,
    /// Path to the directory where temp files are stored.
    /// This *must* be on the same device as `data_path`, since we need to
    /// atomically move temp files into place.
    pub temp_path: PathBuf,
}
381
382impl PathOptions {
383    fn new(root: &Path) -> Self {
384        Self {
385            data_path: root.join("data"),
386            temp_path: root.join("temp"),
387        }
388    }
389
390    fn owned_data_path(&self, hash: &Hash) -> PathBuf {
391        self.data_path.join(format!("{}.data", hash.to_hex()))
392    }
393
394    fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
395        self.data_path.join(format!("{}.obao4", hash.to_hex()))
396    }
397
398    fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
399        self.data_path.join(format!("{}.sizes4", hash.to_hex()))
400    }
401
402    fn temp_file_name(&self) -> PathBuf {
403        self.temp_path.join(temp_name())
404    }
405}
406
/// Options for transaction batching.
///
/// The actor batches multiple messages into a single redb transaction.
/// Larger batches amortize transaction overhead; shorter durations bound
/// how long results can stay uncommitted.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of actor messages to batch before creating a new read transaction.
    pub max_read_batch: usize,
    /// Maximum duration to wait before committing a read transaction.
    pub max_read_duration: Duration,
    /// Maximum number of actor messages to batch before committing write transaction.
    pub max_write_batch: usize,
    /// Maximum duration to wait before committing a write transaction.
    pub max_write_duration: Duration,
}
419
420impl Default for BatchOptions {
421    fn default() -> Self {
422        Self {
423            max_read_batch: 10000,
424            max_read_duration: Duration::from_secs(1),
425            max_write_batch: 1000,
426            max_write_duration: Duration::from_millis(500),
427        }
428    }
429}
430
/// Options for the file store.
///
/// Combines all configurable aspects: directory layout, inline thresholds,
/// and transaction batching.
#[derive(Debug, Clone)]
pub struct Options {
    /// Path options.
    pub path: PathOptions,
    /// Inline storage options.
    pub inline: InlineOptions,
    /// Transaction batching options.
    pub batch: BatchOptions,
}
441
/// Where the payload of an import comes from.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// A temp file inside the store's temp directory (see `PathOptions::temp_path`).
    TempFile(PathBuf),
    /// An external file owned by the user.
    External(PathBuf),
    /// Data that is already in memory.
    Memory(#[debug(skip)] Bytes),
}
448
449impl ImportSource {
450    fn content(&self) -> MemOrFile<&[u8], &Path> {
451        match self {
452            Self::TempFile(path) => MemOrFile::File(path.as_path()),
453            Self::External(path) => MemOrFile::File(path.as_path()),
454            Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
455        }
456    }
457
458    fn len(&self) -> io::Result<u64> {
459        match self {
460            Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
461            Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
462            Self::Memory(data) => Ok(data.len() as u64),
463        }
464    }
465}
466
/// Use BaoFileHandle as the entry type for the map.
///
/// The same handle type serves both partial and complete entries.
pub type Entry = BaoFileHandle;
469
// All trait methods delegate to the inherent methods on the file handle.
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    fn size(&self) -> BaoBlobSize {
        // NOTE(review): unwrap assumes reading the current size cannot fail
        // here — confirm against BaoFileHandle::current_size.
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
493
impl super::MapEntryMut for Entry {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        // delegate to the inherent writer on the file handle
        Ok(self.writer())
    }
}
499
/// An import request for the actor: the data plus its precomputed outboard.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// The hash and format of the data to import.
    content_id: HashAndFormat,
    /// The source of the data to import, can be a temp file, external file, or memory.
    source: ImportSource,
    /// Data size in bytes.
    data_size: u64,
    /// Outboard without length prefix.
    ///
    /// NOTE(review): presumably `None` when the blob needs no outboard —
    /// confirm where `Import` is constructed.
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
512
/// An export request for the actor.
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// A temp tag to keep the entry alive while exporting. This also
    /// contains the hash to be exported.
    temp_tag: TempTag,
    /// The target path for the export.
    target: PathBuf,
    /// The export mode to use.
    mode: ExportMode,
    /// The progress callback to use.
    #[debug(skip)]
    progress: ExportProgressCb,
}
526
/// Messages sent from the [`Store`] handle to the actor thread.
///
/// Queries carry a response channel. Note that some messages use a
/// `flume::Sender` for the reply and others a `oneshot::Sender` —
/// NOTE(review): consider unifying.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Query method: get a file handle for a hash, if it exists.
    /// This will produce a file handle even for entries that are not yet in redb at all.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query method: get the rough entry status for a hash. Just complete, partial or not found.
    EntryStatus {
        hash: Hash,
        tx: flume::Sender<ActorResult<EntryStatus>>,
    },
    #[cfg(test)]
    /// Query method: get the full entry state for a hash, both in memory and in redb.
    /// This is everything we got about the entry, including the actual inline outboard and data.
    EntryState {
        hash: Hash,
        tx: flume::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Query method: get the full entry state for a hash.
    GetFullEntryState {
        hash: Hash,
        tx: flume::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Modification method: set the full entry state for a hash.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: flume::Sender<ActorResult<()>>,
    },
    /// Modification method: get or create a file handle for a hash.
    ///
    /// If the entry exists in redb, either partial or complete, the corresponding
    /// data will be returned. If it does not yet exist, a new partial file handle
    /// will be created, but not yet written to redb.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Modification method: inline size was exceeded for a partial entry.
    /// If the entry is complete, this is a no-op. If the entry is partial and in
    /// memory, it will be written to a file and created in redb.
    OnMemSizeExceeded { hash: Hash },
    /// Modification method: marks a partial entry as complete.
    /// Calling this on a complete entry is a no-op.
    OnComplete { handle: BaoFileHandle },
    /// Modification method: import data into a redb store
    ///
    /// At this point the size, hash and outboard must already be known.
    Import {
        cmd: Import,
        tx: flume::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Modification method: export data from a redb store
    ///
    /// In most cases this will not modify the store. Only when using
    /// [`ExportMode::TryReference`] and the entry is large enough to not be
    /// inlined.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: import an entire flat store into the redb store.
    ImportFlatStore {
        paths: FlatStorePaths,
        tx: oneshot::Sender<bool>,
    },
    /// Update inline options
    UpdateInlineOptions {
        /// The new inline options
        inline_options: InlineOptions,
        /// Whether to reapply the new options to existing entries
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// Bulk query method: get entries from the blobs table
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// Bulk query method: get the entire tags table
    Tags {
        #[debug(skip)]
        filter: FilterPredicate<Tag, HashAndFormat>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Modification method: set a tag to a value, or remove it.
    SetTag {
        tag: Tag,
        value: Option<HashAndFormat>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: create a new unique tag and set it to a value.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Modification method: unconditional delete the data for a number of hashes
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync the entire database to disk.
    ///
    /// This just makes sure that there is no write transaction open.
    Sync { tx: oneshot::Sender<()> },
    /// Internal method: dump the entire database to stdout.
    Dump,
    /// Internal method: validate the entire database.
    ///
    /// Note that this will block the actor until it is done, so don't use it
    /// on a node under load.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Internal method: notify the actor that a new gc epoch has started.
    ///
    /// This will be called periodically and can be used to do misc cleanups.
    GcStart { tx: oneshot::Sender<()> },
    /// Internal method: shutdown the actor.
    ///
    /// Can have an optional oneshot sender to signal when the actor has shut down.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
661
662impl ActorMessage {
663    fn category(&self) -> MessageCategory {
664        match self {
665            Self::Get { .. }
666            | Self::GetOrCreate { .. }
667            | Self::EntryStatus { .. }
668            | Self::Blobs { .. }
669            | Self::Tags { .. }
670            | Self::GcStart { .. }
671            | Self::GetFullEntryState { .. }
672            | Self::Dump => MessageCategory::ReadOnly,
673            Self::Import { .. }
674            | Self::Export { .. }
675            | Self::OnMemSizeExceeded { .. }
676            | Self::OnComplete { .. }
677            | Self::SetTag { .. }
678            | Self::CreateTag { .. }
679            | Self::SetFullEntryState { .. }
680            | Self::Delete { .. } => MessageCategory::ReadWrite,
681            Self::UpdateInlineOptions { .. }
682            | Self::Sync { .. }
683            | Self::Shutdown { .. }
684            | Self::Fsck { .. }
685            | Self::ImportFlatStore { .. } => MessageCategory::TopLevel,
686            #[cfg(test)]
687            Self::EntryState { .. } => MessageCategory::ReadOnly,
688        }
689    }
690}
691
/// Coarse classification of [`ActorMessage`]s by the transaction they need.
enum MessageCategory {
    /// Can be served from a read transaction.
    ReadOnly,
    /// Needs a write transaction.
    ReadWrite,
    /// Handled outside of read/write batching (e.g. `Sync`, `Shutdown`).
    TopLevel,
}
697
/// Predicate for filtering entries in a redb table.
///
/// Receives a `u64` (presumably the row index — confirm at the call site),
/// plus access guards for key and value, and returns `Some((key, value))`
/// to keep the row or `None` to skip it.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
701
/// Parameters for importing from a flat store.
#[derive(Debug)]
pub struct FlatStorePaths {
    /// Complete data files.
    pub complete: PathBuf,
    /// Partial data files.
    pub partial: PathBuf,
    /// Metadata files such as the tags table.
    pub meta: PathBuf,
}
712
/// Storage that is using a redb database for small files and files for
/// large files.
///
/// Cheaply cloneable; all clones share the same inner state and actor.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
717
718impl Store {
719    /// Load or create a new store.
720    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
721        let path = root.as_ref();
722        let db_path = path.join("blobs.db");
723        let options = Options {
724            path: PathOptions::new(path),
725            inline: Default::default(),
726            batch: Default::default(),
727        };
728        Self::new(db_path, options).await
729    }
730
731    /// Create a new store with custom options.
732    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
733        // spawn_blocking because StoreInner::new creates directories
734        let rt = tokio::runtime::Handle::try_current()
735            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
736        let inner =
737            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
738        Ok(Self(Arc::new(inner)))
739    }
740
741    /// Update the inline options.
742    ///
743    /// When reapply is true, the new options will be applied to all existing
744    /// entries.
745    pub async fn update_inline_options(
746        &self,
747        inline_options: InlineOptions,
748        reapply: bool,
749    ) -> io::Result<()> {
750        Ok(self
751            .0
752            .update_inline_options(inline_options, reapply)
753            .await?)
754    }
755
756    /// Dump the entire content of the database to stdout.
757    pub async fn dump(&self) -> io::Result<()> {
758        Ok(self.0.dump().await?)
759    }
760
761    /// Ensure that all operations before the sync are processed and persisted.
762    ///
763    /// This is done by closing any open write transaction.
764    pub async fn sync(&self) -> io::Result<()> {
765        Ok(self.0.sync().await?)
766    }
767
768    /// Import from a v0 or v1 flat store, for backwards compatibility.
769    pub async fn import_flat_store(&self, paths: FlatStorePaths) -> io::Result<bool> {
770        Ok(self.0.import_flat_store(paths).await?)
771    }
772}
773
/// Shared state behind the [`Store`] handle.
#[derive(Debug)]
struct StoreInner {
    /// Channel for sending messages to the actor thread.
    tx: flume::Sender<ActorMessage>,
    /// Reference counts of live temp tags, shared with the liveness tracker.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Join handle of the actor thread. `Option` so it can be taken
    /// (presumably to join on shutdown — see the Drop impl).
    handle: Option<std::thread::JoinHandle<()>>,
    /// Path options, kept here so paths can be computed without the actor.
    path_options: Arc<PathOptions>,
}
781
782impl LivenessTracker for RwLock<TempCounterMap> {
783    fn on_clone(&self, content: &HashAndFormat) {
784        self.write().unwrap().inc(content);
785    }
786
787    fn on_drop(&self, content: &HashAndFormat) {
788        self.write().unwrap().dec(content);
789    }
790}
791
792impl StoreInner {
793    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
794        tracing::trace!(
795            "creating data directory: {}",
796            options.path.data_path.display()
797        );
798        std::fs::create_dir_all(&options.path.data_path)?;
799        tracing::trace!(
800            "creating temp directory: {}",
801            options.path.temp_path.display()
802        );
803        std::fs::create_dir_all(&options.path.temp_path)?;
804        tracing::trace!(
805            "creating parent directory for db file{}",
806            path.parent().unwrap().display()
807        );
808        std::fs::create_dir_all(path.parent().unwrap())?;
809        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
810        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt)?;
811        let handle = std::thread::Builder::new()
812            .name("redb-actor".to_string())
813            .spawn(move || {
814                if let Err(cause) = actor.run_batched() {
815                    tracing::error!("redb actor failed: {}", cause);
816                }
817            })
818            .expect("failed to spawn thread");
819        Ok(Self {
820            tx,
821            temp,
822            handle: Some(handle),
823            path_options: Arc::new(options.path),
824        })
825    }
826
827    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
828        let (tx, rx) = oneshot::channel();
829        self.tx.send_async(ActorMessage::Get { hash, tx }).await?;
830        Ok(rx.await??)
831    }
832
833    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
834        let (tx, rx) = oneshot::channel();
835        self.tx
836            .send_async(ActorMessage::GetOrCreate { hash, tx })
837            .await?;
838        Ok(rx.await??)
839    }
840
841    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
842        let (tx, rx) = oneshot::channel();
843        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
844            let v = v.value();
845            if let EntryState::Complete { .. } = &v {
846                Some((k.value(), v))
847            } else {
848                None
849            }
850        });
851        self.tx
852            .send_async(ActorMessage::Blobs { filter, tx })
853            .await?;
854        let blobs = rx.await?;
855        let res = blobs?
856            .into_iter()
857            .map(|r| {
858                r.map(|(hash, _)| hash)
859                    .map_err(|e| ActorError::from(e).into())
860            })
861            .collect::<Vec<_>>();
862        Ok(res)
863    }
864
865    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
866        let (tx, rx) = oneshot::channel();
867        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
868            let v = v.value();
869            if let EntryState::Partial { .. } = &v {
870                Some((k.value(), v))
871            } else {
872                None
873            }
874        });
875        self.tx
876            .send_async(ActorMessage::Blobs { filter, tx })
877            .await?;
878        let blobs = rx.await?;
879        let res = blobs?
880            .into_iter()
881            .map(|r| {
882                r.map(|(hash, _)| hash)
883                    .map_err(|e| ActorError::from(e).into())
884            })
885            .collect::<Vec<_>>();
886        Ok(res)
887    }
888
889    async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
890        let (tx, rx) = oneshot::channel();
891        let filter: FilterPredicate<Tag, HashAndFormat> =
892            Box::new(|_i, k, v| Some((k.value(), v.value())));
893        self.tx
894            .send_async(ActorMessage::Tags { filter, tx })
895            .await?;
896        let tags = rx.await?;
897        // transform the internal error type into io::Error
898        let tags = tags?
899            .into_iter()
900            .map(|r| r.map_err(|e| ActorError::from(e).into()))
901            .collect();
902        Ok(tags)
903    }
904
905    async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
906        let (tx, rx) = oneshot::channel();
907        self.tx
908            .send_async(ActorMessage::SetTag { tag, value, tx })
909            .await?;
910        Ok(rx.await??)
911    }
912
913    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
914        let (tx, rx) = oneshot::channel();
915        self.tx
916            .send_async(ActorMessage::CreateTag { hash, tx })
917            .await?;
918        Ok(rx.await??)
919    }
920
921    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
922        let (tx, rx) = oneshot::channel();
923        self.tx
924            .send_async(ActorMessage::Delete { hashes, tx })
925            .await?;
926        Ok(rx.await??)
927    }
928
929    async fn gc_start(&self) -> OuterResult<()> {
930        let (tx, rx) = oneshot::channel();
931        self.tx.send_async(ActorMessage::GcStart { tx }).await?;
932        Ok(rx.await?)
933    }
934
935    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
936        let (tx, rx) = flume::bounded(1);
937        self.tx
938            .send_async(ActorMessage::EntryStatus { hash: *hash, tx })
939            .await?;
940        Ok(rx.into_recv_async().await??)
941    }
942
943    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
944        let (tx, rx) = flume::bounded(1);
945        self.tx
946            .send(ActorMessage::EntryStatus { hash: *hash, tx })?;
947        Ok(rx.recv()??)
948    }
949
950    async fn complete(&self, entry: Entry) -> OuterResult<()> {
951        self.tx
952            .send_async(ActorMessage::OnComplete { handle: entry })
953            .await?;
954        Ok(())
955    }
956
    /// Export the entry for `hash` to the absolute path `target`.
    ///
    /// `mode` selects between copying and moving/referencing the data, and
    /// progress is reported through the `progress` callback.
    ///
    /// # Errors
    ///
    /// Fails if `target` is not absolute or has no parent directory, or if
    /// the actor reports an error (e.g. the entry is missing or partial).
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            )
            .into());
        }
        let parent = target.parent().ok_or_else(|| {
            OuterError::from(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            ))
        })?;
        // make sure the target directory exists before the actor writes into it
        std::fs::create_dir_all(parent)?;
        // the temp tag protects the entry for the duration of the export
        let temp_tag = self.temp_tag(HashAndFormat::raw(hash));
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::Export {
                cmd: Export {
                    temp_tag,
                    target,
                    mode,
                    progress,
                },
                tx,
            })
            .await?;
        Ok(rx.await??)
    }
999
1000    async fn consistency_check(
1001        &self,
1002        repair: bool,
1003        progress: BoxedProgressSender<ConsistencyCheckProgress>,
1004    ) -> OuterResult<()> {
1005        let (tx, rx) = oneshot::channel();
1006        self.tx
1007            .send_async(ActorMessage::Fsck {
1008                repair,
1009                progress,
1010                tx,
1011            })
1012            .await?;
1013        Ok(rx.await??)
1014    }
1015
1016    async fn import_flat_store(&self, paths: FlatStorePaths) -> OuterResult<bool> {
1017        let (tx, rx) = oneshot::channel();
1018        self.tx
1019            .send_async(ActorMessage::ImportFlatStore { paths, tx })
1020            .await?;
1021        Ok(rx.await?)
1022    }
1023
1024    async fn update_inline_options(
1025        &self,
1026        inline_options: InlineOptions,
1027        reapply: bool,
1028    ) -> OuterResult<()> {
1029        let (tx, rx) = oneshot::channel();
1030        self.tx
1031            .send_async(ActorMessage::UpdateInlineOptions {
1032                inline_options,
1033                reapply,
1034                tx,
1035            })
1036            .await?;
1037        Ok(rx.await?)
1038    }
1039
1040    async fn dump(&self) -> OuterResult<()> {
1041        self.tx.send_async(ActorMessage::Dump).await?;
1042        Ok(())
1043    }
1044
1045    async fn sync(&self) -> OuterResult<()> {
1046        let (tx, rx) = oneshot::channel();
1047        self.tx.send_async(ActorMessage::Sync { tx }).await?;
1048        Ok(rx.await?)
1049    }
1050
    /// Create a temp tag for `content`, registered in the shared temp
    /// counter map so the content is protected while the tag is alive.
    fn temp_tag(&self, content: HashAndFormat) -> TempTag {
        TempTag::new(content, Some(self.temp.clone()))
    }
1054
1055    fn import_file_sync(
1056        &self,
1057        path: PathBuf,
1058        mode: ImportMode,
1059        format: BlobFormat,
1060        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1061    ) -> OuterResult<(TempTag, u64)> {
1062        if !path.is_absolute() {
1063            return Err(
1064                io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
1065            );
1066        }
1067        if !path.is_file() && !path.is_symlink() {
1068            return Err(io::Error::new(
1069                io::ErrorKind::InvalidInput,
1070                "path is not a file or symlink",
1071            )
1072            .into());
1073        }
1074        let id = progress.new_id();
1075        progress.blocking_send(ImportProgress::Found {
1076            id,
1077            name: path.to_string_lossy().to_string(),
1078        })?;
1079        let file = match mode {
1080            ImportMode::TryReference => ImportSource::External(path),
1081            ImportMode::Copy => {
1082                if std::fs::metadata(&path)?.len() < 16 * 1024 {
1083                    // we don't know if the data will be inlined since we don't
1084                    // have the inline options here. But still for such a small file
1085                    // it does not seem worth it do to the temp file ceremony.
1086                    let data = std::fs::read(&path)?;
1087                    ImportSource::Memory(data.into())
1088                } else {
1089                    let temp_path = self.temp_file_name();
1090                    // copy the data, since it is not stable
1091                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
1092                    if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
1093                        tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
1094                    } else {
1095                        tracing::debug!("copied {} to {}", path.display(), temp_path.display());
1096                    }
1097                    // copy progress for size will be called in finalize_import_sync
1098                    ImportSource::TempFile(temp_path)
1099                }
1100            }
1101        };
1102        let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
1103        Ok((tag, size))
1104    }
1105
1106    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
1107        let id = 0;
1108        let file = ImportSource::Memory(data);
1109        let progress = IgnoreProgressSender::default();
1110        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
1111        Ok(tag)
1112    }
1113
    /// Common tail of all synchronous imports: compute hash and outboard,
    /// then hand the result to the actor for insertion into the database.
    ///
    /// Returns a temp tag protecting the new entry, and the data size.
    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        progress.blocking_send(ImportProgress::Size {
            id,
            size: data_size,
        })?;
        let progress2 = progress.clone();
        // compute the hash and outboard from either the backing file or the
        // in-memory bytes
        let (hash, outboard) = match file.content() {
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _guard = span.enter();
                let file = std::fs::File::open(path)?;
                compute_outboard(file, data_size, move |offset| {
                    Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
            MemOrFile::Mem(bytes) => {
                // todo: progress? usually this is will be small enough that progress might not be needed.
                compute_outboard(bytes, data_size, |_| Ok(()))?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // from here on, everything related to the hash is protected by the temp tag
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        // blocking send for the import
        let (tx, rx) = flume::bounded(1);
        self.tx.send(ActorMessage::Import {
            cmd: Import {
                content_id: HashAndFormat { hash, format },
                source: file,
                outboard,
                data_size,
            },
            tx,
        })?;
        // wait for the actor to finish the import and forward its result
        Ok(rx.recv()??)
    }
1159
    /// Delegate temp file naming to the configured path options.
    fn temp_file_name(&self) -> PathBuf {
        self.path_options.temp_file_name()
    }
1163
1164    async fn shutdown(&self) {
1165        let (tx, rx) = oneshot::channel();
1166        self.tx
1167            .send_async(ActorMessage::Shutdown { tx: Some(tx) })
1168            .await
1169            .ok();
1170        rx.await.ok();
1171    }
1172}
1173
1174impl Drop for StoreInner {
1175    fn drop(&mut self) {
1176        if let Some(handle) = self.handle.take() {
1177            self.tx.send(ActorMessage::Shutdown { tx: None }).ok();
1178            handle.join().ok();
1179        }
1180    }
1181}
1182
/// Mutable, non-database state of the redb actor.
struct ActorState {
    /// Cache of handed-out bao file handles, weakly referenced so entries
    /// expire when the last strong handle is dropped.
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Hashes protected from deletion, e.g. freshly imported entries.
    protected: BTreeSet<Hash>,
    /// Temp tag counters, shared with the store front end.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Receiving end of the actor message channel.
    msgs: flume::Receiver<ActorMessage>,
    /// Configuration used when creating new bao file handles.
    create_options: Arc<BaoFileConfig>,
    /// Store options (paths, inline limits, batch limits).
    options: Options,
    /// Tokio runtime handle, used to spawn blocking work such as export copies.
    rt: tokio::runtime::Handle,
}
1192
/// The actor for the redb store.
///
/// It is split into the database and the rest of the state to allow for split
/// borrows in the message handlers.
struct Actor {
    /// The redb database.
    db: redb::Database,
    /// Everything else: handle cache, options, message receiver, runtime.
    state: ActorState,
}
1201
/// Error type for message handler functions of the redb actor.
///
/// What can go wrong are various things with redb, as well as io errors related
/// to files other than redb.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The database contents contradict an invariant, e.g. a referenced
    /// entry is missing.
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    /// Migrating an older database schema to the current one failed.
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
1225
1226impl From<ActorError> for io::Error {
1227    fn from(e: ActorError) -> Self {
1228        match e {
1229            ActorError::Io(e) => e,
1230            e => io::Error::new(io::ErrorKind::Other, e),
1231        }
1232    }
1233}
1234
/// Result type for handler functions of the redb actor.
///
/// Used by the [`ActorState`] message handlers. See [`ActorError`] for what
/// can go wrong.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
1239
1240/// Error type for calling the redb actor from the store.
1241///
1242/// What can go wrong is all the things in [`ActorError`] and in addition
1243/// sending and receiving messages.
1244#[derive(Debug, thiserror::Error)]
1245pub(crate) enum OuterError {
1246    #[error("inner error: {0}")]
1247    Inner(#[from] ActorError),
1248    #[error("send error: {0}")]
1249    Send(#[from] flume::SendError<ActorMessage>),
1250    #[error("progress send error: {0}")]
1251    ProgressSend(#[from] ProgressSendError),
1252    #[error("recv error: {0}")]
1253    Recv(#[from] oneshot::error::RecvError),
1254    #[error("recv error: {0}")]
1255    FlumeRecv(#[from] flume::RecvError),
1256    #[error("join error: {0}")]
1257    JoinTask(#[from] tokio::task::JoinError),
1258}
1259
/// Result type for calling the redb actor from the store.
///
/// See [`OuterError`] for what can go wrong.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
1264
1265impl From<io::Error> for OuterError {
1266    fn from(e: io::Error) -> Self {
1267        OuterError::Inner(ActorError::Io(e))
1268    }
1269}
1270
1271impl From<OuterError> for io::Error {
1272    fn from(e: OuterError) -> Self {
1273        match e {
1274            OuterError::Inner(ActorError::Io(e)) => e,
1275            e => io::Error::new(io::ErrorKind::Other, e),
1276        }
1277    }
1278}
1279
1280impl super::Map for Store {
1281    type Entry = Entry;
1282
1283    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
1284        Ok(self.0.get(*hash).await?.map(From::from))
1285    }
1286}
1287
1288impl super::MapMut for Store {
1289    type EntryMut = Entry;
1290
1291    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
1292        Ok(self.0.get_or_create(hash).await?)
1293    }
1294
1295    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
1296        Ok(self.0.entry_status(hash).await?)
1297    }
1298
1299    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
1300        self.get(hash).await
1301    }
1302
1303    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
1304        Ok(self.0.complete(entry).await?)
1305    }
1306
1307    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
1308        Ok(self.0.entry_status_sync(hash)?)
1309    }
1310}
1311
1312impl super::ReadableStore for Store {
1313    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
1314        Ok(Box::new(self.0.blobs().await?.into_iter()))
1315    }
1316
1317    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
1318        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
1319    }
1320
1321    async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
1322        Ok(Box::new(self.0.tags().await?.into_iter()))
1323    }
1324
1325    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
1326        Box::new(self.0.temp.read().unwrap().keys())
1327    }
1328
1329    async fn consistency_check(
1330        &self,
1331        repair: bool,
1332        tx: BoxedProgressSender<ConsistencyCheckProgress>,
1333    ) -> io::Result<()> {
1334        self.0.consistency_check(repair, tx.clone()).await?;
1335        Ok(())
1336    }
1337
1338    async fn export(
1339        &self,
1340        hash: Hash,
1341        target: PathBuf,
1342        mode: ExportMode,
1343        progress: ExportProgressCb,
1344    ) -> io::Result<()> {
1345        Ok(self.0.export(hash, target, mode, progress).await?)
1346    }
1347}
1348
impl super::Store for Store {
    /// Import a file from disk, running the synchronous import on a
    /// blocking thread.
    async fn import_file(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(crate::TempTag, u64)> {
        let this = self.0.clone();
        Ok(
            tokio::task::spawn_blocking(move || {
                this.import_file_sync(path, mode, format, progress)
            })
            .await??,
        )
    }

    /// Import in-memory bytes, running the synchronous import on a
    /// blocking thread.
    async fn import_bytes(
        &self,
        data: bytes::Bytes,
        format: iroh_base::hash::BlobFormat,
    ) -> io::Result<crate::TempTag> {
        let this = self.0.clone();
        Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
    }

    /// Import a stream of bytes by spooling it into a temp file, then
    /// finalizing the import on a blocking thread.
    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        // write to a temp file
        let temp_data_path = this.0.temp_file_name();
        let name = temp_data_path
            .file_name()
            .expect("just created")
            .to_string_lossy()
            .to_string();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
        let mut offset = 0;
        // NOTE(review): if the stream or a write errors here, the temp file
        // is left behind — presumably cleaned up by temp file housekeeping;
        // confirm.
        while let Some(chunk) = data.next().await {
            let chunk = chunk?;
            writer.write_all(&chunk).await?;
            offset += chunk.len() as u64;
            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
        }
        // flush before dropping so buffered data is not lost
        writer.flush().await?;
        drop(writer);
        let file = ImportSource::TempFile(temp_data_path);
        Ok(tokio::task::spawn_blocking(move || {
            this.0.finalize_import_sync(file, format, id, progress)
        })
        .await??)
    }

    async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
        Ok(self.0.set_tag(name, hash).await?)
    }

    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        Ok(self.0.create_tag(hash).await?)
    }

    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        Ok(self.0.delete(hashes).await?)
    }

    async fn gc_start(&self) -> io::Result<()> {
        self.0.gc_start().await?;
        Ok(())
    }

    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
        self.0.temp_tag(value)
    }

    async fn shutdown(&self) {
        self.0.shutdown().await;
    }
}
1433
impl Actor {
    /// Create the actor: open (and if needed migrate) the redb database at
    /// `path`, create all tables, and set up the message channel.
    ///
    /// Returns the actor together with the sender side of its channel.
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, flume::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            // a v1 database needs an explicit migration before it can be opened
            Err(DatabaseError::UpgradeRequired(1)) => {
                migrate_redb_v1_v2::run(path).map_err(ActorError::Migration)?
            }
            Err(err) => return Err(err.into()),
        };

        let txn = db.begin_write()?;
        // create tables and drop them just to create them.
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;
        // make the channel relatively large. there are some messages that don't
        // require a response, it's fine if they pile up a bit.
        let (tx, rx) = flume::bounded(1024);
        let tx2 = tx.clone();
        // callback fired when a mem entry grows past the inline threshold;
        // it notifies the actor so the entry can be moved to disk
        let on_file_create: CreateCb = Arc::new(move |hash| {
            // todo: make the callback allow async
            tx2.send(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }

    /// Main loop of the actor.
    ///
    /// Messages are grouped into batches that share a single read or write
    /// transaction, bounded by the configured batch size and duration.
    /// Top-level messages run outside any transaction; a `Shutdown` message
    /// ends the loop.
    fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs.clone());
        while let Some(msg) = msgs.recv() {
            if let ActorMessage::Shutdown { tx } = msg {
                // ack the shutdown if the caller is waiting for it
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // requeue the message so the batch loop below sees it
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = self.state.options.batch.max_read_duration;
                    for msg in msgs.batch_iter(count, timeout) {
                        // Err(msg) hands the message back unhandled: requeue
                        // it and end this batch
                        if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                            msgs.push_back(msg).expect("just recv'd");
                            break;
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = self.state.options.batch.max_write_duration;
                    for msg in msgs.batch_iter(count, timeout) {
                        if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                            msgs.push_back(msg).expect("just recv'd");
                            break;
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // file deletions are deferred until after a successful commit
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
}
1540
1541impl ActorState {
1542    fn entry_status(
1543        &mut self,
1544        tables: &impl ReadableTables,
1545        hash: Hash,
1546    ) -> ActorResult<EntryStatus> {
1547        let status = match tables.blobs().get(hash)? {
1548            Some(guard) => match guard.value() {
1549                EntryState::Complete { .. } => EntryStatus::Complete,
1550                EntryState::Partial { .. } => EntryStatus::Partial,
1551            },
1552            None => EntryStatus::NotFound,
1553        };
1554        Ok(status)
1555    }
1556
    /// Get or lazily reconstruct a [`BaoFileHandle`] for `hash`.
    ///
    /// A still-live cached handle is reused; otherwise the handle is built
    /// from the database entry, if one exists.
    fn get(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<Option<BaoFileHandle>> {
        // fast path: a cached handle that is still alive
        if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
            return Ok(Some(handle));
        }
        let Some(entry) = tables.blobs().get(hash)? else {
            return Ok(None);
        };
        // todo: if complete, load inline data and/or outboard into memory if needed,
        // and return a complete entry.
        let entry = entry.value();
        let config = self.create_options.clone();
        let handle = match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => {
                let data = load_data(tables, &self.options.path, data_location, &hash)?;
                let outboard = load_outboard(
                    tables,
                    &self.options.path,
                    outboard_location,
                    data.size(),
                    &hash,
                )?;
                BaoFileHandle::new_complete(config, hash, data, outboard)
            }
            EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
        };
        // cache a weak reference so subsequent callers share this handle
        self.handles.insert(hash, handle.downgrade());
        Ok(Some(handle))
    }
1592
    /// Handle an export command: write the (complete) entry behind the temp
    /// tag to the target path, according to the export mode.
    ///
    /// The result is delivered through `tx`; a partial entry is rejected
    /// with an `Unsupported` io error.
    fn export(
        &mut self,
        tables: &mut Tables,
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    ) -> ActorResult<()> {
        let Export {
            temp_tag,
            target,
            mode,
            progress,
        } = cmd;
        let guard = tables
            .blobs
            .get(temp_tag.hash())?
            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
        let entry = guard.value();
        match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => match data_location {
                DataLocation::Inline(()) => {
                    // ignore export mode, just copy. For inline data we can not reference anyway.
                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
                        ActorError::Inconsistent("inline data not found".to_owned())
                    })?;
                    tracing::trace!("exporting inline data to {}", target.display());
                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
                        .ok();
                }
                DataLocation::Owned(size) => {
                    let path = self.options.path.owned_data_path(temp_tag.hash());
                    match mode {
                        ExportMode::Copy => {
                            // copy in an external thread
                            self.rt.spawn_blocking(move || {
                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                    .ok();
                            });
                        }
                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
                            Ok(()) => {
                                // the data now lives at the target path; record it
                                // as an external location in the database
                                let entry = EntryState::Complete {
                                    data_location: DataLocation::External(vec![target], size),
                                    outboard_location,
                                };
                                drop(guard);
                                tables.blobs.insert(temp_tag.hash(), entry)?;
                                drop(temp_tag);
                                tx.send(Ok(())).ok();
                            }
                            Err(e) => {
                                // NOTE(review): 18 is EXDEV on Linux (and macOS);
                                // on Windows a cross-device move fails with a
                                // different code, so this fallback presumably
                                // never triggers there — confirm.
                                const ERR_CROSS: i32 = 18;
                                if e.raw_os_error() == Some(ERR_CROSS) {
                                    // Cross device renaming failed, copy instead
                                    match std::fs::copy(&path, &target) {
                                        Ok(_) => {
                                            let entry = EntryState::Complete {
                                                data_location: DataLocation::External(
                                                    vec![target],
                                                    size,
                                                ),
                                                outboard_location,
                                            };

                                            drop(guard);
                                            tables.blobs.insert(temp_tag.hash(), entry)?;
                                            // the owned data file is now redundant;
                                            // delete it after the commit
                                            tables
                                                .delete_after_commit
                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
                                            drop(temp_tag);

                                            tx.send(Ok(())).ok();
                                        }
                                        Err(e) => {
                                            drop(temp_tag);
                                            tx.send(Err(e.into())).ok();
                                        }
                                    }
                                } else {
                                    drop(temp_tag);
                                    tx.send(Err(e.into())).ok();
                                }
                            }
                        },
                    }
                }
                DataLocation::External(paths, size) => {
                    let path = paths
                        .first()
                        .ok_or_else(|| {
                            ActorError::Inconsistent("external path missing".to_owned())
                        })?
                        .to_owned();
                    // we can not reference external files, so we just copy them. But this does not have to happen in the actor.
                    if path == target {
                        // export to the same path, nothing to do
                        tx.send(Ok(())).ok();
                    } else {
                        // copy in an external thread
                        self.rt.spawn_blocking(move || {
                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                .ok();
                        });
                    }
                }
            },
            EntryState::Partial { .. } => {
                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
            }
        }
        Ok(())
    }
1707
    /// Import data described by an [`Import`] command, returning a temp tag
    /// protecting the new entry and the size of the imported data.
    ///
    /// Depending on the configured size thresholds, data and outboard are
    /// either inlined into the database or stored as (owned or external) files.
    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
        let Import {
            content_id,
            source: file,
            outboard,
            data_size,
        } = cmd;
        // no outboard at all counts as size 0
        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
        // decide whether data and outboard go inline into the db or into files
        let inline_data = data_size <= self.options.inline.max_data_inlined;
        let inline_outboard =
            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
        // from here on, everything related to the hash is protected by the temp tag
        let tag = TempTag::new(content_id, Some(self.temp.clone()));
        let hash = *tag.hash();
        self.protected.insert(hash);
        // move the data file into place, or create a reference to it
        let data_location = match file {
            ImportSource::External(external_path) => {
                tracing::debug!("stored external reference {}", external_path.display());
                if inline_data {
                    tracing::debug!(
                        "reading external data to inline it: {}",
                        external_path.display()
                    );
                    // the external file stays in place, we only copy its content
                    let data = Bytes::from(std::fs::read(&external_path)?);
                    DataLocation::Inline(data)
                } else {
                    DataLocation::External(vec![external_path], data_size)
                }
            }
            ImportSource::TempFile(temp_data_path) => {
                if inline_data {
                    tracing::debug!(
                        "reading and deleting temp file to inline it: {}",
                        temp_data_path.display()
                    );
                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
                    DataLocation::Inline(data)
                } else {
                    // we own the temp file, so we can just move it into place
                    let data_path = self.options.path.owned_data_path(&hash);
                    std::fs::rename(&temp_data_path, &data_path)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
            ImportSource::Memory(data) => {
                if inline_data {
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    // todo: this blocks the actor when writing a large file
                    overwrite_and_sync(&data_path, &data)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
        };
        let outboard_location = if let Some(outboard) = outboard {
            if inline_outboard {
                OutboardLocation::Inline(Bytes::from(outboard))
            } else {
                let outboard_path = self.options.path.owned_outboard_path(&hash);
                // todo: this blocks the actor when writing a large outboard
                overwrite_and_sync(&outboard_path, &outboard)?;
                OutboardLocation::Owned
            }
        } else {
            OutboardLocation::NotNeeded
        };
        // persist inline data and outboard in their tables
        if let DataLocation::Inline(data) = &data_location {
            tables.inline_data.insert(hash, data.as_ref())?;
        }
        if let OutboardLocation::Inline(outboard) = &outboard_location {
            tables.inline_outboard.insert(hash, outboard.as_ref())?;
        }
        // the newly created owned files must survive the commit, so take them
        // off the delete list
        if let DataLocation::Owned(_) = &data_location {
            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
        }
        if let OutboardLocation::Owned = &outboard_location {
            tables
                .delete_after_commit
                .remove(hash, [BaoFilePart::Outboard]);
        }
        // merge the new complete state with any existing entry state
        let entry = tables.blobs.get(hash)?;
        let entry = entry.map(|x| x.value()).unwrap_or_default();
        let data_location = data_location.discard_inline_data();
        let outboard_location = outboard_location.discard_extra_data();
        let entry = entry.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, entry)?;
        Ok((tag, data_size))
    }
1801
1802    fn get_or_create(
1803        &mut self,
1804        tables: &impl ReadableTables,
1805        hash: Hash,
1806    ) -> ActorResult<BaoFileHandle> {
1807        self.protected.insert(hash);
1808        if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
1809            return Ok(handle);
1810        }
1811        let entry = tables.blobs().get(hash)?;
1812        let handle = if let Some(entry) = entry {
1813            let entry = entry.value();
1814            match entry {
1815                EntryState::Complete {
1816                    data_location,
1817                    outboard_location,
1818                    ..
1819                } => {
1820                    let data = load_data(tables, &self.options.path, data_location, &hash)?;
1821                    let outboard = load_outboard(
1822                        tables,
1823                        &self.options.path,
1824                        outboard_location,
1825                        data.size(),
1826                        &hash,
1827                    )?;
1828                    println!("creating complete entry for {}", hash.to_hex());
1829                    BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
1830                }
1831                EntryState::Partial { .. } => {
1832                    println!("creating partial entry for {}", hash.to_hex());
1833                    BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
1834                }
1835            }
1836        } else {
1837            BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
1838        };
1839        self.handles.insert(hash, handle.downgrade());
1840        Ok(handle)
1841    }
1842
1843    /// Read the entire blobs table. Callers can then sift through the results to find what they need
1844    fn blobs(
1845        &mut self,
1846        tables: &impl ReadableTables,
1847        filter: FilterPredicate<Hash, EntryState>,
1848    ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1849        let mut res = Vec::new();
1850        let mut index = 0u64;
1851        #[allow(clippy::explicit_counter_loop)]
1852        for item in tables.blobs().iter()? {
1853            match item {
1854                Ok((k, v)) => {
1855                    if let Some(item) = filter(index, k, v) {
1856                        res.push(Ok(item));
1857                    }
1858                }
1859                Err(e) => {
1860                    res.push(Err(e));
1861                }
1862            }
1863            index += 1;
1864        }
1865        Ok(res)
1866    }
1867
1868    /// Read the entire tags table. Callers can then sift through the results to find what they need
1869    fn tags(
1870        &mut self,
1871        tables: &impl ReadableTables,
1872        filter: FilterPredicate<Tag, HashAndFormat>,
1873    ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
1874        let mut res = Vec::new();
1875        let mut index = 0u64;
1876        #[allow(clippy::explicit_counter_loop)]
1877        for item in tables.tags().iter()? {
1878            match item {
1879                Ok((k, v)) => {
1880                    if let Some(item) = filter(index, k, v) {
1881                        res.push(Ok(item));
1882                    }
1883                }
1884                Err(e) => {
1885                    res.push(Err(e));
1886                }
1887            }
1888            index += 1;
1889        }
1890        Ok(res)
1891    }
1892
1893    fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
1894        let tag = {
1895            let tag = Tag::auto(SystemTime::now(), |x| {
1896                matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
1897            });
1898            tables.tags.insert(tag.clone(), content)?;
1899            tag
1900        };
1901        Ok(tag)
1902    }
1903
1904    fn set_tag(
1905        &self,
1906        tables: &mut Tables,
1907        tag: Tag,
1908        value: Option<HashAndFormat>,
1909    ) -> ActorResult<()> {
1910        match value {
1911            Some(value) => {
1912                tables.tags.insert(tag, value)?;
1913            }
1914            None => {
1915                tables.tags.remove(tag)?;
1916            }
1917        }
1918        Ok(())
1919    }
1920
1921    fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
1922        let entry = tables
1923            .blobs
1924            .get(hash)?
1925            .map(|x| x.value())
1926            .unwrap_or_default();
1927        let entry = entry.union(EntryState::Partial { size: None })?;
1928        tables.blobs.insert(hash, entry)?;
1929        // protect all three parts of the entry
1930        tables.delete_after_commit.remove(
1931            hash,
1932            [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
1933        );
1934        Ok(())
1935    }
1936
    /// Update the inline options and, if `reapply` is set, move data and
    /// outboards of all existing complete entries between inline db storage
    /// and owned files according to the new thresholds.
    ///
    /// This opens its own write transaction on `db`, so it must be called
    /// outside of the normal per-message transaction handling.
    fn update_inline_options(
        &mut self,
        db: &redb::Database,
        options: InlineOptions,
        reapply: bool,
    ) -> ActorResult<()> {
        self.options.inline = options;
        if reapply {
            let mut delete_after_commit = Default::default();
            let tx = db.begin_write()?;
            {
                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
                // collect all hashes up front, since we mutate the tables below
                let hashes = tables
                    .blobs
                    .iter()?
                    .map(|x| x.map(|(k, _)| k.value()))
                    .collect::<Result<Vec<_>, _>>()?;
                for hash in hashes {
                    let guard = tables
                        .blobs
                        .get(hash)?
                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
                    let entry = guard.value();
                    // only complete entries are moved; partial entries are left alone
                    if let EntryState::Complete {
                        data_location,
                        outboard_location,
                    } = entry
                    {
                        let (data_location, data_size, data_location_changed) = match data_location
                        {
                            DataLocation::Owned(size) => {
                                // inline
                                if size <= self.options.inline.max_data_inlined {
                                    // file -> inline: store the bytes in the db and
                                    // schedule the file for deletion on commit
                                    let path = self.options.path.owned_data_path(&hash);
                                    let data = std::fs::read(&path)?;
                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                                    tables.inline_data.insert(hash, data.as_slice())?;
                                    (DataLocation::Inline(()), size, true)
                                } else {
                                    (DataLocation::Owned(size), size, false)
                                }
                            }
                            DataLocation::Inline(()) => {
                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline data missing".to_owned())
                                })?;
                                let data = guard.value();
                                let size = data.len() as u64;
                                if size > self.options.inline.max_data_inlined {
                                    // inline -> file: write the file first, then drop
                                    // the guard so the inline row can be removed
                                    let path = self.options.path.owned_data_path(&hash);
                                    std::fs::write(&path, data)?;
                                    drop(guard);
                                    tables.inline_data.remove(hash)?;
                                    (DataLocation::Owned(size), size, true)
                                } else {
                                    (DataLocation::Inline(()), size, false)
                                }
                            }
                            // external data is never moved by the store
                            DataLocation::External(paths, size) => {
                                (DataLocation::External(paths, size), size, false)
                            }
                        };
                        let outboard_size = raw_outboard_size(data_size);
                        let (outboard_location, outboard_location_changed) = match outboard_location
                        {
                            OutboardLocation::Owned
                                if outboard_size <= self.options.inline.max_outboard_inlined =>
                            {
                                // file -> inline, same pattern as for data above
                                let path = self.options.path.owned_outboard_path(&hash);
                                let outboard = std::fs::read(&path)?;
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
                                (OutboardLocation::Inline(()), true)
                            }
                            OutboardLocation::Inline(())
                                if outboard_size > self.options.inline.max_outboard_inlined =>
                            {
                                // inline -> file, same pattern as for data above
                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline outboard missing".to_owned())
                                })?;
                                let outboard = guard.value();
                                let path = self.options.path.owned_outboard_path(&hash);
                                std::fs::write(&path, outboard)?;
                                drop(guard);
                                tables.inline_outboard.remove(hash)?;
                                (OutboardLocation::Owned, true)
                            }
                            x => (x, false),
                        };
                        drop(guard);
                        // only write back if something actually changed
                        if data_location_changed || outboard_location_changed {
                            tables.blobs.insert(
                                hash,
                                EntryState::Complete {
                                    data_location,
                                    outboard_location,
                                },
                            )?;
                        }
                    }
                }
            }
            tx.commit()?;
            // delete files scheduled for deletion only once the commit succeeded
            delete_after_commit.apply_and_clear(&self.options.path);
        }
        Ok(())
    }
2046
    /// Delete the entries for the given hashes, unless they are protected by a
    /// temp tag or by the protected set.
    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>) -> ActorResult<()> {
        for hash in hashes {
            // skip hashes that are kept alive by a temp tag
            if self.temp.as_ref().read().unwrap().contains(&hash) {
                continue;
            }
            // skip hashes that were added to the protected set since the last
            // gc start
            if self.protected.contains(&hash) {
                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
                continue;
            }
            tracing::debug!("deleting {}", &hash.to_hex()[..8]);
            // drop the in-memory handle, if any
            self.handles.remove(&hash);
            if let Some(entry) = tables.blobs.remove(hash)? {
                match entry.value() {
                    EntryState::Complete {
                        data_location,
                        outboard_location,
                    } => {
                        match data_location {
                            DataLocation::Inline(_) => {
                                tables.inline_data.remove(hash)?;
                            }
                            DataLocation::Owned(_) => {
                                // mark the data for deletion
                                tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                            }
                            // external files are never deleted by the store
                            DataLocation::External(_, _) => {}
                        }
                        match outboard_location {
                            OutboardLocation::Inline(_) => {
                                tables.inline_outboard.remove(hash)?;
                            }
                            OutboardLocation::Owned => {
                                // mark the outboard for deletion
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                            }
                            OutboardLocation::NotNeeded => {}
                        }
                    }
                    EntryState::Partial { .. } => {
                        // mark all parts for deletion
                        tables.delete_after_commit.insert(
                            hash,
                            [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
                        );
                    }
                }
            }
        }
        Ok(())
    }
2099
    /// Transition a bao file handle to complete storage and, if it was not
    /// already complete, persist the resulting entry state in the database.
    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
        let hash = entry.hash();
        // filled in by the transform closure when a transition actually happened
        let mut info = None;
        tracing::trace!("on_complete({})", hash.to_hex());
        entry.transform(|state| {
            tracing::trace!("on_complete transform {:?}", state);
            let entry = match complete_storage(
                state,
                &hash,
                &self.options.path,
                &self.options.inline,
                tables.delete_after_commit,
            )? {
                Ok(entry) => {
                    // store the info so we can insert it into the db later
                    info = Some((
                        entry.data_size(),
                        entry.data.mem().cloned(),
                        entry.outboard_size(),
                        entry.outboard.mem().cloned(),
                    ));
                    entry
                }
                Err(entry) => {
                    // the entry was already complete, nothing to do
                    entry
                }
            };
            Ok(BaoFileStorage::Complete(entry))
        })?;
        if let Some((data_size, data, outboard_size, outboard)) = info {
            // data kept in memory goes inline, otherwise it lives in an owned file
            let data_location = if data.is_some() {
                DataLocation::Inline(())
            } else {
                DataLocation::Owned(data_size)
            };
            let outboard_location = if outboard_size == 0 {
                // no outboard at all
                OutboardLocation::NotNeeded
            } else if outboard.is_some() {
                OutboardLocation::Inline(())
            } else {
                OutboardLocation::Owned
            };
            {
                tracing::debug!(
                    "inserting complete entry for {}, {} bytes",
                    hash.to_hex(),
                    data_size,
                );
                // merge with any existing entry state for this hash
                let entry = tables
                    .blobs()
                    .get(hash)?
                    .map(|x| x.value())
                    .unwrap_or_default();
                let entry = entry.union(EntryState::Complete {
                    data_location,
                    outboard_location,
                })?;
                tables.blobs.insert(hash, entry)?;
                if let Some(data) = data {
                    tables.inline_data.insert(hash, data.as_ref())?;
                }
                if let Some(outboard) = outboard {
                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
                }
            }
        }
        Ok(())
    }
2169
    /// Handle messages that must be processed outside of an open transaction,
    /// with direct access to the database.
    ///
    /// Any other message is an error here.
    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
        match msg {
            ActorMessage::ImportFlatStore { paths, tx } => {
                let res = self.import_flat_store(db, paths);
                // NOTE(review): `res?` aborts on error before notifying the
                // requester, unlike the Fsck arm which sends the whole Result —
                // presumably intentional (tx carries only the success value), confirm.
                tx.send(res?).ok();
            }
            ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            } => {
                let res = self.update_inline_options(db, inline_options, reapply);
                // same error-propagation pattern as ImportFlatStore above
                tx.send(res?).ok();
            }
            ActorMessage::Fsck {
                repair,
                progress,
                tx,
            } => {
                let res = self.consistency_check(db, repair, progress);
                tx.send(res).ok();
            }
            ActorMessage::Sync { tx } => {
                // nothing to do, just acknowledge
                tx.send(()).ok();
            }
            x => {
                return Err(ActorError::Inconsistent(format!(
                    "unexpected message for handle_toplevel: {:?}",
                    x
                )))
            }
        }
        Ok(())
    }
2204
    /// Handle messages that only need read access to the tables.
    ///
    /// Returns `Ok(Err(msg))` for messages that can not be handled here, so
    /// the caller can dispatch them to a writable context.
    fn handle_readonly(
        &mut self,
        tables: &impl ReadableTables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Get { hash, tx } => {
                let res = self.get(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::GetOrCreate { hash, tx } => {
                let res = self.get_or_create(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::EntryStatus { hash, tx } => {
                let res = self.entry_status(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Blobs { filter, tx } => {
                let res = self.blobs(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::Tags { filter, tx } => {
                let res = self.tags(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::GcStart { tx } => {
                // a new gc cycle starts: clear the protected set and drop
                // handle entries that are no longer alive
                self.protected.clear();
                self.handles.retain(|_, weak| weak.is_live());
                tx.send(()).ok();
            }
            ActorMessage::Dump => {
                dump(tables).ok();
            }
            #[cfg(test)]
            ActorMessage::EntryState { hash, tx } => {
                tx.send(self.entry_state(tables, hash)).ok();
            }
            ActorMessage::GetFullEntryState { hash, tx } => {
                let res = self.get_full_entry_state(tables, hash);
                tx.send(res).ok();
            }
            // everything else needs write access, hand it back to the caller
            x => return Ok(Err(x)),
        }
        Ok(Ok(()))
    }
2251
    /// Handle messages that need write access to the tables.
    ///
    /// Messages not handled here are forwarded to [`Self::handle_readonly`];
    /// messages neither can handle are returned as `Ok(Err(msg))`.
    fn handle_readwrite(
        &mut self,
        tables: &mut Tables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Import { cmd, tx } => {
                let res = self.import(tables, cmd);
                tx.send(res).ok();
            }
            ActorMessage::SetTag { tag, value, tx } => {
                let res = self.set_tag(tables, tag, value);
                tx.send(res).ok();
            }
            ActorMessage::CreateTag { hash, tx } => {
                let res = self.create_tag(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Delete { hashes, tx } => {
                let res = self.delete(tables, hashes);
                tx.send(res).ok();
            }
            ActorMessage::OnComplete { handle } => {
                // fire and forget, there is no sender to notify
                let res = self.on_complete(tables, handle);
                res.ok();
            }
            ActorMessage::Export { cmd, tx } => {
                self.export(tables, cmd, tx)?;
            }
            ActorMessage::OnMemSizeExceeded { hash } => {
                // fire and forget, there is no sender to notify
                let res = self.on_mem_size_exceeded(tables, hash);
                res.ok();
            }
            ActorMessage::Dump => {
                let res = dump(tables);
                res.ok();
            }
            ActorMessage::SetFullEntryState { hash, entry, tx } => {
                let res = self.set_full_entry_state(tables, hash, entry);
                tx.send(res).ok();
            }
            msg => {
                // try to handle it as readonly
                if let Err(msg) = self.handle_readonly(tables, msg)? {
                    return Ok(Err(msg));
                }
            }
        }
        Ok(Ok(()))
    }
2302}
2303
2304/// Export a file by copying out its content to a new location
2305fn export_file_copy(
2306    temp_tag: TempTag,
2307    path: PathBuf,
2308    size: u64,
2309    target: PathBuf,
2310    progress: ExportProgressCb,
2311) -> ActorResult<()> {
2312    progress(0)?;
2313    // todo: fine grained copy progress
2314    reflink_copy::reflink_or_copy(path, target)?;
2315    progress(size)?;
2316    drop(temp_tag);
2317    Ok(())
2318}
2319
2320/// Synchronously compute the outboard of a file, and return hash and outboard.
2321///
2322/// It is assumed that the file is not modified while this is running.
2323///
2324/// If it is modified while or after this is running, the outboard will be
2325/// invalid, so any attempt to compute a slice from it will fail.
2326///
2327/// If the size of the file is changed while this is running, an error will be
2328/// returned.
2329///
2330/// The computed outboard is without length prefix.
2331fn compute_outboard(
2332    read: impl Read,
2333    size: u64,
2334    progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
2335) -> io::Result<(Hash, Option<Vec<u8>>)> {
2336    use bao_tree::io::sync::CreateOutboard;
2337
2338    // wrap the reader in a progress reader, so we can report progress.
2339    let reader = ProgressReader::new(read, progress);
2340    // wrap the reader in a buffered reader, so we read in large chunks
2341    // this reduces the number of io ops and also the number of progress reports
2342    let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
2343    let reader = BufReader::with_capacity(buf_size, reader);
2344
2345    let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
2346    let root = ob.root.into();
2347    let data = ob.data;
2348    tracing::trace!(%root, "done");
2349    let data = if !data.is_empty() { Some(data) } else { None };
2350    Ok((root, data))
2351}
2352
2353fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2354    for e in tables.blobs().iter()? {
2355        let (k, v) = e?;
2356        let k = k.value();
2357        let v = v.value();
2358        println!("blobs: {} -> {:?}", k.to_hex(), v);
2359    }
2360    for e in tables.tags().iter()? {
2361        let (k, v) = e?;
2362        let k = k.value();
2363        let v = v.value();
2364        println!("tags: {} -> {:?}", k, v);
2365    }
2366    for e in tables.inline_data().iter()? {
2367        let (k, v) = e?;
2368        let k = k.value();
2369        let v = v.value();
2370        println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2371    }
2372    for e in tables.inline_outboard().iter()? {
2373        let (k, v) = e?;
2374        let k = k.value();
2375        let v = v.value();
2376        println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2377    }
2378    Ok(())
2379}
2380
2381fn load_data(
2382    tables: &impl ReadableTables,
2383    options: &PathOptions,
2384    location: DataLocation<(), u64>,
2385    hash: &Hash,
2386) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2387    Ok(match location {
2388        DataLocation::Inline(()) => {
2389            let Some(data) = tables.inline_data().get(hash)? else {
2390                return Err(ActorError::Inconsistent(format!(
2391                    "inconsistent database state: {} should have inline data but does not",
2392                    hash.to_hex()
2393                )));
2394            };
2395            MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
2396        }
2397        DataLocation::Owned(data_size) => {
2398            let path = options.owned_data_path(hash);
2399            let Ok(file) = std::fs::File::open(&path) else {
2400                return Err(io::Error::new(
2401                    io::ErrorKind::NotFound,
2402                    format!("file not found: {}", path.display()),
2403                )
2404                .into());
2405            };
2406            MemOrFile::File((file, data_size))
2407        }
2408        DataLocation::External(paths, data_size) => {
2409            if paths.is_empty() {
2410                return Err(ActorError::Inconsistent(
2411                    "external data location must not be empty".into(),
2412                ));
2413            }
2414            let path = &paths[0];
2415            let Ok(file) = std::fs::File::open(path) else {
2416                return Err(io::Error::new(
2417                    io::ErrorKind::NotFound,
2418                    format!("external file not found: {}", path.display()),
2419                )
2420                .into());
2421            };
2422            MemOrFile::File((file, data_size))
2423        }
2424    })
2425}
2426
2427fn load_outboard(
2428    tables: &impl ReadableTables,
2429    options: &PathOptions,
2430    location: OutboardLocation,
2431    size: u64,
2432    hash: &Hash,
2433) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2434    Ok(match location {
2435        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2436        OutboardLocation::Inline(_) => {
2437            let Some(outboard) = tables.inline_outboard().get(hash)? else {
2438                return Err(ActorError::Inconsistent(format!(
2439                    "inconsistent database state: {} should have inline outboard but does not",
2440                    hash.to_hex()
2441                )));
2442            };
2443            MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
2444        }
2445        OutboardLocation::Owned => {
2446            let outboard_size = raw_outboard_size(size);
2447            let path = options.owned_outboard_path(hash);
2448            let Ok(file) = std::fs::File::open(&path) else {
2449                return Err(io::Error::new(
2450                    io::ErrorKind::NotFound,
2451                    format!("file not found: {} size={}", path.display(), outboard_size),
2452                )
2453                .into());
2454            };
2455            MemOrFile::File((file, outboard_size))
2456        }
2457    })
2458}
2459
/// Take a possibly incomplete storage and turn it into complete
///
/// Returns `Ok(Err(c))` when the storage was already complete (nothing to
/// do), and `Ok(Ok(c))` with the freshly built [`CompleteStorage`] otherwise.
///
/// Based on `inline_options`, data and outboard either get inlined into the
/// database (the on-disk file, if any, is scheduled for deletion after
/// commit via `delete_after_commit`) or kept/written as owned files. The
/// sizes file is always scheduled for deletion, since a complete entry has
/// an exactly known size and no longer needs it.
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    // Normalize the three parts (data, outboard, sizes) into MemOrFile,
    // regardless of whether the incomplete entry was memory- or file-backed.
    let (data, outboard, _sizes) = match storage {
        // Already complete - report it back to the caller unchanged.
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    // NOTE(review): unwrap assumes size() is Some for both parts once an
    // entry is being completed — consistent with the module docs stating the
    // size is taken as validated at this point; confirm against callers.
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    // todo: perform more sanity checks if in debug mode
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    // inline data if needed, or write to file if needed
    let data = if data_size <= inline_options.max_data_inlined {
        match data {
            // Small data in a file: pull it into memory and drop the file
            // copy once the transaction commits.
            MemOrFile::File(data) => {
                let mut buf = vec![0; data_size as usize];
                data.read_at(0, &mut buf)?;
                // mark data for deletion after commit
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        // protect the data from previous deletions
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            // Large data held in memory: persist it to the owned data path.
            MemOrFile::Mem(data) => {
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    // inline outboard if needed, or write to file if needed
    let outboard = if outboard_size == 0 {
        // No outboard bytes at all (raw_outboard_size(data_size) == 0);
        // represent it with the default (empty) variant.
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            // Small outboard in a file: read it into memory; the file copy
            // is deleted after commit.
            MemOrFile::File(outboard) => {
                let mut buf = vec![0; outboard_size as usize];
                outboard.read_at(0, &mut buf)?;
                drop(outboard);
                // mark outboard for deletion after commit
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        // protect the outboard from previous deletions
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            // Large outboard held in memory: persist to the owned path.
            MemOrFile::Mem(outboard) => {
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // mark sizes for deletion after commit in any case - a complete entry
    // does not need sizes.
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}