// iroh_blobs/store/fs.rs

1//! redb backed storage
2//!
3//! Data can get into the store in two ways:
4//!
5//! 1. import from local data
6//! 2. sync from a remote
7//!
8//! These two cases are very different. In the first case, we have the data
9//! completely and don't know the hash yet. We compute the outboard and hash,
10//! and only then move/reference the data into the store.
11//!
12//! The entry for the hash comes into existence already complete.
13//!
14//! In the second case, we know the hash, but don't have the data yet. We create
15//! a partial entry, and then request the data from the remote. This is the more
16//! complex case.
17//!
18//! Partial entries always start as pure in memory entries without a database
19//! entry. Only once we receive enough data, we convert them into a persistent
20//! partial entry. This is necessary because we can't trust the size given
21//! by the remote side before receiving data. It is also an optimization,
22//! because for small blobs it is not worth it to create a partial entry.
23//!
24//! A persistent partial entry is always stored as three files in the file
25//! system: The data file, the outboard file, and a sizes file that contains
26//! the most up to date information about the size of the data.
27//!
28//! The redb database entry for a persistent partial entry does not contain
29//! any information about the size of the data until the size is exactly known.
30//!
31//! Updating this information on each write would be too costly.
32//!
33//! Marking a partial entry as complete is done from the outside. At this point
34//! the size is taken as validated. Depending on the size we decide whether to
35//! store data and outboard inline or to keep storing it in external files.
36//!
37//! Data can get out of the store in two ways:
38//!
39//! 1. the data and outboard of both partial and complete entries can be read at any time and
40//!    shared over the network. Only data that is complete will be shared, everything else will
41//!    lead to validation errors.
42//!
43//! 2. entries can be exported to the file system. This currently only works for complete entries.
44//!
45//! Tables:
46//!
47//! The blobs table contains a mapping from hash to rough entry state.
48//! The inline_data table contains the actual data for complete entries.
49//! The inline_outboard table contains the actual outboard for complete entries.
50//! The tags table contains a mapping from tag to hash.
51//!
52//! Design:
53//!
54//! The redb store is accessed in a single threaded way by an actor that runs
55//! on its own std thread. Communication with this actor is via a flume channel,
56//! with oneshot channels for the return values if needed.
57//!
58//! Errors:
59//!
60//! ActorError is an enum containing errors that can happen inside message
61//! handlers of the actor. This includes various redb related errors and io
62//! errors when reading or writing non-inlined data or outboard files.
63//!
64//! OuterError is an enum containing all the actor errors and in addition
65//! errors when communicating with the actor.
66use std::{
67    collections::{BTreeMap, BTreeSet},
68    future::Future,
69    io,
70    ops::Bound,
71    path::{Path, PathBuf},
72    sync::{Arc, RwLock},
73    time::{Duration, SystemTime},
74};
75
76use bao_tree::io::{
77    fsm::Outboard,
78    sync::{ReadAt, Size},
79};
80use bytes::Bytes;
81use futures_lite::{Stream, StreamExt};
82use genawaiter::rc::{Co, Gen};
83use iroh_io::AsyncSliceReader;
84use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
85use serde::{Deserialize, Serialize};
86use smallvec::SmallVec;
87use tokio::io::AsyncWriteExt;
88use tracing::trace_span;
89mod tables;
90#[doc(hidden)]
91pub mod test_support;
92#[cfg(test)]
93mod tests;
94mod util;
95mod validate;
96
97use tables::{ReadOnlyTables, ReadableTables, Tables};
98
99use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
100use super::{
101    bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
102    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
103    ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
104};
105use crate::{
106    store::{
107        bao_file::{BaoFileStorage, CompleteStorage},
108        fs::{
109            tables::BaoFilePart,
110            util::{overwrite_and_sync, read_and_remove},
111        },
112        GcMarkEvent, GcSweepEvent,
113    },
114    util::{
115        compute_outboard,
116        progress::{
117            BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
118            ProgressSender,
119        },
120        raw_outboard_size, MemOrFile, TagCounter, TagDrop,
121    },
122    BlobFormat, Hash, HashAndFormat, Tag, TempTag,
123};
124
/// Location of the data.
///
/// Data can be inlined in the database, a file conceptually owned by the store,
/// or a number of external files conceptually owned by the user.
///
/// Only complete data can be inlined.
///
/// NOTE: this type is serialized with postcard as part of [`EntryState`]'s
/// [`redb::Value`] impl, so the variant order is part of the on-disk format
/// and must not be changed.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is in the inline_data table.
    Inline(I),
    /// Data is in the canonical location in the data directory.
    Owned(E),
    /// Data is in several external locations. This should be a non-empty list.
    External(Vec<PathBuf>, E),
}
140
141impl<X> DataLocation<X, u64> {
142    fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
143        Ok(match (self, that) {
144            (
145                DataLocation::External(mut paths, a_size),
146                DataLocation::External(b_paths, b_size),
147            ) => {
148                if a_size != b_size {
149                    return Err(ActorError::Inconsistent(format!(
150                        "complete size mismatch {} {}",
151                        a_size, b_size
152                    )));
153                }
154                paths.extend(b_paths);
155                paths.sort();
156                paths.dedup();
157                DataLocation::External(paths, a_size)
158            }
159            (_, b @ DataLocation::Owned(_)) => {
160                // owned needs to win, since it has an associated file. Choosing
161                // external would orphan the file.
162                b
163            }
164            (a @ DataLocation::Owned(_), _) => {
165                // owned needs to win, since it has an associated file. Choosing
166                // external would orphan the file.
167                a
168            }
169            (_, b @ DataLocation::Inline(_)) => {
170                // inline needs to win, since it has associated data. Choosing
171                // external would orphan the file.
172                b
173            }
174            (a @ DataLocation::Inline(_), _) => {
175                // inline needs to win, since it has associated data. Choosing
176                // external would orphan the file.
177                a
178            }
179        })
180    }
181}
182
183impl<I, E> DataLocation<I, E> {
184    fn discard_inline_data(self) -> DataLocation<(), E> {
185        match self {
186            DataLocation::Inline(_) => DataLocation::Inline(()),
187            DataLocation::Owned(x) => DataLocation::Owned(x),
188            DataLocation::External(paths, x) => DataLocation::External(paths, x),
189        }
190    }
191}
192
/// Location of the outboard.
///
/// Outboard can be inlined in the database or a file conceptually owned by the store.
/// Outboards are implementation specific to the store and as such are always owned.
///
/// Only complete outboards can be inlined.
///
/// NOTE: serialized with postcard as part of [`EntryState`], so the variant
/// order is part of the on-disk format and must not be changed.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is in the inline_outboard table.
    Inline(I),
    /// Outboard is in the canonical location in the data directory.
    Owned,
    /// Outboard is not needed (e.g. the data is small enough to verify directly).
    NotNeeded,
}
208
209impl<I> OutboardLocation<I> {
210    fn discard_extra_data(self) -> OutboardLocation<()> {
211        match self {
212            Self::Inline(_) => OutboardLocation::Inline(()),
213            Self::Owned => OutboardLocation::Owned,
214            Self::NotNeeded => OutboardLocation::NotNeeded,
215        }
216    }
217}
218
/// The information about an entry that we keep in the entry table for quick access.
///
/// The exact info to store here is TBD, so usually you should use the accessor methods.
///
/// NOTE: serialized with postcard via the [`redb::Value`] impl below, so the
/// variant order and field order are part of the on-disk format.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// For a complete entry we always know the size. It does not make much sense
    /// to write to a complete entry, so they are much easier to share.
    Complete {
        /// Location of the data.
        data_location: DataLocation<I, u64>,
        /// Location of the outboard.
        outboard_location: OutboardLocation<I>,
    },
    /// Partial entries are entries for which we know the hash, but don't have
    /// all the data. They are created when syncing from somewhere else by hash.
    ///
    /// As such they are always owned. There is also no inline storage for them.
    /// Non short lived partial entries always live in the file system, and for
    /// short lived ones we never create a database entry in the first place.
    Partial {
        /// Once we get the last chunk of a partial entry, we have validated
        /// the size of the entry despite it still being incomplete.
        ///
        /// E.g. a giant file where we just requested the last chunk.
        size: Option<u64>,
    },
}
246
247impl Default for EntryState {
248    fn default() -> Self {
249        Self::Partial { size: None }
250    }
251}
252
253impl EntryState {
254    fn union(self, that: Self) -> ActorResult<Self> {
255        match (self, that) {
256            (
257                Self::Complete {
258                    data_location,
259                    outboard_location,
260                },
261                Self::Complete {
262                    data_location: b_data_location,
263                    ..
264                },
265            ) => Ok(Self::Complete {
266                // combine external paths if needed
267                data_location: data_location.union(b_data_location)?,
268                outboard_location,
269            }),
270            (a @ Self::Complete { .. }, Self::Partial { .. }) =>
271            // complete wins over partial
272            {
273                Ok(a)
274            }
275            (Self::Partial { .. }, b @ Self::Complete { .. }) =>
276            // complete wins over partial
277            {
278                Ok(b)
279            }
280            (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
281            // keep known size from either entry
282            {
283                let size = match (a_size, b_size) {
284                    (Some(a_size), Some(b_size)) => {
285                        // validated sizes are different. this means that at
286                        // least one validation was wrong, which would be a bug
287                        // in bao-tree.
288                        if a_size != b_size {
289                            return Err(ActorError::Inconsistent(format!(
290                                "validated size mismatch {} {}",
291                                a_size, b_size
292                            )));
293                        }
294                        Some(a_size)
295                    }
296                    (Some(a_size), None) => Some(a_size),
297                    (None, Some(b_size)) => Some(b_size),
298                    (None, None) => None,
299                };
300                Ok(Self::Partial { size })
301            }
302        }
303    }
304}
305
/// Store [`EntryState`] directly in redb, serialized with postcard.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    // Serialized entry states are small; 128 bytes of inline storage avoids a
    // heap allocation in the common case.
    type AsBytes<'a> = SmallVec<[u8; 128]>;

    fn fixed_width() -> Option<usize> {
        // postcard encoding is variable length
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        // The trait signature is infallible, so a corrupt database value will
        // panic here.
        postcard::from_bytes(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        // Serializing a valid EntryState into a growable buffer is expected
        // to be infallible.
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
334
/// Options for inlining small complete data or outboards.
///
/// Thresholds are inclusive upper bounds in bytes.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size to inline.
    pub max_data_inlined: u64,
    /// Maximum outboard size to inline.
    pub max_outboard_inlined: u64,
}
343
impl InlineOptions {
    /// Do not inline anything, ever.
    pub const NO_INLINE: Self = Self {
        max_data_inlined: 0,
        max_outboard_inlined: 0,
    };
    /// Always inline everything, regardless of size.
    pub const ALWAYS_INLINE: Self = Self {
        max_data_inlined: u64::MAX,
        max_outboard_inlined: u64::MAX,
    };
}
356
357impl Default for InlineOptions {
358    fn default() -> Self {
359        Self {
360            max_data_inlined: 1024 * 16,
361            max_outboard_inlined: 1024 * 16,
362        }
363    }
364}
365
/// Options for directories used by the file store.
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Path to the directory where data and outboard files are stored.
    pub data_path: PathBuf,
    /// Path to the directory where temp files are stored.
    /// This *must* be on the same device as `data_path`, since we need to
    /// atomically move temp files into place.
    pub temp_path: PathBuf,
}
376
377impl PathOptions {
378    fn new(root: &Path) -> Self {
379        Self {
380            data_path: root.join("data"),
381            temp_path: root.join("temp"),
382        }
383    }
384
385    fn owned_data_path(&self, hash: &Hash) -> PathBuf {
386        self.data_path.join(format!("{}.data", hash.to_hex()))
387    }
388
389    fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
390        self.data_path.join(format!("{}.obao4", hash.to_hex()))
391    }
392
393    fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
394        self.data_path.join(format!("{}.sizes4", hash.to_hex()))
395    }
396
397    fn temp_file_name(&self) -> PathBuf {
398        self.temp_path.join(temp_name())
399    }
400}
401
/// Options for transaction batching.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of actor messages to batch before creating a new read transaction.
    pub max_read_batch: usize,
    /// Maximum duration to keep a read transaction open before closing it.
    pub max_read_duration: Duration,
    /// Maximum number of actor messages to batch before committing write transaction.
    pub max_write_batch: usize,
    /// Maximum duration to wait before committing a write transaction.
    pub max_write_duration: Duration,
}
414
415impl Default for BatchOptions {
416    fn default() -> Self {
417        Self {
418            max_read_batch: 10000,
419            max_read_duration: Duration::from_secs(1),
420            max_write_batch: 1000,
421            max_write_duration: Duration::from_millis(500),
422        }
423    }
424}
425
/// Options for the file store.
///
/// Aggregates path layout, inlining thresholds and transaction batching.
#[derive(Debug, Clone)]
pub struct Options {
    /// Path options.
    pub path: PathOptions,
    /// Inline storage options.
    pub inline: InlineOptions,
    /// Transaction batching options.
    pub batch: BatchOptions,
}
436
/// Where the payload of an import comes from.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// Path to a temporary file created by the store.
    TempFile(PathBuf),
    /// Path to an external file owned by the user.
    External(PathBuf),
    /// Data held in memory.
    Memory(#[debug(skip)] Bytes),
}
443
444impl ImportSource {
445    fn content(&self) -> MemOrFile<&[u8], &Path> {
446        match self {
447            Self::TempFile(path) => MemOrFile::File(path.as_path()),
448            Self::External(path) => MemOrFile::File(path.as_path()),
449            Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
450        }
451    }
452
453    fn len(&self) -> io::Result<u64> {
454        match self {
455            Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
456            Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
457            Self::Memory(data) => Ok(data.len() as u64),
458        }
459    }
460}
461
/// Use [`BaoFileHandle`] as the entry type for the map.
pub type Entry = BaoFileHandle;
464
// Thin delegation to the inherent methods of BaoFileHandle.
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    fn size(&self) -> BaoBlobSize {
        // NOTE(review): current_size() is unwrapped here, so an error while
        // determining the size will panic — confirm this is intended.
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
488
impl super::MapEntryMut for Entry {
    // Delegates to the handle's writer; the writer cannot fail to be created.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
}
494
/// An import request for the actor.
///
/// At this point the hash, size and outboard have already been computed, so
/// the actor only has to move/inline the data and create the table entries.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// The hash and format of the data to import
    content_id: HashAndFormat,
    /// The source of the data to import, can be a temp file, external file, or memory
    source: ImportSource,
    /// Data size
    data_size: u64,
    /// Outboard without length prefix
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
507
/// An export request for the actor: write an entry out to a file system path.
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// A temp tag to keep the entry alive while exporting. This also
    /// contains the hash to be exported.
    temp_tag: TempTag,
    /// The target path for the export.
    target: PathBuf,
    /// The export mode to use.
    mode: ExportMode,
    /// The progress callback to use.
    #[debug(skip)]
    progress: ExportProgressCb,
}
521
/// Messages handled by the redb actor.
///
/// Replies, if any, are delivered through the oneshot sender carried in the
/// message.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Query method: get a file handle for a hash, if it exists.
    /// This will produce a file handle even for entries that are not yet in redb at all.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query method: get the rough entry status for a hash. Just complete, partial or not found.
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    #[cfg(test)]
    /// Query method: get the full entry state for a hash, both in memory and in redb.
    /// This is everything we got about the entry, including the actual inline outboard and data.
    EntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Query method: get the full entry state for a hash.
    GetFullEntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Modification method: set the full entry state for a hash.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: get or create a file handle for a hash.
    ///
    /// If the entry exists in redb, either partial or complete, the corresponding
    /// data will be returned. If it does not yet exist, a new partial file handle
    /// will be created, but not yet written to redb.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Modification method: inline size was exceeded for a partial entry.
    /// If the entry is complete, this is a no-op. If the entry is partial and in
    /// memory, it will be written to a file and created in redb.
    OnMemSizeExceeded { hash: Hash },
    /// Modification method: marks a partial entry as complete.
    /// Calling this on a complete entry is a no-op.
    OnComplete { handle: BaoFileHandle },
    /// Modification method: import data into a redb store
    ///
    /// At this point the size, hash and outboard must already be known.
    Import {
        cmd: Import,
        tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Modification method: export data from a redb store
    ///
    /// In most cases this will not modify the store. Only when using
    /// [`ExportMode::TryReference`] and the entry is large enough to not be
    /// inlined.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Update inline options
    UpdateInlineOptions {
        /// The new inline options
        inline_options: InlineOptions,
        /// Whether to reapply the new options to existing entries
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// Bulk query method: get entries from the blobs table
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// Bulk query method: get the entire tags table
    Tags {
        from: Option<Tag>,
        to: Option<Tag>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Modification method: set a tag to a value.
    SetTag {
        tag: Tag,
        value: HashAndFormat,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: delete a range of tags.
    DeleteTags {
        from: Option<Tag>,
        to: Option<Tag>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: create a new unique tag and set it to a value.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Modification method: rename a tag atomically.
    RenameTag {
        from: Tag,
        to: Tag,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: unconditional delete the data for a number of hashes
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: delete the data for a number of hashes, only if not protected
    GcDelete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync the entire database to disk.
    ///
    /// This just makes sure that there is no write transaction open.
    Sync { tx: oneshot::Sender<()> },
    /// Internal method: dump the entire database to stdout.
    Dump,
    /// Internal method: validate the entire database.
    ///
    /// Note that this will block the actor until it is done, so don't use it
    /// on a node under load.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Internal method: notify the actor that a new gc epoch has started.
    ///
    /// This will be called periodically and can be used to do misc cleanups.
    GcStart { tx: oneshot::Sender<()> },
    /// Internal method: shutdown the actor.
    ///
    /// Can have an optional oneshot sender to signal when the actor has shut down.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
668
669impl ActorMessage {
670    fn category(&self) -> MessageCategory {
671        match self {
672            Self::Get { .. }
673            | Self::GetOrCreate { .. }
674            | Self::EntryStatus { .. }
675            | Self::Blobs { .. }
676            | Self::Tags { .. }
677            | Self::GcStart { .. }
678            | Self::GetFullEntryState { .. }
679            | Self::Dump => MessageCategory::ReadOnly,
680            Self::Import { .. }
681            | Self::Export { .. }
682            | Self::OnMemSizeExceeded { .. }
683            | Self::OnComplete { .. }
684            | Self::SetTag { .. }
685            | Self::CreateTag { .. }
686            | Self::SetFullEntryState { .. }
687            | Self::Delete { .. }
688            | Self::DeleteTags { .. }
689            | Self::RenameTag { .. }
690            | Self::GcDelete { .. } => MessageCategory::ReadWrite,
691            Self::UpdateInlineOptions { .. }
692            | Self::Sync { .. }
693            | Self::Shutdown { .. }
694            | Self::Fsck { .. } => MessageCategory::TopLevel,
695            #[cfg(test)]
696            Self::EntryState { .. } => MessageCategory::ReadOnly,
697        }
698    }
699}
700
/// How a message interacts with the database, used for transaction batching.
enum MessageCategory {
    /// Can be answered from a read transaction.
    ReadOnly,
    /// Requires a write transaction.
    ReadWrite,
    /// Handled outside the normal read/write batching (sync, shutdown, ...).
    TopLevel,
}
706
/// Predicate for filtering entries in a redb table.
///
/// Receives a numeric index plus access guards for key and value, and returns
/// the owned pair for entries that should be kept.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
710
/// Storage that is using a redb database for small files and files for
/// large files.
///
/// Cloning is cheap: all clones share the same inner state via an `Arc`.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
715
716impl Store {
717    /// Load or create a new store.
718    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
719        let path = root.as_ref();
720        let db_path = path.join("blobs.db");
721        let options = Options {
722            path: PathOptions::new(path),
723            inline: Default::default(),
724            batch: Default::default(),
725        };
726        Self::new(db_path, options).await
727    }
728
729    /// Create a new store with custom options.
730    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
731        // spawn_blocking because StoreInner::new creates directories
732        let rt = tokio::runtime::Handle::try_current()
733            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
734        let inner =
735            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
736        Ok(Self(Arc::new(inner)))
737    }
738
739    /// Update the inline options.
740    ///
741    /// When reapply is true, the new options will be applied to all existing
742    /// entries.
743    pub async fn update_inline_options(
744        &self,
745        inline_options: InlineOptions,
746        reapply: bool,
747    ) -> io::Result<()> {
748        Ok(self
749            .0
750            .update_inline_options(inline_options, reapply)
751            .await?)
752    }
753
754    /// Dump the entire content of the database to stdout.
755    pub async fn dump(&self) -> io::Result<()> {
756        Ok(self.0.dump().await?)
757    }
758}
759
/// Shared state behind [`Store`].
#[derive(Debug)]
struct StoreInner {
    // Channel for sending messages to the redb actor thread.
    tx: async_channel::Sender<ActorMessage>,
    // Per-content reference counts for live temp tags.
    temp: Arc<RwLock<TempCounterMap>>,
    // Join handle of the actor thread; Option so it can be taken by value.
    handle: Option<std::thread::JoinHandle<()>>,
    // Path configuration, used to compute file locations.
    path_options: Arc<PathOptions>,
}
767
impl TagDrop for RwLock<TempCounterMap> {
    /// Decrement the temp tag count for `content` when a temp tag is dropped.
    fn on_drop(&self, content: &HashAndFormat) {
        self.write().unwrap().dec(content);
    }
}
773
impl TagCounter for RwLock<TempCounterMap> {
    /// Increment the temp tag count for `content` when a temp tag is created.
    fn on_create(&self, content: &HashAndFormat) {
        self.write().unwrap().inc(content);
    }
}
779
780impl StoreInner {
781    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
782        tracing::trace!(
783            "creating data directory: {}",
784            options.path.data_path.display()
785        );
786        std::fs::create_dir_all(&options.path.data_path)?;
787        tracing::trace!(
788            "creating temp directory: {}",
789            options.path.temp_path.display()
790        );
791        std::fs::create_dir_all(&options.path.temp_path)?;
792        tracing::trace!(
793            "creating parent directory for db file{}",
794            path.parent().unwrap().display()
795        );
796        std::fs::create_dir_all(path.parent().unwrap())?;
797        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
798        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
799        let handle = std::thread::Builder::new()
800            .name("redb-actor".to_string())
801            .spawn(move || {
802                rt.block_on(async move {
803                    if let Err(cause) = actor.run_batched().await {
804                        tracing::error!("redb actor failed: {}", cause);
805                    }
806                });
807            })
808            .expect("failed to spawn thread");
809        Ok(Self {
810            tx,
811            temp,
812            handle: Some(handle),
813            path_options: Arc::new(options.path),
814        })
815    }
816
817    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
818        let (tx, rx) = oneshot::channel();
819        self.tx.send(ActorMessage::Get { hash, tx }).await?;
820        Ok(rx.await??)
821    }
822
823    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
824        let (tx, rx) = oneshot::channel();
825        self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?;
826        Ok(rx.await??)
827    }
828
829    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
830        let (tx, rx) = oneshot::channel();
831        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
832            let v = v.value();
833            if let EntryState::Complete { .. } = &v {
834                Some((k.value(), v))
835            } else {
836                None
837            }
838        });
839        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
840        let blobs = rx.await?;
841        let res = blobs?
842            .into_iter()
843            .map(|r| {
844                r.map(|(hash, _)| hash)
845                    .map_err(|e| ActorError::from(e).into())
846            })
847            .collect::<Vec<_>>();
848        Ok(res)
849    }
850
851    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
852        let (tx, rx) = oneshot::channel();
853        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
854            let v = v.value();
855            if let EntryState::Partial { .. } = &v {
856                Some((k.value(), v))
857            } else {
858                None
859            }
860        });
861        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
862        let blobs = rx.await?;
863        let res = blobs?
864            .into_iter()
865            .map(|r| {
866                r.map(|(hash, _)| hash)
867                    .map_err(|e| ActorError::from(e).into())
868            })
869            .collect::<Vec<_>>();
870        Ok(res)
871    }
872
873    async fn tags(
874        &self,
875        from: Option<Tag>,
876        to: Option<Tag>,
877    ) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
878        let (tx, rx) = oneshot::channel();
879        self.tx.send(ActorMessage::Tags { from, to, tx }).await?;
880        let tags = rx.await?;
881        // transform the internal error type into io::Error
882        let tags = tags?
883            .into_iter()
884            .map(|r| r.map_err(|e| ActorError::from(e).into()))
885            .collect();
886        Ok(tags)
887    }
888
889    async fn set_tag(&self, tag: Tag, value: HashAndFormat) -> OuterResult<()> {
890        let (tx, rx) = oneshot::channel();
891        self.tx
892            .send(ActorMessage::SetTag { tag, value, tx })
893            .await?;
894        Ok(rx.await??)
895    }
896
897    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> OuterResult<()> {
898        let (tx, rx) = oneshot::channel();
899        self.tx
900            .send(ActorMessage::DeleteTags { from, to, tx })
901            .await?;
902        Ok(rx.await??)
903    }
904
905    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
906        let (tx, rx) = oneshot::channel();
907        self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
908        Ok(rx.await??)
909    }
910
911    async fn rename_tag(&self, from: Tag, to: Tag) -> OuterResult<()> {
912        let (tx, rx) = oneshot::channel();
913        self.tx
914            .send(ActorMessage::RenameTag { from, to, tx })
915            .await?;
916        Ok(rx.await??)
917    }
918
919    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
920        let (tx, rx) = oneshot::channel();
921        self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
922        Ok(rx.await??)
923    }
924
925    async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
926        let (tx, rx) = oneshot::channel();
927        self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
928        Ok(rx.await??)
929    }
930
931    async fn gc_start(&self) -> OuterResult<()> {
932        let (tx, rx) = oneshot::channel();
933        self.tx.send(ActorMessage::GcStart { tx }).await?;
934        Ok(rx.await?)
935    }
936
937    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
938        let (tx, rx) = oneshot::channel();
939        self.tx
940            .send(ActorMessage::EntryStatus { hash: *hash, tx })
941            .await?;
942        Ok(rx.await??)
943    }
944
945    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
946        let (tx, rx) = oneshot::channel();
947        self.tx
948            .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
949        Ok(rx.recv()??)
950    }
951
952    async fn complete(&self, entry: Entry) -> OuterResult<()> {
953        self.tx
954            .send(ActorMessage::OnComplete { handle: entry })
955            .await?;
956        Ok(())
957    }
958
    /// Export the blob `hash` to the file at `target`.
    ///
    /// `target` must be an absolute path; its parent directory is created if
    /// needed. The actual export work is performed by the actor, which
    /// replies via the oneshot channel.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        // export requires an absolute target path
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            )
            .into());
        }
        let parent = target.parent().ok_or_else(|| {
            OuterError::from(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            ))
        })?;
        std::fs::create_dir_all(parent)?;
        // the temp tag protects the hash while the export is in flight
        let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Export {
                cmd: Export {
                    temp_tag,
                    target,
                    mode,
                    progress,
                },
                tx,
            })
            .await?;
        Ok(rx.await??)
    }
1001
1002    async fn consistency_check(
1003        &self,
1004        repair: bool,
1005        progress: BoxedProgressSender<ConsistencyCheckProgress>,
1006    ) -> OuterResult<()> {
1007        let (tx, rx) = oneshot::channel();
1008        self.tx
1009            .send(ActorMessage::Fsck {
1010                repair,
1011                progress,
1012                tx,
1013            })
1014            .await?;
1015        Ok(rx.await??)
1016    }
1017
1018    async fn update_inline_options(
1019        &self,
1020        inline_options: InlineOptions,
1021        reapply: bool,
1022    ) -> OuterResult<()> {
1023        let (tx, rx) = oneshot::channel();
1024        self.tx
1025            .send(ActorMessage::UpdateInlineOptions {
1026                inline_options,
1027                reapply,
1028                tx,
1029            })
1030            .await?;
1031        Ok(rx.await?)
1032    }
1033
1034    async fn dump(&self) -> OuterResult<()> {
1035        self.tx.send(ActorMessage::Dump).await?;
1036        Ok(())
1037    }
1038
1039    async fn sync(&self) -> OuterResult<()> {
1040        let (tx, rx) = oneshot::channel();
1041        self.tx.send(ActorMessage::Sync { tx }).await?;
1042        Ok(rx.await?)
1043    }
1044
    /// Import a file from the local file system, synchronously.
    ///
    /// `path` must be an absolute path to a file or symlink. In
    /// [`ImportMode::TryReference`] mode the file is referenced in place; in
    /// [`ImportMode::Copy`] mode small files are read directly into memory,
    /// larger ones are copied (reflinked when possible) to a temp file.
    ///
    /// Returns the temp tag protecting the new entry and the data size.
    fn import_file_sync(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        if !path.is_absolute() {
            return Err(
                io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
            );
        }
        if !path.is_file() && !path.is_symlink() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "path is not a file or symlink",
            )
            .into());
        }
        let id = progress.new_id();
        progress.blocking_send(ImportProgress::Found {
            id,
            name: path.to_string_lossy().to_string(),
        })?;
        let file = match mode {
            ImportMode::TryReference => ImportSource::External(path),
            ImportMode::Copy => {
                if std::fs::metadata(&path)?.len() < 16 * 1024 {
                    // we don't know if the data will be inlined since we don't
                    // have the inline options here. But still for such a small file
                    // it does not seem worth it do to the temp file ceremony.
                    let data = std::fs::read(&path)?;
                    ImportSource::Memory(data.into())
                } else {
                    let temp_path = self.temp_file_name();
                    // copy the data, since it is not stable
                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
                    // reflink_or_copy returns Ok(None) when a reflink was made,
                    // Ok(Some(bytes)) when it fell back to a plain copy
                    if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
                        tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
                    } else {
                        tracing::debug!("copied {} to {}", path.display(), temp_path.display());
                    }
                    // copy progress for size will be called in finalize_import_sync
                    ImportSource::TempFile(temp_path)
                }
            }
        };
        let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok((tag, size))
    }
1095
1096    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
1097        let id = 0;
1098        let file = ImportSource::Memory(data);
1099        let progress = IgnoreProgressSender::default();
1100        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
1101        Ok(tag)
1102    }
1103
    /// Compute hash and outboard for an import source and send the result to
    /// the actor for insertion into the store.
    ///
    /// Returns the temp tag protecting the new entry and the data size.
    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        progress.blocking_send(ImportProgress::Size {
            id,
            size: data_size,
        })?;
        let progress2 = progress.clone();
        let (hash, outboard) = match file.content() {
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _guard = span.enter();
                let file = std::fs::File::open(path)?;
                compute_outboard(file, data_size, move |offset| {
                    Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
            MemOrFile::Mem(bytes) => {
                // todo: progress? usually this is will be small enough that progress might not be needed.
                compute_outboard(bytes, data_size, |_| Ok(()))?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // from here on, everything related to the hash is protected by the temp tag
        let tag = self.temp.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        // blocking send for the import
        let (tx, rx) = oneshot::channel();
        self.tx.send_blocking(ActorMessage::Import {
            cmd: Import {
                content_id: HashAndFormat { hash, format },
                source: file,
                outboard,
                data_size,
            },
            tx,
        })?;
        Ok(rx.recv()??)
    }
1149
    /// Produce a path for a new temp file, delegating naming to the path
    /// options.
    fn temp_file_name(&self) -> PathBuf {
        self.path_options.temp_file_name()
    }
1153
1154    async fn shutdown(&self) {
1155        let (tx, rx) = oneshot::channel();
1156        self.tx
1157            .send(ActorMessage::Shutdown { tx: Some(tx) })
1158            .await
1159            .ok();
1160        rx.await.ok();
1161    }
1162}
1163
impl Drop for StoreInner {
    fn drop(&mut self) {
        // If the actor thread is still running, ask it to shut down (tx: None
        // means we do not expect a reply) and wait for the thread to finish.
        if let Some(handle) = self.handle.take() {
            self.tx
                .send_blocking(ActorMessage::Shutdown { tx: None })
                .ok();
            handle.join().ok();
        }
    }
}
1174
struct ActorState {
    /// Cache of weak references to open file handles, see [`ActorState::get`].
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Hashes that are currently protected from removal
    /// (presumably during gc — confirm against the gc message handlers).
    protected: BTreeSet<Hash>,
    /// Temp tag counters, shared with the store front-end.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Receiver for messages sent by the store front-end.
    msgs_rx: async_channel::Receiver<ActorMessage>,
    /// Config used when creating new bao file handles.
    create_options: Arc<BaoFileConfig>,
    /// Store options (paths, batch limits).
    options: Options,
    /// Tokio runtime handle, used to spawn blocking work (e.g. file copies
    /// during export).
    rt: tokio::runtime::Handle,
}
1184
/// The actor for the redb store.
///
/// It is split into the database and the rest of the state to allow for split
/// borrows in the message handlers.
struct Actor {
    /// The redb database, borrowed separately from `state` when starting
    /// transactions.
    db: redb::Database,
    /// All remaining mutable state of the actor.
    state: ActorState,
}
1193
/// Error type for message handler functions of the redb actor.
///
/// What can go wrong are various things with redb, as well as io errors related
/// to files other than redb.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The database content violates an invariant the code relies on,
    /// e.g. a referenced entry is missing.
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    /// Failure while migrating the database from an older schema version.
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
1217
1218impl From<ActorError> for io::Error {
1219    fn from(e: ActorError) -> Self {
1220        match e {
1221            ActorError::Io(e) => e,
1222            e => io::Error::new(io::ErrorKind::Other, e),
1223        }
1224    }
1225}
1226
/// Result type for handler functions of the redb actor.
///
/// See [`ActorError`] for what can go wrong. Used by the message handlers in
/// [`ActorState`].
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
1231
1232/// Error type for calling the redb actor from the store.
1233///
1234/// What can go wrong is all the things in [`ActorError`] and in addition
1235/// sending and receiving messages.
1236#[derive(Debug, thiserror::Error)]
1237pub(crate) enum OuterError {
1238    #[error("inner error: {0}")]
1239    Inner(#[from] ActorError),
1240    #[error("send error")]
1241    Send,
1242    #[error("progress send error: {0}")]
1243    ProgressSend(#[from] ProgressSendError),
1244    #[error("recv error: {0}")]
1245    Recv(#[from] oneshot::RecvError),
1246    #[error("recv error: {0}")]
1247    AsyncChannelRecv(#[from] async_channel::RecvError),
1248    #[error("join error: {0}")]
1249    JoinTask(#[from] tokio::task::JoinError),
1250}
1251
1252impl From<async_channel::SendError<ActorMessage>> for OuterError {
1253    fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
1254        OuterError::Send
1255    }
1256}
1257
/// Result type for calling the redb actor from the store.
///
/// See [`OuterError`] for what can go wrong. This is the return type of the
/// `StoreInner` methods that communicate with the actor.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
1262
1263impl From<io::Error> for OuterError {
1264    fn from(e: io::Error) -> Self {
1265        OuterError::Inner(ActorError::Io(e))
1266    }
1267}
1268
1269impl From<OuterError> for io::Error {
1270    fn from(e: OuterError) -> Self {
1271        match e {
1272            OuterError::Inner(ActorError::Io(e)) => e,
1273            e => io::Error::new(io::ErrorKind::Other, e),
1274        }
1275    }
1276}
1277
1278impl super::Map for Store {
1279    type Entry = Entry;
1280
1281    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
1282        Ok(self.0.get(*hash).await?)
1283    }
1284}
1285
1286impl super::MapMut for Store {
1287    type EntryMut = Entry;
1288
1289    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
1290        Ok(self.0.get_or_create(hash).await?)
1291    }
1292
1293    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
1294        Ok(self.0.entry_status(hash).await?)
1295    }
1296
1297    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
1298        self.get(hash).await
1299    }
1300
1301    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
1302        Ok(self.0.complete(entry).await?)
1303    }
1304
1305    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
1306        Ok(self.0.entry_status_sync(hash)?)
1307    }
1308}
1309
1310impl super::ReadableStore for Store {
1311    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
1312        Ok(Box::new(self.0.blobs().await?.into_iter()))
1313    }
1314
1315    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
1316        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
1317    }
1318
1319    async fn tags(
1320        &self,
1321        from: Option<Tag>,
1322        to: Option<Tag>,
1323    ) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
1324        Ok(Box::new(self.0.tags(from, to).await?.into_iter()))
1325    }
1326
1327    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
1328        Box::new(self.0.temp.read().unwrap().keys())
1329    }
1330
1331    async fn consistency_check(
1332        &self,
1333        repair: bool,
1334        tx: BoxedProgressSender<ConsistencyCheckProgress>,
1335    ) -> io::Result<()> {
1336        self.0.consistency_check(repair, tx.clone()).await?;
1337        Ok(())
1338    }
1339
1340    async fn export(
1341        &self,
1342        hash: Hash,
1343        target: PathBuf,
1344        mode: ExportMode,
1345        progress: ExportProgressCb,
1346    ) -> io::Result<()> {
1347        Ok(self.0.export(hash, target, mode, progress).await?)
1348    }
1349}
1350
1351impl super::Store for Store {
1352    async fn import_file(
1353        &self,
1354        path: PathBuf,
1355        mode: ImportMode,
1356        format: BlobFormat,
1357        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1358    ) -> io::Result<(crate::TempTag, u64)> {
1359        let this = self.0.clone();
1360        Ok(
1361            tokio::task::spawn_blocking(move || {
1362                this.import_file_sync(path, mode, format, progress)
1363            })
1364            .await??,
1365        )
1366    }
1367
1368    async fn import_bytes(
1369        &self,
1370        data: bytes::Bytes,
1371        format: crate::BlobFormat,
1372    ) -> io::Result<crate::TempTag> {
1373        let this = self.0.clone();
1374        Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
1375    }
1376
1377    async fn import_stream(
1378        &self,
1379        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
1380        format: BlobFormat,
1381        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1382    ) -> io::Result<(TempTag, u64)> {
1383        let this = self.clone();
1384        let id = progress.new_id();
1385        // write to a temp file
1386        let temp_data_path = this.0.temp_file_name();
1387        let name = temp_data_path
1388            .file_name()
1389            .expect("just created")
1390            .to_string_lossy()
1391            .to_string();
1392        progress.send(ImportProgress::Found { id, name }).await?;
1393        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
1394        let mut offset = 0;
1395        while let Some(chunk) = data.next().await {
1396            let chunk = chunk?;
1397            writer.write_all(&chunk).await?;
1398            offset += chunk.len() as u64;
1399            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
1400        }
1401        writer.flush().await?;
1402        drop(writer);
1403        let file = ImportSource::TempFile(temp_data_path);
1404        Ok(tokio::task::spawn_blocking(move || {
1405            this.0.finalize_import_sync(file, format, id, progress)
1406        })
1407        .await??)
1408    }
1409
1410    async fn set_tag(&self, name: Tag, hash: HashAndFormat) -> io::Result<()> {
1411        Ok(self.0.set_tag(name, hash).await?)
1412    }
1413
1414    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
1415        Ok(self.0.delete_tags(from, to).await?)
1416    }
1417
1418    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
1419        Ok(self.0.create_tag(hash).await?)
1420    }
1421
1422    async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
1423        Ok(self.0.rename_tag(from, to).await?)
1424    }
1425
1426    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
1427        Ok(self.0.delete(hashes).await?)
1428    }
1429
1430    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
1431    where
1432        G: Fn() -> Gut,
1433        Gut: Future<Output = BTreeSet<Hash>> + Send,
1434    {
1435        tracing::info!("Starting GC task with interval {:?}", config.period);
1436        let mut live = BTreeSet::new();
1437        'outer: loop {
1438            if let Err(cause) = self.0.gc_start().await {
1439                tracing::debug!(
1440                    "unable to notify the db of GC start: {cause}. Shutting down GC loop."
1441                );
1442                break;
1443            }
1444            // do delay before the two phases of GC
1445            tokio::time::sleep(config.period).await;
1446            tracing::debug!("Starting GC");
1447            live.clear();
1448
1449            let p = protected_cb().await;
1450            live.extend(p);
1451
1452            tracing::debug!("Starting GC mark phase");
1453            let live_ref = &mut live;
1454            let mut stream = Gen::new(|co| async move {
1455                if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
1456                    co.yield_(GcMarkEvent::Error(e)).await;
1457                }
1458            });
1459            while let Some(item) = stream.next().await {
1460                match item {
1461                    GcMarkEvent::CustomDebug(text) => {
1462                        tracing::debug!("{}", text);
1463                    }
1464                    GcMarkEvent::CustomWarning(text, _) => {
1465                        tracing::warn!("{}", text);
1466                    }
1467                    GcMarkEvent::Error(err) => {
1468                        tracing::error!("Fatal error during GC mark {}", err);
1469                        continue 'outer;
1470                    }
1471                }
1472            }
1473            drop(stream);
1474
1475            tracing::debug!("Starting GC sweep phase");
1476            let live_ref = &live;
1477            let mut stream = Gen::new(|co| async move {
1478                if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
1479                    co.yield_(GcSweepEvent::Error(e)).await;
1480                }
1481            });
1482            while let Some(item) = stream.next().await {
1483                match item {
1484                    GcSweepEvent::CustomDebug(text) => {
1485                        tracing::debug!("{}", text);
1486                    }
1487                    GcSweepEvent::CustomWarning(text, _) => {
1488                        tracing::warn!("{}", text);
1489                    }
1490                    GcSweepEvent::Error(err) => {
1491                        tracing::error!("Fatal error during GC mark {}", err);
1492                        continue 'outer;
1493                    }
1494                }
1495            }
1496            if let Some(ref cb) = config.done_callback {
1497                cb();
1498            }
1499        }
1500    }
1501
1502    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
1503        self.0.temp.temp_tag(value)
1504    }
1505
1506    async fn sync(&self) -> io::Result<()> {
1507        Ok(self.0.sync().await?)
1508    }
1509
1510    async fn shutdown(&self) {
1511        self.0.shutdown().await;
1512    }
1513}
1514
1515pub(super) async fn gc_sweep_task(
1516    store: &Store,
1517    live: &BTreeSet<Hash>,
1518    co: &Co<GcSweepEvent>,
1519) -> anyhow::Result<()> {
1520    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
1521    let mut count = 0;
1522    let mut batch = Vec::new();
1523    for hash in blobs {
1524        let hash = hash?;
1525        if !live.contains(&hash) {
1526            batch.push(hash);
1527            count += 1;
1528        }
1529        if batch.len() >= 100 {
1530            store.0.gc_delete(batch.clone()).await?;
1531            batch.clear();
1532        }
1533    }
1534    if !batch.is_empty() {
1535        store.0.gc_delete(batch).await?;
1536    }
1537    co.yield_(GcSweepEvent::CustomDebug(format!(
1538        "deleted {} blobs",
1539        count
1540    )))
1541    .await;
1542    Ok(())
1543}
1544
impl Actor {
    /// Create the actor and the sender used to talk to it.
    ///
    /// Opens (or creates) the redb database at `path` and eagerly creates all
    /// tables. Migration from the v1 schema is not supported and is reported
    /// as [`ActorError::Migration`].
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            Err(DatabaseError::UpgradeRequired(1)) => {
                return Err(ActorError::Migration(anyhow::anyhow!(
                    "migration from v1 no longer supported"
                )))
            }
            Err(err) => return Err(err.into()),
        };

        let txn = db.begin_write()?;
        // create tables and drop them just to create them.
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;
        // make the channel relatively large. there are some messages that don't
        // require a response, it's fine if they pile up a bit.
        let (tx, rx) = async_channel::bounded(1024);
        let tx2 = tx.clone();
        // callback invoked when a mem entry spills to a file; it notifies the
        // actor via the same message channel
        let on_file_create: CreateCb = Arc::new(move |hash| {
            // todo: make the callback allow async
            tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs_rx: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }

    /// Main loop of the actor.
    ///
    /// Messages are handled in batches: consecutive read-only messages share
    /// one read transaction and consecutive read-write messages share one
    /// write transaction, bounded by the configured batch size and duration.
    /// A `Shutdown` message drops the database and ends the loop.
    async fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
        while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // Make sure the database is dropped before we send the reply.
                drop(self);
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // requeue the message so the batch loop below sees it
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    // a non-readonly message ends this batch;
                                    // requeue it for the next iteration
                                    if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("read transaction timed out");
                                break;
                            }
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    // requeue the message so the batch loop below sees it
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    // a non-readwrite message ends this batch;
                                    // requeue it for the next iteration
                                    if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("write transaction timed out");
                                break;
                            }
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // apply deferred file deletions only after the commit succeeded
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
}
1681
1682impl ActorState {
1683    fn entry_status(
1684        &mut self,
1685        tables: &impl ReadableTables,
1686        hash: Hash,
1687    ) -> ActorResult<EntryStatus> {
1688        let status = match tables.blobs().get(hash)? {
1689            Some(guard) => match guard.value() {
1690                EntryState::Complete { .. } => EntryStatus::Complete,
1691                EntryState::Partial { .. } => EntryStatus::Partial,
1692            },
1693            None => EntryStatus::NotFound,
1694        };
1695        Ok(status)
1696    }
1697
    /// Get the handle for `hash`, either from the in-memory handle cache or
    /// by loading the entry state from the database.
    ///
    /// Returns `None` if there is no entry for this hash at all.
    fn get(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<Option<BaoFileHandle>> {
        // fast path: a live handle is already cached
        if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
            return Ok(Some(handle));
        }
        let Some(entry) = tables.blobs().get(hash)? else {
            return Ok(None);
        };
        // todo: if complete, load inline data and/or outboard into memory if needed,
        // and return a complete entry.
        let entry = entry.value();
        let config = self.create_options.clone();
        let handle = match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => {
                let data = load_data(tables, &self.options.path, data_location, &hash)?;
                let outboard = load_outboard(
                    tables,
                    &self.options.path,
                    outboard_location,
                    data.size(),
                    &hash,
                )?;
                BaoFileHandle::new_complete(config, hash, data, outboard)
            }
            EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
        };
        // cache only a weak reference so unused handles can be dropped
        self.handles.insert(hash, handle.downgrade());
        Ok(Some(handle))
    }
1733
1734    fn export(
1735        &mut self,
1736        tables: &mut Tables,
1737        cmd: Export,
1738        tx: oneshot::Sender<ActorResult<()>>,
1739    ) -> ActorResult<()> {
1740        let Export {
1741            temp_tag,
1742            target,
1743            mode,
1744            progress,
1745        } = cmd;
1746        let guard = tables
1747            .blobs
1748            .get(temp_tag.hash())?
1749            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
1750        let entry = guard.value();
1751        match entry {
1752            EntryState::Complete {
1753                data_location,
1754                outboard_location,
1755            } => match data_location {
1756                DataLocation::Inline(()) => {
1757                    // ignore export mode, just copy. For inline data we can not reference anyway.
1758                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
1759                        ActorError::Inconsistent("inline data not found".to_owned())
1760                    })?;
1761                    tracing::trace!("exporting inline data to {}", target.display());
1762                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
1763                        .ok();
1764                }
1765                DataLocation::Owned(size) => {
1766                    let path = self.options.path.owned_data_path(temp_tag.hash());
1767                    match mode {
1768                        ExportMode::Copy => {
1769                            // copy in an external thread
1770                            self.rt.spawn_blocking(move || {
1771                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
1772                                    .ok();
1773                            });
1774                        }
1775                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
1776                            Ok(()) => {
1777                                let entry = EntryState::Complete {
1778                                    data_location: DataLocation::External(vec![target], size),
1779                                    outboard_location,
1780                                };
1781                                drop(guard);
1782                                tables.blobs.insert(temp_tag.hash(), entry)?;
1783                                drop(temp_tag);
1784                                tx.send(Ok(())).ok();
1785                            }
1786                            Err(e) => {
1787                                const ERR_CROSS: i32 = 18;
1788                                if e.raw_os_error() == Some(ERR_CROSS) {
1789                                    // Cross device renaming failed, copy instead
1790                                    match std::fs::copy(&path, &target) {
1791                                        Ok(_) => {
1792                                            let entry = EntryState::Complete {
1793                                                data_location: DataLocation::External(
1794                                                    vec![target],
1795                                                    size,
1796                                                ),
1797                                                outboard_location,
1798                                            };
1799
1800                                            drop(guard);
1801                                            tables.blobs.insert(temp_tag.hash(), entry)?;
1802                                            tables
1803                                                .delete_after_commit
1804                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
1805                                            drop(temp_tag);
1806
1807                                            tx.send(Ok(())).ok();
1808                                        }
1809                                        Err(e) => {
1810                                            drop(temp_tag);
1811                                            tx.send(Err(e.into())).ok();
1812                                        }
1813                                    }
1814                                } else {
1815                                    drop(temp_tag);
1816                                    tx.send(Err(e.into())).ok();
1817                                }
1818                            }
1819                        },
1820                    }
1821                }
1822                DataLocation::External(paths, size) => {
1823                    let path = paths
1824                        .first()
1825                        .ok_or_else(|| {
1826                            ActorError::Inconsistent("external path missing".to_owned())
1827                        })?
1828                        .to_owned();
1829                    // we can not reference external files, so we just copy them. But this does not have to happen in the actor.
1830                    if path == target {
1831                        // export to the same path, nothing to do
1832                        tx.send(Ok(())).ok();
1833                    } else {
1834                        // copy in an external thread
1835                        self.rt.spawn_blocking(move || {
1836                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
1837                                .ok();
1838                        });
1839                    }
1840                }
1841            },
1842            EntryState::Partial { .. } => {
1843                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
1844            }
1845        }
1846        Ok(())
1847    }
1848
1849    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
1850        let Import {
1851            content_id,
1852            source: file,
1853            outboard,
1854            data_size,
1855        } = cmd;
1856        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
1857        let inline_data = data_size <= self.options.inline.max_data_inlined;
1858        let inline_outboard =
1859            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
1860        // from here on, everything related to the hash is protected by the temp tag
1861        let tag = self.temp.temp_tag(content_id);
1862        let hash = *tag.hash();
1863        self.protected.insert(hash);
1864        // move the data file into place, or create a reference to it
1865        let data_location = match file {
1866            ImportSource::External(external_path) => {
1867                tracing::debug!("stored external reference {}", external_path.display());
1868                if inline_data {
1869                    tracing::debug!(
1870                        "reading external data to inline it: {}",
1871                        external_path.display()
1872                    );
1873                    let data = Bytes::from(std::fs::read(&external_path)?);
1874                    DataLocation::Inline(data)
1875                } else {
1876                    DataLocation::External(vec![external_path], data_size)
1877                }
1878            }
1879            ImportSource::TempFile(temp_data_path) => {
1880                if inline_data {
1881                    tracing::debug!(
1882                        "reading and deleting temp file to inline it: {}",
1883                        temp_data_path.display()
1884                    );
1885                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
1886                    DataLocation::Inline(data)
1887                } else {
1888                    let data_path = self.options.path.owned_data_path(&hash);
1889                    std::fs::rename(&temp_data_path, &data_path)?;
1890                    tracing::debug!("created file {}", data_path.display());
1891                    DataLocation::Owned(data_size)
1892                }
1893            }
1894            ImportSource::Memory(data) => {
1895                if inline_data {
1896                    DataLocation::Inline(data)
1897                } else {
1898                    let data_path = self.options.path.owned_data_path(&hash);
1899                    overwrite_and_sync(&data_path, &data)?;
1900                    tracing::debug!("created file {}", data_path.display());
1901                    DataLocation::Owned(data_size)
1902                }
1903            }
1904        };
1905        let outboard_location = if let Some(outboard) = outboard {
1906            if inline_outboard {
1907                OutboardLocation::Inline(Bytes::from(outboard))
1908            } else {
1909                let outboard_path = self.options.path.owned_outboard_path(&hash);
1910                // todo: this blocks the actor when writing a large outboard
1911                overwrite_and_sync(&outboard_path, &outboard)?;
1912                OutboardLocation::Owned
1913            }
1914        } else {
1915            OutboardLocation::NotNeeded
1916        };
1917        if let DataLocation::Inline(data) = &data_location {
1918            tables.inline_data.insert(hash, data.as_ref())?;
1919        }
1920        if let OutboardLocation::Inline(outboard) = &outboard_location {
1921            tables.inline_outboard.insert(hash, outboard.as_ref())?;
1922        }
1923        if let DataLocation::Owned(_) = &data_location {
1924            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
1925        }
1926        if let OutboardLocation::Owned = &outboard_location {
1927            tables
1928                .delete_after_commit
1929                .remove(hash, [BaoFilePart::Outboard]);
1930        }
1931        let entry = tables.blobs.get(hash)?;
1932        let entry = entry.map(|x| x.value()).unwrap_or_default();
1933        let data_location = data_location.discard_inline_data();
1934        let outboard_location = outboard_location.discard_extra_data();
1935        let entry = entry.union(EntryState::Complete {
1936            data_location,
1937            outboard_location,
1938        })?;
1939        tables.blobs.insert(hash, entry)?;
1940        Ok((tag, data_size))
1941    }
1942
1943    fn get_or_create(
1944        &mut self,
1945        tables: &impl ReadableTables,
1946        hash: Hash,
1947    ) -> ActorResult<BaoFileHandle> {
1948        self.protected.insert(hash);
1949        if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
1950            return Ok(handle);
1951        }
1952        let entry = tables.blobs().get(hash)?;
1953        let handle = if let Some(entry) = entry {
1954            let entry = entry.value();
1955            match entry {
1956                EntryState::Complete {
1957                    data_location,
1958                    outboard_location,
1959                    ..
1960                } => {
1961                    let data = load_data(tables, &self.options.path, data_location, &hash)?;
1962                    let outboard = load_outboard(
1963                        tables,
1964                        &self.options.path,
1965                        outboard_location,
1966                        data.size(),
1967                        &hash,
1968                    )?;
1969                    tracing::debug!("creating complete entry for {}", hash.to_hex());
1970                    BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
1971                }
1972                EntryState::Partial { .. } => {
1973                    tracing::debug!("creating partial entry for {}", hash.to_hex());
1974                    BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
1975                }
1976            }
1977        } else {
1978            BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
1979        };
1980        self.handles.insert(hash, handle.downgrade());
1981        Ok(handle)
1982    }
1983
1984    /// Read the entire blobs table. Callers can then sift through the results to find what they need
1985    fn blobs(
1986        &mut self,
1987        tables: &impl ReadableTables,
1988        filter: FilterPredicate<Hash, EntryState>,
1989    ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1990        let mut res = Vec::new();
1991        let mut index = 0u64;
1992        #[allow(clippy::explicit_counter_loop)]
1993        for item in tables.blobs().iter()? {
1994            match item {
1995                Ok((k, v)) => {
1996                    if let Some(item) = filter(index, k, v) {
1997                        res.push(Ok(item));
1998                    }
1999                }
2000                Err(e) => {
2001                    res.push(Err(e));
2002                }
2003            }
2004            index += 1;
2005        }
2006        Ok(res)
2007    }
2008
2009    /// Read the entire tags table. Callers can then sift through the results to find what they need
2010    fn tags(
2011        &mut self,
2012        tables: &impl ReadableTables,
2013        from: Option<Tag>,
2014        to: Option<Tag>,
2015    ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
2016        let mut res = Vec::new();
2017        let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
2018        let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2019        for item in tables.tags().range((from, to))? {
2020            match item {
2021                Ok((k, v)) => {
2022                    res.push(Ok((k.value(), v.value())));
2023                }
2024                Err(e) => {
2025                    res.push(Err(e));
2026                }
2027            }
2028        }
2029        Ok(res)
2030    }
2031
2032    fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
2033        let tag = {
2034            let tag = Tag::auto(SystemTime::now(), |x| {
2035                matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
2036            });
2037            tables.tags.insert(tag.clone(), content)?;
2038            tag
2039        };
2040        Ok(tag)
2041    }
2042
2043    fn rename_tag(&mut self, tables: &mut Tables, from: Tag, to: Tag) -> ActorResult<()> {
2044        let value = tables
2045            .tags
2046            .remove(from)?
2047            .ok_or_else(|| {
2048                ActorError::Io(io::Error::new(io::ErrorKind::NotFound, "tag not found"))
2049            })?
2050            .value();
2051        tables.tags.insert(to, value)?;
2052        Ok(())
2053    }
2054
2055    fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> {
2056        tables.tags.insert(tag, value)?;
2057        Ok(())
2058    }
2059
2060    fn delete_tags(
2061        &self,
2062        tables: &mut Tables,
2063        from: Option<Tag>,
2064        to: Option<Tag>,
2065    ) -> ActorResult<()> {
2066        let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
2067        let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2068        let removing = tables.tags.extract_from_if((from, to), |_, _| true)?;
2069        // drain the iterator to actually remove the tags
2070        for res in removing {
2071            res?;
2072        }
2073        Ok(())
2074    }
2075
2076    fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
2077        let entry = tables
2078            .blobs
2079            .get(hash)?
2080            .map(|x| x.value())
2081            .unwrap_or_default();
2082        let entry = entry.union(EntryState::Partial { size: None })?;
2083        tables.blobs.insert(hash, entry)?;
2084        // protect all three parts of the entry
2085        tables.delete_after_commit.remove(
2086            hash,
2087            [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
2088        );
2089        Ok(())
2090    }
2091
2092    fn update_inline_options(
2093        &mut self,
2094        db: &redb::Database,
2095        options: InlineOptions,
2096        reapply: bool,
2097    ) -> ActorResult<()> {
2098        self.options.inline = options;
2099        if reapply {
2100            let mut delete_after_commit = Default::default();
2101            let tx = db.begin_write()?;
2102            {
2103                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
2104                let hashes = tables
2105                    .blobs
2106                    .iter()?
2107                    .map(|x| x.map(|(k, _)| k.value()))
2108                    .collect::<Result<Vec<_>, _>>()?;
2109                for hash in hashes {
2110                    let guard = tables
2111                        .blobs
2112                        .get(hash)?
2113                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
2114                    let entry = guard.value();
2115                    if let EntryState::Complete {
2116                        data_location,
2117                        outboard_location,
2118                    } = entry
2119                    {
2120                        let (data_location, data_size, data_location_changed) = match data_location
2121                        {
2122                            DataLocation::Owned(size) => {
2123                                // inline
2124                                if size <= self.options.inline.max_data_inlined {
2125                                    let path = self.options.path.owned_data_path(&hash);
2126                                    let data = std::fs::read(&path)?;
2127                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2128                                    tables.inline_data.insert(hash, data.as_slice())?;
2129                                    (DataLocation::Inline(()), size, true)
2130                                } else {
2131                                    (DataLocation::Owned(size), size, false)
2132                                }
2133                            }
2134                            DataLocation::Inline(()) => {
2135                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
2136                                    ActorError::Inconsistent("inline data missing".to_owned())
2137                                })?;
2138                                let data = guard.value();
2139                                let size = data.len() as u64;
2140                                if size > self.options.inline.max_data_inlined {
2141                                    let path = self.options.path.owned_data_path(&hash);
2142                                    std::fs::write(&path, data)?;
2143                                    drop(guard);
2144                                    tables.inline_data.remove(hash)?;
2145                                    (DataLocation::Owned(size), size, true)
2146                                } else {
2147                                    (DataLocation::Inline(()), size, false)
2148                                }
2149                            }
2150                            DataLocation::External(paths, size) => {
2151                                (DataLocation::External(paths, size), size, false)
2152                            }
2153                        };
2154                        let outboard_size = raw_outboard_size(data_size);
2155                        let (outboard_location, outboard_location_changed) = match outboard_location
2156                        {
2157                            OutboardLocation::Owned
2158                                if outboard_size <= self.options.inline.max_outboard_inlined =>
2159                            {
2160                                let path = self.options.path.owned_outboard_path(&hash);
2161                                let outboard = std::fs::read(&path)?;
2162                                tables
2163                                    .delete_after_commit
2164                                    .insert(hash, [BaoFilePart::Outboard]);
2165                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
2166                                (OutboardLocation::Inline(()), true)
2167                            }
2168                            OutboardLocation::Inline(())
2169                                if outboard_size > self.options.inline.max_outboard_inlined =>
2170                            {
2171                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
2172                                    ActorError::Inconsistent("inline outboard missing".to_owned())
2173                                })?;
2174                                let outboard = guard.value();
2175                                let path = self.options.path.owned_outboard_path(&hash);
2176                                std::fs::write(&path, outboard)?;
2177                                drop(guard);
2178                                tables.inline_outboard.remove(hash)?;
2179                                (OutboardLocation::Owned, true)
2180                            }
2181                            x => (x, false),
2182                        };
2183                        drop(guard);
2184                        if data_location_changed || outboard_location_changed {
2185                            tables.blobs.insert(
2186                                hash,
2187                                EntryState::Complete {
2188                                    data_location,
2189                                    outboard_location,
2190                                },
2191                            )?;
2192                        }
2193                    }
2194                }
2195            }
2196            tx.commit()?;
2197            delete_after_commit.apply_and_clear(&self.options.path);
2198        }
2199        Ok(())
2200    }
2201
2202    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
2203        for hash in hashes {
2204            if self.temp.as_ref().read().unwrap().contains(&hash) {
2205                continue;
2206            }
2207            if !force && self.protected.contains(&hash) {
2208                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
2209                continue;
2210            }
2211
2212            tracing::debug!("deleting {}", &hash.to_hex()[..8]);
2213
2214            self.handles.remove(&hash);
2215            if let Some(entry) = tables.blobs.remove(hash)? {
2216                match entry.value() {
2217                    EntryState::Complete {
2218                        data_location,
2219                        outboard_location,
2220                    } => {
2221                        match data_location {
2222                            DataLocation::Inline(_) => {
2223                                tables.inline_data.remove(hash)?;
2224                            }
2225                            DataLocation::Owned(_) => {
2226                                // mark the data for deletion
2227                                tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2228                            }
2229                            DataLocation::External(_, _) => {}
2230                        }
2231                        match outboard_location {
2232                            OutboardLocation::Inline(_) => {
2233                                tables.inline_outboard.remove(hash)?;
2234                            }
2235                            OutboardLocation::Owned => {
2236                                // mark the outboard for deletion
2237                                tables
2238                                    .delete_after_commit
2239                                    .insert(hash, [BaoFilePart::Outboard]);
2240                            }
2241                            OutboardLocation::NotNeeded => {}
2242                        }
2243                    }
2244                    EntryState::Partial { .. } => {
2245                        // mark all parts for deletion
2246                        tables.delete_after_commit.insert(
2247                            hash,
2248                            [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
2249                        );
2250                    }
2251                }
2252            }
2253        }
2254        Ok(())
2255    }
2256
2257    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
2258        let hash = entry.hash();
2259        let mut info = None;
2260        tracing::trace!("on_complete({})", hash.to_hex());
2261        entry.transform(|state| {
2262            tracing::trace!("on_complete transform {:?}", state);
2263            let entry = match complete_storage(
2264                state,
2265                &hash,
2266                &self.options.path,
2267                &self.options.inline,
2268                tables.delete_after_commit,
2269            )? {
2270                Ok(entry) => {
2271                    // store the info so we can insert it into the db later
2272                    info = Some((
2273                        entry.data_size(),
2274                        entry.data.mem().cloned(),
2275                        entry.outboard_size(),
2276                        entry.outboard.mem().cloned(),
2277                    ));
2278                    entry
2279                }
2280                Err(entry) => {
2281                    // the entry was already complete, nothing to do
2282                    entry
2283                }
2284            };
2285            Ok(BaoFileStorage::Complete(entry))
2286        })?;
2287        if let Some((data_size, data, outboard_size, outboard)) = info {
2288            let data_location = if data.is_some() {
2289                DataLocation::Inline(())
2290            } else {
2291                DataLocation::Owned(data_size)
2292            };
2293            let outboard_location = if outboard_size == 0 {
2294                OutboardLocation::NotNeeded
2295            } else if outboard.is_some() {
2296                OutboardLocation::Inline(())
2297            } else {
2298                OutboardLocation::Owned
2299            };
2300            {
2301                tracing::debug!(
2302                    "inserting complete entry for {}, {} bytes",
2303                    hash.to_hex(),
2304                    data_size,
2305                );
2306                let entry = tables
2307                    .blobs()
2308                    .get(hash)?
2309                    .map(|x| x.value())
2310                    .unwrap_or_default();
2311                let entry = entry.union(EntryState::Complete {
2312                    data_location,
2313                    outboard_location,
2314                })?;
2315                tables.blobs.insert(hash, entry)?;
2316                if let Some(data) = data {
2317                    tables.inline_data.insert(hash, data.as_ref())?;
2318                }
2319                if let Some(outboard) = outboard {
2320                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
2321                }
2322            }
2323        }
2324        Ok(())
2325    }
2326
2327    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
2328        match msg {
2329            ActorMessage::UpdateInlineOptions {
2330                inline_options,
2331                reapply,
2332                tx,
2333            } => {
2334                let res = self.update_inline_options(db, inline_options, reapply);
2335                tx.send(res?).ok();
2336            }
2337            ActorMessage::Fsck {
2338                repair,
2339                progress,
2340                tx,
2341            } => {
2342                let res = self.consistency_check(db, repair, progress);
2343                tx.send(res).ok();
2344            }
2345            ActorMessage::Sync { tx } => {
2346                tx.send(()).ok();
2347            }
2348            x => {
2349                return Err(ActorError::Inconsistent(format!(
2350                    "unexpected message for handle_toplevel: {:?}",
2351                    x
2352                )))
2353            }
2354        }
2355        Ok(())
2356    }
2357
2358    fn handle_readonly(
2359        &mut self,
2360        tables: &impl ReadableTables,
2361        msg: ActorMessage,
2362    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
2363        match msg {
2364            ActorMessage::Get { hash, tx } => {
2365                let res = self.get(tables, hash);
2366                tx.send(res).ok();
2367            }
2368            ActorMessage::GetOrCreate { hash, tx } => {
2369                let res = self.get_or_create(tables, hash);
2370                tx.send(res).ok();
2371            }
2372            ActorMessage::EntryStatus { hash, tx } => {
2373                let res = self.entry_status(tables, hash);
2374                tx.send(res).ok();
2375            }
2376            ActorMessage::Blobs { filter, tx } => {
2377                let res = self.blobs(tables, filter);
2378                tx.send(res).ok();
2379            }
2380            ActorMessage::Tags { from, to, tx } => {
2381                let res = self.tags(tables, from, to);
2382                tx.send(res).ok();
2383            }
2384            ActorMessage::GcStart { tx } => {
2385                self.protected.clear();
2386                self.handles.retain(|_, weak| weak.is_live());
2387                tx.send(()).ok();
2388            }
2389            ActorMessage::Dump => {
2390                dump(tables).ok();
2391            }
2392            #[cfg(test)]
2393            ActorMessage::EntryState { hash, tx } => {
2394                tx.send(self.entry_state(tables, hash)).ok();
2395            }
2396            ActorMessage::GetFullEntryState { hash, tx } => {
2397                let res = self.get_full_entry_state(tables, hash);
2398                tx.send(res).ok();
2399            }
2400            x => return Ok(Err(x)),
2401        }
2402        Ok(Ok(()))
2403    }
2404
2405    fn handle_readwrite(
2406        &mut self,
2407        tables: &mut Tables,
2408        msg: ActorMessage,
2409    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
2410        match msg {
2411            ActorMessage::Import { cmd, tx } => {
2412                let res = self.import(tables, cmd);
2413                tx.send(res).ok();
2414            }
2415            ActorMessage::SetTag { tag, value, tx } => {
2416                let res = self.set_tag(tables, tag, value);
2417                tx.send(res).ok();
2418            }
2419            ActorMessage::DeleteTags { from, to, tx } => {
2420                let res = self.delete_tags(tables, from, to);
2421                tx.send(res).ok();
2422            }
2423            ActorMessage::CreateTag { hash, tx } => {
2424                let res = self.create_tag(tables, hash);
2425                tx.send(res).ok();
2426            }
2427            ActorMessage::RenameTag { from, to, tx } => {
2428                let res = self.rename_tag(tables, from, to);
2429                tx.send(res).ok();
2430            }
2431            ActorMessage::Delete { hashes, tx } => {
2432                let res = self.delete(tables, hashes, true);
2433                tx.send(res).ok();
2434            }
2435            ActorMessage::GcDelete { hashes, tx } => {
2436                let res = self.delete(tables, hashes, false);
2437                tx.send(res).ok();
2438            }
2439            ActorMessage::OnComplete { handle } => {
2440                let res = self.on_complete(tables, handle);
2441                res.ok();
2442            }
2443            ActorMessage::Export { cmd, tx } => {
2444                self.export(tables, cmd, tx)?;
2445            }
2446            ActorMessage::OnMemSizeExceeded { hash } => {
2447                let res = self.on_mem_size_exceeded(tables, hash);
2448                res.ok();
2449            }
2450            ActorMessage::Dump => {
2451                let res = dump(tables);
2452                res.ok();
2453            }
2454            ActorMessage::SetFullEntryState { hash, entry, tx } => {
2455                let res = self.set_full_entry_state(tables, hash, entry);
2456                tx.send(res).ok();
2457            }
2458            msg => {
2459                // try to handle it as readonly
2460                if let Err(msg) = self.handle_readonly(tables, msg)? {
2461                    return Ok(Err(msg));
2462                }
2463            }
2464        }
2465        Ok(Ok(()))
2466    }
2467}
2468
2469/// Export a file by copying out its content to a new location
2470fn export_file_copy(
2471    temp_tag: TempTag,
2472    path: PathBuf,
2473    size: u64,
2474    target: PathBuf,
2475    progress: ExportProgressCb,
2476) -> ActorResult<()> {
2477    progress(0)?;
2478    // todo: fine grained copy progress
2479    reflink_copy::reflink_or_copy(path, target)?;
2480    progress(size)?;
2481    drop(temp_tag);
2482    Ok(())
2483}
2484
2485fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2486    for e in tables.blobs().iter()? {
2487        let (k, v) = e?;
2488        let k = k.value();
2489        let v = v.value();
2490        println!("blobs: {} -> {:?}", k.to_hex(), v);
2491    }
2492    for e in tables.tags().iter()? {
2493        let (k, v) = e?;
2494        let k = k.value();
2495        let v = v.value();
2496        println!("tags: {} -> {:?}", k, v);
2497    }
2498    for e in tables.inline_data().iter()? {
2499        let (k, v) = e?;
2500        let k = k.value();
2501        let v = v.value();
2502        println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2503    }
2504    for e in tables.inline_outboard().iter()? {
2505        let (k, v) = e?;
2506        let k = k.value();
2507        let v = v.value();
2508        println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2509    }
2510    Ok(())
2511}
2512
2513fn load_data(
2514    tables: &impl ReadableTables,
2515    options: &PathOptions,
2516    location: DataLocation<(), u64>,
2517    hash: &Hash,
2518) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2519    Ok(match location {
2520        DataLocation::Inline(()) => {
2521            let Some(data) = tables.inline_data().get(hash)? else {
2522                return Err(ActorError::Inconsistent(format!(
2523                    "inconsistent database state: {} should have inline data but does not",
2524                    hash.to_hex()
2525                )));
2526            };
2527            MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
2528        }
2529        DataLocation::Owned(data_size) => {
2530            let path = options.owned_data_path(hash);
2531            let Ok(file) = std::fs::File::open(&path) else {
2532                return Err(io::Error::new(
2533                    io::ErrorKind::NotFound,
2534                    format!("file not found: {}", path.display()),
2535                )
2536                .into());
2537            };
2538            MemOrFile::File((file, data_size))
2539        }
2540        DataLocation::External(paths, data_size) => {
2541            if paths.is_empty() {
2542                return Err(ActorError::Inconsistent(
2543                    "external data location must not be empty".into(),
2544                ));
2545            }
2546            let path = &paths[0];
2547            let Ok(file) = std::fs::File::open(path) else {
2548                return Err(io::Error::new(
2549                    io::ErrorKind::NotFound,
2550                    format!("external file not found: {}", path.display()),
2551                )
2552                .into());
2553            };
2554            MemOrFile::File((file, data_size))
2555        }
2556    })
2557}
2558
2559fn load_outboard(
2560    tables: &impl ReadableTables,
2561    options: &PathOptions,
2562    location: OutboardLocation,
2563    size: u64,
2564    hash: &Hash,
2565) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2566    Ok(match location {
2567        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2568        OutboardLocation::Inline(_) => {
2569            let Some(outboard) = tables.inline_outboard().get(hash)? else {
2570                return Err(ActorError::Inconsistent(format!(
2571                    "inconsistent database state: {} should have inline outboard but does not",
2572                    hash.to_hex()
2573                )));
2574            };
2575            MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
2576        }
2577        OutboardLocation::Owned => {
2578            let outboard_size = raw_outboard_size(size);
2579            let path = options.owned_outboard_path(hash);
2580            let Ok(file) = std::fs::File::open(&path) else {
2581                return Err(io::Error::new(
2582                    io::ErrorKind::NotFound,
2583                    format!("file not found: {} size={}", path.display(), outboard_size),
2584                )
2585                .into());
2586            };
2587            MemOrFile::File((file, outboard_size))
2588        }
2589    })
2590}
2591
/// Take a possibly incomplete storage and turn it into complete
///
/// Returns `Ok(Err(c))` when the storage was already complete (nothing to
/// do), otherwise `Ok(Ok(complete))` with the newly assembled complete
/// storage.
///
/// Whether data and outboard end up inline (in memory, to be persisted by
/// the caller) or as owned files is decided by `inline_options`. Files that
/// become redundant are scheduled for removal after the database commit via
/// `delete_after_commit`.
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    // split the storage into its three parts. The sizes part is not needed
    // for a complete entry; it is only bound here so it can be dropped.
    let (data, outboard, _sizes) = match storage {
        // already complete - hand it back unchanged
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    // the unwraps assert that sizes are known at completion time — TODO(review)
    // confirm size() cannot be None for a storage being completed
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    // todo: perform more sanity checks if in debug mode
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    // inline data if needed, or write to file if needed
    let data = if data_size <= inline_options.max_data_inlined {
        match data {
            MemOrFile::File(data) => {
                // small enough to inline: read the file into memory
                let mut buf = vec![0; data_size as usize];
                data.read_at(0, &mut buf)?;
                // mark data for deletion after commit
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        // protect the data from previous deletions
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            MemOrFile::Mem(data) => {
                // too large to inline: persist to an owned file and fsync
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    // inline outboard if needed, or write to file if needed
    let outboard = if outboard_size == 0 {
        // no outboard at all for this entry
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            MemOrFile::File(outboard) => {
                // small enough to inline: read the file into memory
                let mut buf = vec![0; outboard_size as usize];
                outboard.read_at(0, &mut buf)?;
                drop(outboard);
                // mark outboard for deletion after commit
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        // protect the outboard from previous deletions
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            MemOrFile::Mem(outboard) => {
                // too large to inline: persist to an owned file and fsync
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // mark sizes for deletion after commit in any case - a complete entry
    // does not need sizes.
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}