iroh_blobs/store/fs.rs

//! redb backed storage
//!
//! Data can get into the store in two ways:
//!
//! 1. import from local data
//! 2. sync from a remote
//!
//! These two cases are very different. In the first case, we have the data
//! completely and don't know the hash yet. We compute the outboard and hash,
//! and only then move/reference the data into the store.
//!
//! The entry for the hash comes into existence already complete.
//!
//! In the second case, we know the hash, but don't have the data yet. We create
//! a partial entry, and then request the data from the remote. This is the more
//! complex case.
//!
//! Partial entries always start as pure in-memory entries without a database
//! entry. Only once we receive enough data do we convert them into a persistent
//! partial entry. This is necessary because we can't trust the size given
//! by the remote side before receiving data. It is also an optimization,
//! because for small blobs it is not worth creating a partial entry.
//!
//! A persistent partial entry is always stored as three files in the file
//! system: the data file, the outboard file, and a sizes file that contains
//! the most up-to-date information about the size of the data.
//!
//! The redb database entry for a persistent partial entry does not contain
//! any information about the size of the data until the size is exactly known.
//! Updating this information on each write would be too costly.
//!
//! Marking a partial entry as complete is done from the outside. At this point
//! the size is taken as validated. Depending on the size, we decide whether to
//! store data and outboard inline or to keep storing them in external files.
//!
//! Data can get out of the store in two ways:
//!
//! 1. the data and outboard of both partial and complete entries can be read at
//!    any time and shared over the network. Only complete data will verify;
//!    everything else will lead to validation errors.
//!
//! 2. entries can be exported to the file system. This currently only works for
//!    complete entries.
//!
//! Tables:
//!
//! The blobs table contains a mapping from hash to rough entry state.
//! The inline_data table contains the actual data for complete entries.
//! The inline_outboard table contains the actual outboard for complete entries.
//! The tags table contains a mapping from tag to hash.
//!
//! Design:
//!
//! The redb store is accessed in a single-threaded way by an actor that runs
//! on its own std thread. Communication with this actor is via a bounded
//! async channel, with oneshot channels for the return values if needed.
//!
//! Errors:
//!
//! ActorError is an enum containing errors that can happen inside message
//! handlers of the actor. This includes various redb related errors and io
//! errors when reading or writing non-inlined data or outboard files.
//!
//! OuterError is an enum containing all the actor errors and in addition
//! errors when communicating with the actor.
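//!
//! A minimal usage sketch (illustrative only, not a compiled doctest; the
//! crate paths and the root directory are assumptions, and the `Store` trait
//! must be in scope for `import_bytes`):
//!
//! ```ignore
//! use bytes::Bytes;
//! use iroh_blobs::{store::{fs::Store, Store as _}, BlobFormat};
//!
//! // load or create a store rooted at the given directory
//! let store = Store::load("/some/blob/root").await?;
//! // import some bytes; the returned temp tag protects the blob from gc
//! let tag = store
//!     .import_bytes(Bytes::from_static(b"hello"), BlobFormat::Raw)
//!     .await?;
//! ```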
use std::{
    collections::{BTreeMap, BTreeSet},
    future::Future,
    io,
    path::{Path, PathBuf},
    sync::{Arc, RwLock},
    time::{Duration, SystemTime},
};

use bao_tree::io::{
    fsm::Outboard,
    sync::{ReadAt, Size},
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::io::AsyncWriteExt;
use tracing::trace_span;

mod tables;
#[doc(hidden)]
pub mod test_support;
#[cfg(test)]
mod tests;
mod util;
mod validate;

use tables::{ReadOnlyTables, ReadableTables, Tables};

use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
use super::{
    bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
    ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
};
use crate::{
    store::{
        bao_file::{BaoFileStorage, CompleteStorage},
        fs::{
            tables::BaoFilePart,
            util::{overwrite_and_sync, read_and_remove},
        },
        GcMarkEvent, GcSweepEvent,
    },
    util::{
        compute_outboard,
        progress::{
            BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
            ProgressSender,
        },
        raw_outboard_size, MemOrFile, TagCounter, TagDrop,
    },
    BlobFormat, Hash, HashAndFormat, Tag, TempTag,
};

/// Location of the data.
///
/// Data can be inlined in the database, a file conceptually owned by the store,
/// or a number of external files conceptually owned by the user.
///
/// Only complete data can be inlined.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is in the inline_data table.
    Inline(I),
    /// Data is in the canonical location in the data directory.
    Owned(E),
    /// Data is in several external locations. This should be a non-empty list.
    External(Vec<PathBuf>, E),
}

impl<X> DataLocation<X, u64> {
    fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
        Ok(match (self, that) {
            (
                DataLocation::External(mut paths, a_size),
                DataLocation::External(b_paths, b_size),
            ) => {
                if a_size != b_size {
                    return Err(ActorError::Inconsistent(format!(
                        "complete size mismatch {} {}",
                        a_size, b_size
                    )));
                }
                paths.extend(b_paths);
                paths.sort();
                paths.dedup();
                DataLocation::External(paths, a_size)
            }
            (_, b @ DataLocation::Owned(_)) => {
                // owned needs to win, since it has an associated file. Choosing
                // external would orphan the file.
                b
            }
            (a @ DataLocation::Owned(_), _) => {
                // owned needs to win, since it has an associated file. Choosing
                // external would orphan the file.
                a
            }
            (_, b @ DataLocation::Inline(_)) => {
                // inline needs to win, since it has associated data. Choosing
                // anything else would lose the inline data.
                b
            }
            (a @ DataLocation::Inline(_), _) => {
                // inline needs to win, since it has associated data. Choosing
                // anything else would lose the inline data.
                a
            }
        })
    }
}
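
// Illustration of the precedence rules in `union` above (a sketch, since
// `DataLocation` is crate-internal): merging an owned location with an
// external one keeps the owned location, because dropping it would orphan
// the owned file.
//
//   let owned = DataLocation::<(), u64>::Owned(1024);
//   let external = DataLocation::External(vec![PathBuf::from("/a")], 1024);
//   assert_eq!(owned.union(external)?, DataLocation::Owned(1024));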

impl<I, E> DataLocation<I, E> {
    fn discard_inline_data(self) -> DataLocation<(), E> {
        match self {
            DataLocation::Inline(_) => DataLocation::Inline(()),
            DataLocation::Owned(x) => DataLocation::Owned(x),
            DataLocation::External(paths, x) => DataLocation::External(paths, x),
        }
    }
}

/// Location of the outboard.
///
/// Outboard can be inlined in the database or a file conceptually owned by the store.
/// Outboards are implementation specific to the store and as such are always owned.
///
/// Only complete outboards can be inlined.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is in the inline_outboard table.
    Inline(I),
    /// Outboard is in the canonical location in the data directory.
    Owned,
    /// Outboard is not needed.
    NotNeeded,
}

impl<I> OutboardLocation<I> {
    fn discard_extra_data(self) -> OutboardLocation<()> {
        match self {
            Self::Inline(_) => OutboardLocation::Inline(()),
            Self::Owned => OutboardLocation::Owned,
            Self::NotNeeded => OutboardLocation::NotNeeded,
        }
    }
}

/// The information about an entry that we keep in the entry table for quick access.
///
/// The exact info to store here is TBD, so usually you should use the accessor methods.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// For a complete entry we always know the size. It does not make much sense
    /// to write to a complete entry, so complete entries are much easier to share.
    Complete {
        /// Location of the data.
        data_location: DataLocation<I, u64>,
        /// Location of the outboard.
        outboard_location: OutboardLocation<I>,
    },
    /// Partial entries are entries for which we know the hash, but don't have
    /// all the data. They are created when syncing from somewhere else by hash.
    ///
    /// As such they are always owned. There is also no inline storage for them.
    /// Partial entries that are not short-lived always live in the file system,
    /// and for short-lived ones we never create a database entry in the first place.
    Partial {
        /// Once we get the last chunk of a partial entry, we have validated
        /// the size of the entry despite it still being incomplete.
        ///
        /// E.g. a giant file where we just requested the last chunk.
        size: Option<u64>,
    },
}

impl Default for EntryState {
    fn default() -> Self {
        Self::Partial { size: None }
    }
}

impl EntryState {
    fn union(self, that: Self) -> ActorResult<Self> {
        match (self, that) {
            (
                Self::Complete {
                    data_location,
                    outboard_location,
                },
                Self::Complete {
                    data_location: b_data_location,
                    ..
                },
            ) => Ok(Self::Complete {
                // combine external paths if needed
                data_location: data_location.union(b_data_location)?,
                outboard_location,
            }),
            (a @ Self::Complete { .. }, Self::Partial { .. }) =>
            // complete wins over partial
            {
                Ok(a)
            }
            (Self::Partial { .. }, b @ Self::Complete { .. }) =>
            // complete wins over partial
            {
                Ok(b)
            }
            (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
            // keep known size from either entry
            {
                let size = match (a_size, b_size) {
                    (Some(a_size), Some(b_size)) => {
                        // validated sizes are different. this means that at
                        // least one validation was wrong, which would be a bug
                        // in bao-tree.
                        if a_size != b_size {
                            return Err(ActorError::Inconsistent(format!(
                                "validated size mismatch {} {}",
                                a_size, b_size
                            )));
                        }
                        Some(a_size)
                    }
                    (Some(a_size), None) => Some(a_size),
                    (None, Some(b_size)) => Some(b_size),
                    (None, None) => None,
                };
                Ok(Self::Partial { size })
            }
        }
    }
}
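
// Illustration of the partial/partial merge above (a sketch; `EntryState`
// is crate-internal): a size validated on either side is kept.
//
//   let a = EntryState::Partial { size: Some(1024) };
//   let b = EntryState::Partial { size: None };
//   assert_eq!(a.union(b)?, EntryState::Partial { size: Some(1024) });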

impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    type AsBytes<'a> = SmallVec<[u8; 128]>;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        postcard::from_bytes(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
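
// Round-trip sketch for the `redb::Value` impl above (illustrative only):
// entry states are postcard-encoded, so a value written with `as_bytes`
// can be recovered with `from_bytes`.
//
//   let state = EntryState::Partial { size: Some(42) };
//   let bytes = <EntryState as redb::Value>::as_bytes(&state);
//   assert_eq!(<EntryState as redb::Value>::from_bytes(&bytes), state);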

/// Options for inlining small complete data or outboards.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size to inline.
    pub max_data_inlined: u64,
    /// Maximum outboard size to inline.
    pub max_outboard_inlined: u64,
}

impl InlineOptions {
    /// Do not inline anything, ever.
    pub const NO_INLINE: Self = Self {
        max_data_inlined: 0,
        max_outboard_inlined: 0,
    };
    /// Always inline everything.
    pub const ALWAYS_INLINE: Self = Self {
        max_data_inlined: u64::MAX,
        max_outboard_inlined: u64::MAX,
    };
}

impl Default for InlineOptions {
    fn default() -> Self {
        Self {
            max_data_inlined: 1024 * 16,
            max_outboard_inlined: 1024 * 16,
        }
    }
}

/// Options for directories used by the file store.
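///
/// For a hash `h`, the helpers below place the files owned by the store at
/// `{data_path}/{h}.data`, `{data_path}/{h}.obao4` and `{data_path}/{h}.sizes4`,
/// while temp files are created under `temp_path`.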
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Path to the directory where data and outboard files are stored.
    pub data_path: PathBuf,
    /// Path to the directory where temp files are stored.
    /// This *must* be on the same device as `data_path`, since we need to
    /// atomically move temp files into place.
    pub temp_path: PathBuf,
}

impl PathOptions {
    fn new(root: &Path) -> Self {
        Self {
            data_path: root.join("data"),
            temp_path: root.join("temp"),
        }
    }

    fn owned_data_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.data", hash.to_hex()))
    }

    fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.obao4", hash.to_hex()))
    }

    fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.sizes4", hash.to_hex()))
    }

    fn temp_file_name(&self) -> PathBuf {
        self.temp_path.join(temp_name())
    }
}

/// Options for transaction batching.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of actor messages to batch before creating a new read transaction.
    pub max_read_batch: usize,
    /// Maximum duration to keep a read transaction open.
    pub max_read_duration: Duration,
    /// Maximum number of actor messages to batch before committing a write transaction.
    pub max_write_batch: usize,
    /// Maximum duration to wait before committing a write transaction.
    pub max_write_duration: Duration,
}

impl Default for BatchOptions {
    fn default() -> Self {
        Self {
            max_read_batch: 10000,
            max_read_duration: Duration::from_secs(1),
            max_write_batch: 1000,
            max_write_duration: Duration::from_millis(500),
        }
    }
}

/// Options for the file store.
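///
/// A construction sketch (mirroring what [`Store::load`] does, with default
/// inline and batch options):
///
/// ```ignore
/// // `root` is the store root directory (a &Path)
/// let options = Options {
///     path: PathOptions::new(root),
///     inline: InlineOptions::default(),
///     batch: BatchOptions::default(),
/// };
/// ```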
#[derive(Debug, Clone)]
pub struct Options {
    /// Path options.
    pub path: PathOptions,
    /// Inline storage options.
    pub inline: InlineOptions,
    /// Transaction batching options.
    pub batch: BatchOptions,
}

#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    TempFile(PathBuf),
    External(PathBuf),
    Memory(#[debug(skip)] Bytes),
}

impl ImportSource {
    fn content(&self) -> MemOrFile<&[u8], &Path> {
        match self {
            Self::TempFile(path) => MemOrFile::File(path.as_path()),
            Self::External(path) => MemOrFile::File(path.as_path()),
            Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
        }
    }

    fn len(&self) -> io::Result<u64> {
        match self {
            Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
            Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
            Self::Memory(data) => Ok(data.len() as u64),
        }
    }
}

/// Use BaoFileHandle as the entry type for the map.
pub type Entry = BaoFileHandle;

impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    fn size(&self) -> BaoBlobSize {
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}

impl super::MapEntryMut for Entry {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
}

#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// The hash and format of the data to import
    content_id: HashAndFormat,
    /// The source of the data to import, can be a temp file, external file, or memory
    source: ImportSource,
    /// Data size
    data_size: u64,
    /// Outboard without length prefix
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}

#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// A temp tag to keep the entry alive while exporting. This also
    /// contains the hash to be exported.
    temp_tag: TempTag,
    /// The target path for the export.
    target: PathBuf,
    /// The export mode to use.
    mode: ExportMode,
    /// The progress callback to use.
    #[debug(skip)]
    progress: ExportProgressCb,
}

#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Query method: get a file handle for a hash, if it exists.
    /// This will produce a file handle even for entries that are not yet in redb at all.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query method: get the rough entry status for a hash. Just complete, partial or not found.
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    #[cfg(test)]
    /// Query method: get the full entry state for a hash, both in memory and in redb.
    /// This is everything we got about the entry, including the actual inline outboard and data.
    EntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Query method: get the full entry state for a hash.
    GetFullEntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Modification method: set the full entry state for a hash.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: get or create a file handle for a hash.
    ///
    /// If the entry exists in redb, either partial or complete, the corresponding
    /// data will be returned. If it does not yet exist, a new partial file handle
    /// will be created, but not yet written to redb.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Modification method: inline size was exceeded for a partial entry.
    /// If the entry is complete, this is a no-op. If the entry is partial and in
    /// memory, it will be written to a file and created in redb.
    OnMemSizeExceeded { hash: Hash },
    /// Modification method: marks a partial entry as complete.
    /// Calling this on a complete entry is a no-op.
    OnComplete { handle: BaoFileHandle },
    /// Modification method: import data into a redb store
    ///
    /// At this point the size, hash and outboard must already be known.
    Import {
        cmd: Import,
        tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Modification method: export data from a redb store
    ///
    /// In most cases this will not modify the store. Only when using
    /// [`ExportMode::TryReference`] and the entry is large enough to not be
    /// inlined.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Update inline options
    UpdateInlineOptions {
        /// The new inline options
        inline_options: InlineOptions,
        /// Whether to reapply the new options to existing entries
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// Bulk query method: get entries from the blobs table
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// Bulk query method: get the entire tags table
    Tags {
        #[debug(skip)]
        filter: FilterPredicate<Tag, HashAndFormat>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Modification method: set a tag to a value, or remove it.
    SetTag {
        tag: Tag,
        value: Option<HashAndFormat>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: create a new unique tag and set it to a value.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Modification method: unconditionally delete the data for a number of hashes
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Modification method: delete the data for a number of hashes, only if not protected
    GcDelete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync the entire database to disk.
    ///
    /// This just makes sure that there is no write transaction open.
    Sync { tx: oneshot::Sender<()> },
    /// Internal method: dump the entire database to stdout.
    Dump,
    /// Internal method: validate the entire database.
    ///
    /// Note that this will block the actor until it is done, so don't use it
    /// on a node under load.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Internal method: notify the actor that a new gc epoch has started.
    ///
    /// This will be called periodically and can be used to do misc cleanups.
    GcStart { tx: oneshot::Sender<()> },
    /// Internal method: shutdown the actor.
    ///
    /// Can have an optional oneshot sender to signal when the actor has shut down.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}

impl ActorMessage {
    fn category(&self) -> MessageCategory {
        match self {
            Self::Get { .. }
            | Self::GetOrCreate { .. }
            | Self::EntryStatus { .. }
            | Self::Blobs { .. }
            | Self::Tags { .. }
            | Self::GcStart { .. }
            | Self::GetFullEntryState { .. }
            | Self::Dump => MessageCategory::ReadOnly,
            Self::Import { .. }
            | Self::Export { .. }
            | Self::OnMemSizeExceeded { .. }
            | Self::OnComplete { .. }
            | Self::SetTag { .. }
            | Self::CreateTag { .. }
            | Self::SetFullEntryState { .. }
            | Self::Delete { .. }
            | Self::GcDelete { .. } => MessageCategory::ReadWrite,
            Self::UpdateInlineOptions { .. }
            | Self::Sync { .. }
            | Self::Shutdown { .. }
            | Self::Fsck { .. } => MessageCategory::TopLevel,
            #[cfg(test)]
            Self::EntryState { .. } => MessageCategory::ReadOnly,
        }
    }
}

enum MessageCategory {
    ReadOnly,
    ReadWrite,
    TopLevel,
}

/// Predicate for filtering entries in a redb table.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
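
// A predicate sketch mirroring how `StoreInner::blobs` below uses this
// alias: keep only complete entries and return their hashes and states.
//
//   let only_complete: FilterPredicate<Hash, EntryState> =
//       Box::new(|_i, k, v| match v.value() {
//           v @ EntryState::Complete { .. } => Some((k.value(), v)),
//           _ => None,
//       });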

/// Storage that uses a redb database for small files and the file
/// system for large files.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);

impl Store {
    /// Load or create a new store.
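    ///
    /// A usage sketch (not a compiled doctest; the path is illustrative):
    ///
    /// ```ignore
    /// let store = Store::load("/path/to/blob/store").await?;
    /// ```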
    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options {
            path: PathOptions::new(path),
            inline: Default::default(),
            batch: Default::default(),
        };
        Self::new(db_path, options).await
    }

    /// Create a new store with custom options.
    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
        // spawn_blocking because StoreInner::new creates directories
        let rt = tokio::runtime::Handle::try_current()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
        let inner =
            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
        Ok(Self(Arc::new(inner)))
    }

    /// Update the inline options.
    ///
    /// When reapply is true, the new options will be applied to all existing
    /// entries.
    pub async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> io::Result<()> {
        Ok(self
            .0
            .update_inline_options(inline_options, reapply)
            .await?)
    }

    /// Dump the entire content of the database to stdout.
    pub async fn dump(&self) -> io::Result<()> {
        Ok(self.0.dump().await?)
    }
}

#[derive(Debug)]
struct StoreInner {
    tx: async_channel::Sender<ActorMessage>,
    temp: Arc<RwLock<TempCounterMap>>,
    handle: Option<std::thread::JoinHandle<()>>,
    path_options: Arc<PathOptions>,
}

impl TagDrop for RwLock<TempCounterMap> {
    fn on_drop(&self, content: &HashAndFormat) {
        self.write().unwrap().dec(content);
    }
}

impl TagCounter for RwLock<TempCounterMap> {
    fn on_create(&self, content: &HashAndFormat) {
        self.write().unwrap().inc(content);
    }
}

impl StoreInner {
    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
        tracing::trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        std::fs::create_dir_all(&options.path.data_path)?;
        tracing::trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        std::fs::create_dir_all(&options.path.temp_path)?;
        tracing::trace!(
            "creating parent directory for db file: {}",
            path.parent().unwrap().display()
        );
        std::fs::create_dir_all(path.parent().unwrap())?;
        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
        let handle = std::thread::Builder::new()
            .name("redb-actor".to_string())
            .spawn(move || {
                rt.block_on(async move {
                    if let Err(cause) = actor.run_batched().await {
                        tracing::error!("redb actor failed: {}", cause);
                    }
                });
            })
            .expect("failed to spawn thread");
        Ok(Self {
            tx,
            temp,
            handle: Some(handle),
            path_options: Arc::new(options.path),
        })
    }

    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Get { hash, tx }).await?;
        Ok(rx.await??)
    }

    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?;
        Ok(rx.await??)
    }

    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Complete { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Partial { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
        let (tx, rx) = oneshot::channel();
        let filter: FilterPredicate<Tag, HashAndFormat> =
            Box::new(|_i, k, v| Some((k.value(), v.value())));
        self.tx.send(ActorMessage::Tags { filter, tx }).await?;
        let tags = rx.await?;
        // transform the internal error type into io::Error
        let tags = tags?
            .into_iter()
            .map(|r| r.map_err(|e| ActorError::from(e).into()))
            .collect();
        Ok(tags)
    }

    async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::SetTag { tag, value, tx })
            .await?;
        Ok(rx.await??)
    }

    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
        Ok(rx.await??)
    }

    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
        Ok(rx.await??)
    }

    async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
        Ok(rx.await??)
    }

    async fn gc_start(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GcStart { tx }).await?;
        Ok(rx.await?)
    }

    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::EntryStatus { hash: *hash, tx })
            .await?;
        Ok(rx.await??)
    }

    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
        Ok(rx.recv()??)
    }

    async fn complete(&self, entry: Entry) -> OuterResult<()> {
        self.tx
            .send(ActorMessage::OnComplete { handle: entry })
            .await?;
        Ok(())
    }

    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            )
            .into());
        }
        let parent = target.parent().ok_or_else(|| {
            OuterError::from(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            ))
        })?;
        std::fs::create_dir_all(parent)?;
        let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Export {
                cmd: Export {
                    temp_tag,
                    target,
                    mode,
                    progress,
                },
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    async fn consistency_check(
        &self,
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Fsck {
                repair,
                progress,
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            })
            .await?;
        Ok(rx.await?)
    }

    async fn dump(&self) -> OuterResult<()> {
        self.tx.send(ActorMessage::Dump).await?;
        Ok(())
    }

    async fn sync(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Sync { tx }).await?;
        Ok(rx.await?)
    }

    fn import_file_sync(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        if !path.is_absolute() {
            return Err(
                io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
            );
        }
        if !path.is_file() && !path.is_symlink() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "path is not a file or symlink",
            )
            .into());
        }
        let id = progress.new_id();
        progress.blocking_send(ImportProgress::Found {
            id,
            name: path.to_string_lossy().to_string(),
        })?;
        let file = match mode {
            ImportMode::TryReference => ImportSource::External(path),
            ImportMode::Copy => {
                if std::fs::metadata(&path)?.len() < 16 * 1024 {
                    // we don't know if the data will be inlined since we don't
                    // have the inline options here. But still, for such a small
                    // file it does not seem worth it to do the temp file ceremony.
                    let data = std::fs::read(&path)?;
                    ImportSource::Memory(data.into())
                } else {
                    let temp_path = self.temp_file_name();
                    // copy the data, since it is not stable
                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
                    if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
                        tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
                    } else {
                        tracing::debug!("copied {} to {}", path.display(), temp_path.display());
                    }
                    // copy progress for size will be called in finalize_import_sync
                    ImportSource::TempFile(temp_path)
                }
            }
        };
        let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok((tag, size))
    }

    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
        let id = 0;
        let file = ImportSource::Memory(data);
        let progress = IgnoreProgressSender::default();
        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok(tag)
    }

    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        progress.blocking_send(ImportProgress::Size {
            id,
            size: data_size,
        })?;
        let progress2 = progress.clone();
        let (hash, outboard) = match file.content() {
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _guard = span.enter();
                let file = std::fs::File::open(path)?;
                compute_outboard(file, data_size, move |offset| {
                    Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
            MemOrFile::Mem(bytes) => {
                // todo: progress? usually this will be small enough that progress might not be needed.
                compute_outboard(bytes, data_size, |_| Ok(()))?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // from here on, everything related to the hash is protected by the temp tag
        let tag = self.temp.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        // blocking send for the import
        let (tx, rx) = oneshot::channel();
        self.tx.send_blocking(ActorMessage::Import {
            cmd: Import {
                content_id: HashAndFormat { hash, format },
                source: file,
                outboard,
                data_size,
            },
            tx,
        })?;
        Ok(rx.recv()??)
    }

    fn temp_file_name(&self) -> PathBuf {
        self.path_options.temp_file_name()
    }

    async fn shutdown(&self) {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Shutdown { tx: Some(tx) })
            .await
            .ok();
        rx.await.ok();
    }
}

impl Drop for StoreInner {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.tx
                .send_blocking(ActorMessage::Shutdown { tx: None })
                .ok();
            handle.join().ok();
        }
    }
}

struct ActorState {
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    protected: BTreeSet<Hash>,
    temp: Arc<RwLock<TempCounterMap>>,
    msgs_rx: async_channel::Receiver<ActorMessage>,
    create_options: Arc<BaoFileConfig>,
    options: Options,
    rt: tokio::runtime::Handle,
}

/// The actor for the redb store.
///
/// It is split into the database and the rest of the state to allow for split
/// borrows in the message handlers.
struct Actor {
    db: redb::Database,
    state: ActorState,
}

/// Error type for message handler functions of the redb actor.
///
/// What can go wrong are various things with redb, as well as io errors related
/// to files other than redb.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}

impl From<ActorError> for io::Error {
    fn from(e: ActorError) -> Self {
        match e {
            ActorError::Io(e) => e,
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

/// Result type for handler functions of the redb actor.
///
/// See [`ActorError`] for what can go wrong.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;

/// Error type for calling the redb actor from the store.
///
/// What can go wrong is all the things in [`ActorError`] and in addition
/// sending and receiving messages.
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
    #[error("inner error: {0}")]
    Inner(#[from] ActorError),
    #[error("send error")]
    Send,
    #[error("progress send error: {0}")]
    ProgressSend(#[from] ProgressSendError),
    #[error("oneshot recv error: {0}")]
    Recv(#[from] oneshot::RecvError),
    #[error("async channel recv error: {0}")]
    AsyncChannelRecv(#[from] async_channel::RecvError),
    #[error("join error: {0}")]
    JoinTask(#[from] tokio::task::JoinError),
}

impl From<async_channel::SendError<ActorMessage>> for OuterError {
    fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
        OuterError::Send
    }
}

/// Result type for calling the redb actor from the store.
///
/// See [`OuterError`] for what can go wrong.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;

impl From<io::Error> for OuterError {
    fn from(e: io::Error) -> Self {
        OuterError::Inner(ActorError::Io(e))
    }
}

impl From<OuterError> for io::Error {
    fn from(e: OuterError) -> Self {
        match e {
            OuterError::Inner(ActorError::Io(e)) => e,
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

impl super::Map for Store {
    type Entry = Entry;

    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
        Ok(self.0.get(*hash).await?)
    }
}

impl super::MapMut for Store {
    type EntryMut = Entry;

    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
        Ok(self.0.get_or_create(hash).await?)
    }

    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status(hash).await?)
    }

    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
        Ok(self.0.complete(entry).await?)
    }

    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status_sync(hash)?)
    }
}

impl super::ReadableStore for Store {
    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.blobs().await?.into_iter()))
    }

    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
    }

    async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
        Ok(Box::new(self.0.tags().await?.into_iter()))
    }

    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
        Box::new(self.0.temp.read().unwrap().keys())
    }

    async fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        self.0.consistency_check(repair, tx.clone()).await?;
        Ok(())
    }

    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        Ok(self.0.export(hash, target, mode, progress).await?)
    }
}

impl super::Store for Store {
    async fn import_file(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(crate::TempTag, u64)> {
        let this = self.0.clone();
        Ok(
            tokio::task::spawn_blocking(move || {
                this.import_file_sync(path, mode, format, progress)
            })
            .await??,
        )
    }

    async fn import_bytes(
        &self,
        data: bytes::Bytes,
        format: crate::BlobFormat,
    ) -> io::Result<crate::TempTag> {
        let this = self.0.clone();
        Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
    }

    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        // write to a temp file
        let temp_data_path = this.0.temp_file_name();
        let name = temp_data_path
            .file_name()
            .expect("just created")
            .to_string_lossy()
            .to_string();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
        let mut offset = 0;
        while let Some(chunk) = data.next().await {
            let chunk = chunk?;
            writer.write_all(&chunk).await?;
            offset += chunk.len() as u64;
            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
        }
        writer.flush().await?;
        drop(writer);
        let file = ImportSource::TempFile(temp_data_path);
        Ok(tokio::task::spawn_blocking(move || {
            this.0.finalize_import_sync(file, format, id, progress)
        })
        .await??)
    }

    async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
        Ok(self.0.set_tag(name, hash).await?)
    }

    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        Ok(self.0.create_tag(hash).await?)
    }

    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        Ok(self.0.delete(hashes).await?)
    }

    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send,
    {
        tracing::info!("Starting GC task with interval {:?}", config.period);
        let mut live = BTreeSet::new();
        'outer: loop {
            if let Err(cause) = self.0.gc_start().await {
                tracing::debug!(
                    "unable to notify the db of GC start: {cause}. Shutting down GC loop."
                );
                break;
            }
            // delay before the two phases of GC
            tokio::time::sleep(config.period).await;
            tracing::debug!("Starting GC");
            live.clear();

            let p = protected_cb().await;
            live.extend(p);

            tracing::debug!("Starting GC mark phase");
            let live_ref = &mut live;
            let mut stream = Gen::new(|co| async move {
                if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
                    co.yield_(GcMarkEvent::Error(e)).await;
                }
            });
            while let Some(item) = stream.next().await {
                match item {
                    GcMarkEvent::CustomDebug(text) => {
                        tracing::debug!("{}", text);
                    }
                    GcMarkEvent::CustomWarning(text, _) => {
                        tracing::warn!("{}", text);
                    }
                    GcMarkEvent::Error(err) => {
                        tracing::error!("Fatal error during GC mark {}", err);
                        continue 'outer;
                    }
                }
            }
            drop(stream);

            tracing::debug!("Starting GC sweep phase");
            let live_ref = &live;
            let mut stream = Gen::new(|co| async move {
                if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
                    co.yield_(GcSweepEvent::Error(e)).await;
                }
            });
            while let Some(item) = stream.next().await {
                match item {
                    GcSweepEvent::CustomDebug(text) => {
                        tracing::debug!("{}", text);
                    }
                    GcSweepEvent::CustomWarning(text, _) => {
                        tracing::warn!("{}", text);
                    }
                    GcSweepEvent::Error(err) => {
                        tracing::error!("Fatal error during GC sweep {}", err);
                        continue 'outer;
                    }
                }
            }
            if let Some(ref cb) = config.done_callback {
                cb();
            }
        }
    }

    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
        self.0.temp.temp_tag(value)
    }

    async fn sync(&self) -> io::Result<()> {
        Ok(self.0.sync().await?)
    }

    async fn shutdown(&self) {
        self.0.shutdown().await;
    }
}

pub(super) async fn gc_sweep_task(
    store: &Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.0.gc_delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.0.gc_delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

impl Actor {
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            Err(DatabaseError::UpgradeRequired(1)) => {
                return Err(ActorError::Migration(anyhow::anyhow!(
                    "migration from v1 no longer supported"
                )))
            }
            Err(err) => return Err(err.into()),
        };

        let txn = db.begin_write()?;
        // open the tables and drop them again, just to make sure they exist.
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;
        // make the channel relatively large. there are some messages that don't
        // require a response, it's fine if they pile up a bit.
        let (tx, rx) = async_channel::bounded(1024);
        let tx2 = tx.clone();
        let on_file_create: CreateCb = Arc::new(move |hash| {
            // todo: make the callback allow async
            tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs_rx: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }

    async fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
        while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // Make sure the database is dropped before we send the reply.
                drop(self);
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("read transaction timed out");
                                break;
                            }
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("write transaction timed out");
                                break;
                            }
1624                        }
1625                    }
1626                    drop(tables);
1627                    txn.commit()?;
1628                    delete_after_commit.apply_and_clear(&self.state.options.path);
1629                    tracing::debug!("write transaction committed");
1630                }
1631            }
1632        }
1633        tracing::debug!("redb actor done");
1634        Ok(())
1635    }
1636}
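
// The transaction batching in run_batched, as a minimal generic sketch: drain
// up to `max` messages from a channel into one batch, but never keep the
// batch open longer than `window`. The real loop additionally pushes back
// messages that do not fit the current transaction kind.
#[allow(dead_code)]
async fn drain_batch<T>(
    rx: &async_channel::Receiver<T>,
    max: usize,
    window: Duration,
    mut apply: impl FnMut(T),
) {
    let timeout = tokio::time::sleep(window);
    tokio::pin!(timeout);
    for _ in 0..max {
        tokio::select! {
            msg = rx.recv() => match msg {
                Ok(msg) => apply(msg),
                Err(_) => break, // channel closed, no more messages
            },
            _ = &mut timeout => break, // batch window expired
        }
    }
}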
1637
1638impl ActorState {
1639    fn entry_status(
1640        &mut self,
1641        tables: &impl ReadableTables,
1642        hash: Hash,
1643    ) -> ActorResult<EntryStatus> {
1644        let status = match tables.blobs().get(hash)? {
1645            Some(guard) => match guard.value() {
1646                EntryState::Complete { .. } => EntryStatus::Complete,
1647                EntryState::Partial { .. } => EntryStatus::Partial,
1648            },
1649            None => EntryStatus::NotFound,
1650        };
1651        Ok(status)
1652    }
1653
1654    fn get(
1655        &mut self,
1656        tables: &impl ReadableTables,
1657        hash: Hash,
1658    ) -> ActorResult<Option<BaoFileHandle>> {
1659        if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
1660            return Ok(Some(handle));
1661        }
1662        let Some(entry) = tables.blobs().get(hash)? else {
1663            return Ok(None);
1664        };
1665        // todo: if complete, load inline data and/or outboard into memory if needed,
1666        // and return a complete entry.
1667        let entry = entry.value();
1668        let config = self.create_options.clone();
1669        let handle = match entry {
1670            EntryState::Complete {
1671                data_location,
1672                outboard_location,
1673            } => {
1674                let data = load_data(tables, &self.options.path, data_location, &hash)?;
1675                let outboard = load_outboard(
1676                    tables,
1677                    &self.options.path,
1678                    outboard_location,
1679                    data.size(),
1680                    &hash,
1681                )?;
1682                BaoFileHandle::new_complete(config, hash, data, outboard)
1683            }
1684            EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
1685        };
1686        self.handles.insert(hash, handle.downgrade());
1687        Ok(Some(handle))
1688    }
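
    // The weak-cache lookup used by `get` and `get_or_create`, in isolation:
    // the map stores weak references, so dropping the last strong handle lets
    // the entry go away, and the next access rebuilds it from the database.
    // Generic sketch, not called by the store.
    #[allow(dead_code)]
    fn get_or_rebuild<K: Ord, V>(
        cache: &mut BTreeMap<K, std::sync::Weak<V>>,
        key: K,
        rebuild: impl FnOnce() -> Arc<V>,
    ) -> Arc<V> {
        if let Some(handle) = cache.get(&key).and_then(|weak| weak.upgrade()) {
            return handle; // a strong handle still exists somewhere, reuse it
        }
        let handle = rebuild(); // e.g. load from redb, as in `get` above
        cache.insert(key, Arc::downgrade(&handle));
        handle
    }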
1689
1690    fn export(
1691        &mut self,
1692        tables: &mut Tables,
1693        cmd: Export,
1694        tx: oneshot::Sender<ActorResult<()>>,
1695    ) -> ActorResult<()> {
1696        let Export {
1697            temp_tag,
1698            target,
1699            mode,
1700            progress,
1701        } = cmd;
1702        let guard = tables
1703            .blobs
1704            .get(temp_tag.hash())?
1705            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
1706        let entry = guard.value();
1707        match entry {
1708            EntryState::Complete {
1709                data_location,
1710                outboard_location,
1711            } => match data_location {
1712                DataLocation::Inline(()) => {
1713                    // ignore the export mode and just copy; inline data cannot be referenced anyway.
1714                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
1715                        ActorError::Inconsistent("inline data not found".to_owned())
1716                    })?;
1717                    tracing::trace!("exporting inline data to {}", target.display());
1718                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
1719                        .ok();
1720                }
1721                DataLocation::Owned(size) => {
1722                    let path = self.options.path.owned_data_path(temp_tag.hash());
1723                    match mode {
1724                        ExportMode::Copy => {
1725                            // copy in an external thread
1726                            self.rt.spawn_blocking(move || {
1727                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
1728                                    .ok();
1729                            });
1730                        }
1731                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
1732                            Ok(()) => {
1733                                let entry = EntryState::Complete {
1734                                    data_location: DataLocation::External(vec![target], size),
1735                                    outboard_location,
1736                                };
1737                                drop(guard);
1738                                tables.blobs.insert(temp_tag.hash(), entry)?;
1739                                drop(temp_tag);
1740                                tx.send(Ok(())).ok();
1741                            }
1742                            Err(e) => {
1743                                const ERR_CROSS: i32 = 18; // EXDEV on Linux
1744                                if e.raw_os_error() == Some(ERR_CROSS) {
1745                                    // cross-device rename failed, copy instead
1746                                    match std::fs::copy(&path, &target) {
1747                                        Ok(_) => {
1748                                            let entry = EntryState::Complete {
1749                                                data_location: DataLocation::External(
1750                                                    vec![target],
1751                                                    size,
1752                                                ),
1753                                                outboard_location,
1754                                            };
1755
1756                                            drop(guard);
1757                                            tables.blobs.insert(temp_tag.hash(), entry)?;
1758                                            tables
1759                                                .delete_after_commit
1760                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
1761                                            drop(temp_tag);
1762
1763                                            tx.send(Ok(())).ok();
1764                                        }
1765                                        Err(e) => {
1766                                            drop(temp_tag);
1767                                            tx.send(Err(e.into())).ok();
1768                                        }
1769                                    }
1770                                } else {
1771                                    drop(temp_tag);
1772                                    tx.send(Err(e.into())).ok();
1773                                }
1774                            }
1775                        },
1776                    }
1777                }
1778                DataLocation::External(paths, size) => {
1779                    let path = paths
1780                        .first()
1781                        .ok_or_else(|| {
1782                            ActorError::Inconsistent("external path missing".to_owned())
1783                        })?
1784                        .to_owned();
1785                    // we cannot reference external files, so we copy them. The copy does not have to happen in the actor.
1786                    if path == target {
1787                        // export to the same path, nothing to do
1788                        tx.send(Ok(())).ok();
1789                    } else {
1790                        // copy in an external thread
1791                        self.rt.spawn_blocking(move || {
1792                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
1793                                .ok();
1794                        });
1795                    }
1796                }
1797            },
1798            EntryState::Partial { .. } => {
1799                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
1800            }
1801        }
1802        Ok(())
1803    }
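
    // The rename-or-copy fallback in `export` above, in isolation: try the
    // cheap rename first and fall back to a copy when source and target live
    // on different file systems. 18 is EXDEV on Linux; a portable version
    // would need per-platform error codes. Illustrative sketch only.
    #[allow(dead_code)]
    fn move_or_copy(src: &Path, dst: &Path) -> io::Result<()> {
        match std::fs::rename(src, dst) {
            Ok(()) => Ok(()),
            Err(e) if e.raw_os_error() == Some(18) => {
                // cross-device rename is not possible, copy instead
                std::fs::copy(src, dst)?;
                Ok(())
            }
            Err(e) => Err(e),
        }
    }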
1804
1805    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
1806        let Import {
1807            content_id,
1808            source: file,
1809            outboard,
1810            data_size,
1811        } = cmd;
1812        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
1813        let inline_data = data_size <= self.options.inline.max_data_inlined;
1814        let inline_outboard =
1815            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
1816        // from here on, everything related to the hash is protected by the temp tag
1817        let tag = self.temp.temp_tag(content_id);
1818        let hash = *tag.hash();
1819        self.protected.insert(hash);
1820        // move the data file into place, or create a reference to it
1821        let data_location = match file {
1822            ImportSource::External(external_path) => {
1823                tracing::debug!("stored external reference {}", external_path.display());
1824                if inline_data {
1825                    tracing::debug!(
1826                        "reading external data to inline it: {}",
1827                        external_path.display()
1828                    );
1829                    let data = Bytes::from(std::fs::read(&external_path)?);
1830                    DataLocation::Inline(data)
1831                } else {
1832                    DataLocation::External(vec![external_path], data_size)
1833                }
1834            }
1835            ImportSource::TempFile(temp_data_path) => {
1836                if inline_data {
1837                    tracing::debug!(
1838                        "reading and deleting temp file to inline it: {}",
1839                        temp_data_path.display()
1840                    );
1841                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
1842                    DataLocation::Inline(data)
1843                } else {
1844                    let data_path = self.options.path.owned_data_path(&hash);
1845                    std::fs::rename(&temp_data_path, &data_path)?;
1846                    tracing::debug!("created file {}", data_path.display());
1847                    DataLocation::Owned(data_size)
1848                }
1849            }
1850            ImportSource::Memory(data) => {
1851                if inline_data {
1852                    DataLocation::Inline(data)
1853                } else {
1854                    let data_path = self.options.path.owned_data_path(&hash);
1855                    overwrite_and_sync(&data_path, &data)?;
1856                    tracing::debug!("created file {}", data_path.display());
1857                    DataLocation::Owned(data_size)
1858                }
1859            }
1860        };
1861        let outboard_location = if let Some(outboard) = outboard {
1862            if inline_outboard {
1863                OutboardLocation::Inline(Bytes::from(outboard))
1864            } else {
1865                let outboard_path = self.options.path.owned_outboard_path(&hash);
1866                // todo: this blocks the actor when writing a large outboard
1867                overwrite_and_sync(&outboard_path, &outboard)?;
1868                OutboardLocation::Owned
1869            }
1870        } else {
1871            OutboardLocation::NotNeeded
1872        };
1873        if let DataLocation::Inline(data) = &data_location {
1874            tables.inline_data.insert(hash, data.as_ref())?;
1875        }
1876        if let OutboardLocation::Inline(outboard) = &outboard_location {
1877            tables.inline_outboard.insert(hash, outboard.as_ref())?;
1878        }
1879        if let DataLocation::Owned(_) = &data_location {
1880            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
1881        }
1882        if let OutboardLocation::Owned = &outboard_location {
1883            tables
1884                .delete_after_commit
1885                .remove(hash, [BaoFilePart::Outboard]);
1886        }
1887        let entry = tables.blobs.get(hash)?;
1888        let entry = entry.map(|x| x.value()).unwrap_or_default();
1889        let data_location = data_location.discard_inline_data();
1890        let outboard_location = outboard_location.discard_extra_data();
1891        let entry = entry.union(EntryState::Complete {
1892            data_location,
1893            outboard_location,
1894        })?;
1895        tables.blobs.insert(hash, entry)?;
1896        Ok((tag, data_size))
1897    }
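
    // The inlining decision in `import`, reduced to a pure function: data is
    // inlined when it fits the configured threshold, and an outboard is
    // inlined only when it both fits and is non-empty (small blobs need no
    // outboard at all). Sketch for illustration.
    #[allow(dead_code)]
    fn should_inline(inline: &InlineOptions, data_size: u64, outboard_size: u64) -> (bool, bool) {
        let inline_data = data_size <= inline.max_data_inlined;
        let inline_outboard = outboard_size != 0 && outboard_size <= inline.max_outboard_inlined;
        (inline_data, inline_outboard)
    }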
1898
1899    fn get_or_create(
1900        &mut self,
1901        tables: &impl ReadableTables,
1902        hash: Hash,
1903    ) -> ActorResult<BaoFileHandle> {
1904        self.protected.insert(hash);
1905        if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
1906            return Ok(handle);
1907        }
1908        let entry = tables.blobs().get(hash)?;
1909        let handle = if let Some(entry) = entry {
1910            let entry = entry.value();
1911            match entry {
1912                EntryState::Complete {
1913                    data_location,
1914                    outboard_location,
1915                    ..
1916                } => {
1917                    let data = load_data(tables, &self.options.path, data_location, &hash)?;
1918                    let outboard = load_outboard(
1919                        tables,
1920                        &self.options.path,
1921                        outboard_location,
1922                        data.size(),
1923                        &hash,
1924                    )?;
1925                    tracing::debug!("creating complete entry for {}", hash.to_hex());
1926                    BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
1927                }
1928                EntryState::Partial { .. } => {
1929                    tracing::debug!("creating partial entry for {}", hash.to_hex());
1930                    BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
1931                }
1932            }
1933        } else {
1934            BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
1935        };
1936        self.handles.insert(hash, handle.downgrade());
1937        Ok(handle)
1938    }
1939
1940    /// Read the blobs table, applying `filter` to each entry. Callers can sift through the results afterwards.
1941    fn blobs(
1942        &mut self,
1943        tables: &impl ReadableTables,
1944        filter: FilterPredicate<Hash, EntryState>,
1945    ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1946        let mut res = Vec::new();
1947        let mut index = 0u64;
1948        #[allow(clippy::explicit_counter_loop)]
1949        for item in tables.blobs().iter()? {
1950            match item {
1951                Ok((k, v)) => {
1952                    if let Some(item) = filter(index, k, v) {
1953                        res.push(Ok(item));
1954                    }
1955                }
1956                Err(e) => {
1957                    res.push(Err(e));
1958                }
1959            }
1960            index += 1;
1961        }
1962        Ok(res)
1963    }
1964
1965    /// Read the tags table, applying `filter` to each entry. Callers can sift through the results afterwards.
1966    fn tags(
1967        &mut self,
1968        tables: &impl ReadableTables,
1969        filter: FilterPredicate<Tag, HashAndFormat>,
1970    ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
1971        let mut res = Vec::new();
1972        let mut index = 0u64;
1973        #[allow(clippy::explicit_counter_loop)]
1974        for item in tables.tags().iter()? {
1975            match item {
1976                Ok((k, v)) => {
1977                    if let Some(item) = filter(index, k, v) {
1978                        res.push(Ok(item));
1979                    }
1980                }
1981                Err(e) => {
1982                    res.push(Err(e));
1983                }
1984            }
1985            index += 1;
1986        }
1987        Ok(res)
1988    }
1989
1990    fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
1991        let tag = {
1992            let tag = Tag::auto(SystemTime::now(), |x| {
1993                matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
1994            });
1995            tables.tags.insert(tag.clone(), content)?;
1996            tag
1997        };
1998        Ok(tag)
1999    }
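
    // Sketch of the probing scheme behind `Tag::auto`: derive a candidate
    // name (the real code uses the creation time) and vary it until the
    // `exists` check comes back false. The helper below is hypothetical and
    // only illustrates the shape of the closure passed above.
    #[allow(dead_code)]
    fn unique_tag_name(base: &str, exists: impl Fn(&str) -> bool) -> String {
        let mut candidate = base.to_owned();
        let mut n = 0u64;
        while exists(&candidate) {
            n += 1;
            candidate = format!("{base}-{n}");
        }
        candidate
    }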
2000
2001    fn set_tag(
2002        &self,
2003        tables: &mut Tables,
2004        tag: Tag,
2005        value: Option<HashAndFormat>,
2006    ) -> ActorResult<()> {
2007        match value {
2008            Some(value) => {
2009                tables.tags.insert(tag, value)?;
2010            }
2011            None => {
2012                tables.tags.remove(tag)?;
2013            }
2014        }
2015        Ok(())
2016    }
2017
2018    fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
2019        let entry = tables
2020            .blobs
2021            .get(hash)?
2022            .map(|x| x.value())
2023            .unwrap_or_default();
2024        let entry = entry.union(EntryState::Partial { size: None })?;
2025        tables.blobs.insert(hash, entry)?;
2026        // protect all three parts of the entry
2027        tables.delete_after_commit.remove(
2028            hash,
2029            [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
2030        );
2031        Ok(())
2032    }
2033
2034    fn update_inline_options(
2035        &mut self,
2036        db: &redb::Database,
2037        options: InlineOptions,
2038        reapply: bool,
2039    ) -> ActorResult<()> {
2040        self.options.inline = options;
2041        if reapply {
2042            let mut delete_after_commit = Default::default();
2043            let tx = db.begin_write()?;
2044            {
2045                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
2046                let hashes = tables
2047                    .blobs
2048                    .iter()?
2049                    .map(|x| x.map(|(k, _)| k.value()))
2050                    .collect::<Result<Vec<_>, _>>()?;
2051                for hash in hashes {
2052                    let guard = tables
2053                        .blobs
2054                        .get(hash)?
2055                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
2056                    let entry = guard.value();
2057                    if let EntryState::Complete {
2058                        data_location,
2059                        outboard_location,
2060                    } = entry
2061                    {
2062                        let (data_location, data_size, data_location_changed) = match data_location
2063                        {
2064                            DataLocation::Owned(size) => {
2065                                // inline
2066                                if size <= self.options.inline.max_data_inlined {
2067                                    let path = self.options.path.owned_data_path(&hash);
2068                                    let data = std::fs::read(&path)?;
2069                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2070                                    tables.inline_data.insert(hash, data.as_slice())?;
2071                                    (DataLocation::Inline(()), size, true)
2072                                } else {
2073                                    (DataLocation::Owned(size), size, false)
2074                                }
2075                            }
2076                            DataLocation::Inline(()) => {
2077                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
2078                                    ActorError::Inconsistent("inline data missing".to_owned())
2079                                })?;
2080                                let data = guard.value();
2081                                let size = data.len() as u64;
2082                                if size > self.options.inline.max_data_inlined {
2083                                    let path = self.options.path.owned_data_path(&hash);
2084                                    std::fs::write(&path, data)?;
2085                                    drop(guard);
2086                                    tables.inline_data.remove(hash)?;
2087                                    (DataLocation::Owned(size), size, true)
2088                                } else {
2089                                    (DataLocation::Inline(()), size, false)
2090                                }
2091                            }
2092                            DataLocation::External(paths, size) => {
2093                                (DataLocation::External(paths, size), size, false)
2094                            }
2095                        };
2096                        let outboard_size = raw_outboard_size(data_size);
2097                        let (outboard_location, outboard_location_changed) = match outboard_location
2098                        {
2099                            OutboardLocation::Owned
2100                                if outboard_size <= self.options.inline.max_outboard_inlined =>
2101                            {
2102                                let path = self.options.path.owned_outboard_path(&hash);
2103                                let outboard = std::fs::read(&path)?;
2104                                tables
2105                                    .delete_after_commit
2106                                    .insert(hash, [BaoFilePart::Outboard]);
2107                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
2108                                (OutboardLocation::Inline(()), true)
2109                            }
2110                            OutboardLocation::Inline(())
2111                                if outboard_size > self.options.inline.max_outboard_inlined =>
2112                            {
2113                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
2114                                    ActorError::Inconsistent("inline outboard missing".to_owned())
2115                                })?;
2116                                let outboard = guard.value();
2117                                let path = self.options.path.owned_outboard_path(&hash);
2118                                std::fs::write(&path, outboard)?;
2119                                drop(guard);
2120                                tables.inline_outboard.remove(hash)?;
2121                                (OutboardLocation::Owned, true)
2122                            }
2123                            x => (x, false),
2124                        };
2125                        drop(guard);
2126                        if data_location_changed || outboard_location_changed {
2127                            tables.blobs.insert(
2128                                hash,
2129                                EntryState::Complete {
2130                                    data_location,
2131                                    outboard_location,
2132                                },
2133                            )?;
2134                        }
2135                    }
2136                }
2137            }
2138            tx.commit()?;
2139            delete_after_commit.apply_and_clear(&self.options.path);
2140        }
2141        Ok(())
2142    }
2143
2144    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
2145        for hash in hashes {
2146            if self.temp.as_ref().read().unwrap().contains(&hash) {
2147                continue;
2148            }
2149            if !force && self.protected.contains(&hash) {
2150                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
2151                continue;
2152            }
2153
2154            tracing::debug!("deleting {}", &hash.to_hex()[..8]);
2155
2156            self.handles.remove(&hash);
2157            if let Some(entry) = tables.blobs.remove(hash)? {
2158                match entry.value() {
2159                    EntryState::Complete {
2160                        data_location,
2161                        outboard_location,
2162                    } => {
2163                        match data_location {
2164                            DataLocation::Inline(_) => {
2165                                tables.inline_data.remove(hash)?;
2166                            }
2167                            DataLocation::Owned(_) => {
2168                                // mark the data for deletion
2169                                tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2170                            }
2171                            DataLocation::External(_, _) => {}
2172                        }
2173                        match outboard_location {
2174                            OutboardLocation::Inline(_) => {
2175                                tables.inline_outboard.remove(hash)?;
2176                            }
2177                            OutboardLocation::Owned => {
2178                                // mark the outboard for deletion
2179                                tables
2180                                    .delete_after_commit
2181                                    .insert(hash, [BaoFilePart::Outboard]);
2182                            }
2183                            OutboardLocation::NotNeeded => {}
2184                        }
2185                    }
2186                    EntryState::Partial { .. } => {
2187                        // mark all parts for deletion
2188                        tables.delete_after_commit.insert(
2189                            hash,
2190                            [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
2191                        );
2192                    }
2193                }
2194            }
2195        }
2196        Ok(())
2197    }
2198
2199    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
2200        let hash = entry.hash();
2201        let mut info = None;
2202        tracing::trace!("on_complete({})", hash.to_hex());
2203        entry.transform(|state| {
2204            tracing::trace!("on_complete transform {:?}", state);
2205            let entry = match complete_storage(
2206                state,
2207                &hash,
2208                &self.options.path,
2209                &self.options.inline,
2210                tables.delete_after_commit,
2211            )? {
2212                Ok(entry) => {
2213                    // store the info so we can insert it into the db later
2214                    info = Some((
2215                        entry.data_size(),
2216                        entry.data.mem().cloned(),
2217                        entry.outboard_size(),
2218                        entry.outboard.mem().cloned(),
2219                    ));
2220                    entry
2221                }
2222                Err(entry) => {
2223                    // the entry was already complete, nothing to do
2224                    entry
2225                }
2226            };
2227            Ok(BaoFileStorage::Complete(entry))
2228        })?;
2229        if let Some((data_size, data, outboard_size, outboard)) = info {
2230            let data_location = if data.is_some() {
2231                DataLocation::Inline(())
2232            } else {
2233                DataLocation::Owned(data_size)
2234            };
2235            let outboard_location = if outboard_size == 0 {
2236                OutboardLocation::NotNeeded
2237            } else if outboard.is_some() {
2238                OutboardLocation::Inline(())
2239            } else {
2240                OutboardLocation::Owned
2241            };
2242            {
2243                tracing::debug!(
2244                    "inserting complete entry for {}, {} bytes",
2245                    hash.to_hex(),
2246                    data_size,
2247                );
2248                let entry = tables
2249                    .blobs()
2250                    .get(hash)?
2251                    .map(|x| x.value())
2252                    .unwrap_or_default();
2253                let entry = entry.union(EntryState::Complete {
2254                    data_location,
2255                    outboard_location,
2256                })?;
2257                tables.blobs.insert(hash, entry)?;
2258                if let Some(data) = data {
2259                    tables.inline_data.insert(hash, data.as_ref())?;
2260                }
2261                if let Some(outboard) = outboard {
2262                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
2263                }
2264            }
2265        }
2266        Ok(())
2267    }
2268
2269    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
2270        match msg {
2271            ActorMessage::UpdateInlineOptions {
2272                inline_options,
2273                reapply,
2274                tx,
2275            } => {
2276                let res = self.update_inline_options(db, inline_options, reapply);
2277                tx.send(res?).ok();
2278            }
2279            ActorMessage::Fsck {
2280                repair,
2281                progress,
2282                tx,
2283            } => {
2284                let res = self.consistency_check(db, repair, progress);
2285                tx.send(res).ok();
2286            }
2287            ActorMessage::Sync { tx } => {
2288                tx.send(()).ok();
2289            }
2290            x => {
2291                return Err(ActorError::Inconsistent(format!(
2292                    "unexpected message for handle_toplevel: {:?}",
2293                    x
2294                )))
2295            }
2296        }
2297        Ok(())
2298    }
2299
2300    fn handle_readonly(
2301        &mut self,
2302        tables: &impl ReadableTables,
2303        msg: ActorMessage,
2304    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
2305        match msg {
2306            ActorMessage::Get { hash, tx } => {
2307                let res = self.get(tables, hash);
2308                tx.send(res).ok();
2309            }
2310            ActorMessage::GetOrCreate { hash, tx } => {
2311                let res = self.get_or_create(tables, hash);
2312                tx.send(res).ok();
2313            }
2314            ActorMessage::EntryStatus { hash, tx } => {
2315                let res = self.entry_status(tables, hash);
2316                tx.send(res).ok();
2317            }
2318            ActorMessage::Blobs { filter, tx } => {
2319                let res = self.blobs(tables, filter);
2320                tx.send(res).ok();
2321            }
2322            ActorMessage::Tags { filter, tx } => {
2323                let res = self.tags(tables, filter);
2324                tx.send(res).ok();
2325            }
2326            ActorMessage::GcStart { tx } => {
2327                self.protected.clear();
2328                self.handles.retain(|_, weak| weak.is_live());
2329                tx.send(()).ok();
2330            }
2331            ActorMessage::Dump => {
2332                dump(tables).ok();
2333            }
2334            #[cfg(test)]
2335            ActorMessage::EntryState { hash, tx } => {
2336                tx.send(self.entry_state(tables, hash)).ok();
2337            }
2338            ActorMessage::GetFullEntryState { hash, tx } => {
2339                let res = self.get_full_entry_state(tables, hash);
2340                tx.send(res).ok();
2341            }
2342            x => return Ok(Err(x)),
2343        }
2344        Ok(Ok(()))
2345    }
2346
2347    fn handle_readwrite(
2348        &mut self,
2349        tables: &mut Tables,
2350        msg: ActorMessage,
2351    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
2352        match msg {
2353            ActorMessage::Import { cmd, tx } => {
2354                let res = self.import(tables, cmd);
2355                tx.send(res).ok();
2356            }
2357            ActorMessage::SetTag { tag, value, tx } => {
2358                let res = self.set_tag(tables, tag, value);
2359                tx.send(res).ok();
2360            }
2361            ActorMessage::CreateTag { hash, tx } => {
2362                let res = self.create_tag(tables, hash);
2363                tx.send(res).ok();
2364            }
2365            ActorMessage::Delete { hashes, tx } => {
2366                let res = self.delete(tables, hashes, true);
2367                tx.send(res).ok();
2368            }
2369            ActorMessage::GcDelete { hashes, tx } => {
2370                let res = self.delete(tables, hashes, false);
2371                tx.send(res).ok();
2372            }
2373            ActorMessage::OnComplete { handle } => {
2374                let res = self.on_complete(tables, handle);
2375                res.ok();
2376            }
2377            ActorMessage::Export { cmd, tx } => {
2378                self.export(tables, cmd, tx)?;
2379            }
2380            ActorMessage::OnMemSizeExceeded { hash } => {
2381                let res = self.on_mem_size_exceeded(tables, hash);
2382                res.ok();
2383            }
2384            ActorMessage::Dump => {
2385                let res = dump(tables);
2386                res.ok();
2387            }
2388            ActorMessage::SetFullEntryState { hash, entry, tx } => {
2389                let res = self.set_full_entry_state(tables, hash, entry);
2390                tx.send(res).ok();
2391            }
2392            msg => {
2393                // try to handle it as readonly
2394                if let Err(msg) = self.handle_readonly(tables, msg)? {
2395                    return Ok(Err(msg));
2396                }
2397            }
2398        }
2399        Ok(Ok(()))
2400    }
2401}
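
// The delete-after-commit pattern used throughout the write path, as a
// stand-alone sketch: file deletions are only recorded while a transaction is
// open and applied once the commit has succeeded, so an aborted transaction
// never deletes files it still needs. `PendingDeletes` is hypothetical and
// mirrors the role of `DeleteSet::apply_and_clear`.
#[allow(dead_code)]
#[derive(Default)]
struct PendingDeletes(Vec<PathBuf>);

#[allow(dead_code)]
impl PendingDeletes {
    /// Record a file to be deleted if the current transaction commits.
    fn insert(&mut self, path: PathBuf) {
        self.0.push(path);
    }

    /// Call only after `txn.commit()` returned `Ok`.
    fn apply_and_clear(&mut self) {
        for path in self.0.drain(..) {
            // a missing file is fine, it may never have been created
            std::fs::remove_file(path).ok();
        }
    }
}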
2402
2403/// Export a file by copying out its content to a new location
2404fn export_file_copy(
2405    temp_tag: TempTag,
2406    path: PathBuf,
2407    size: u64,
2408    target: PathBuf,
2409    progress: ExportProgressCb,
2410) -> ActorResult<()> {
2411    progress(0)?;
2412    // todo: fine-grained copy progress
2413    reflink_copy::reflink_or_copy(path, target)?;
2414    progress(size)?;
2415    drop(temp_tag);
2416    Ok(())
2417}
2418
2419fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2420    for e in tables.blobs().iter()? {
2421        let (k, v) = e?;
2422        let k = k.value();
2423        let v = v.value();
2424        println!("blobs: {} -> {:?}", k.to_hex(), v);
2425    }
2426    for e in tables.tags().iter()? {
2427        let (k, v) = e?;
2428        let k = k.value();
2429        let v = v.value();
2430        println!("tags: {} -> {:?}", k, v);
2431    }
2432    for e in tables.inline_data().iter()? {
2433        let (k, v) = e?;
2434        let k = k.value();
2435        let v = v.value();
2436        println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2437    }
2438    for e in tables.inline_outboard().iter()? {
2439        let (k, v) = e?;
2440        let k = k.value();
2441        let v = v.value();
2442        println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2443    }
2444    Ok(())
2445}
2446
2447fn load_data(
2448    tables: &impl ReadableTables,
2449    options: &PathOptions,
2450    location: DataLocation<(), u64>,
2451    hash: &Hash,
2452) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2453    Ok(match location {
2454        DataLocation::Inline(()) => {
2455            let Some(data) = tables.inline_data().get(hash)? else {
2456                return Err(ActorError::Inconsistent(format!(
2457                    "inconsistent database state: {} should have inline data but does not",
2458                    hash.to_hex()
2459                )));
2460            };
2461            MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
2462        }
2463        DataLocation::Owned(data_size) => {
2464            let path = options.owned_data_path(hash);
2465            let Ok(file) = std::fs::File::open(&path) else {
2466                return Err(io::Error::new(
2467                    io::ErrorKind::NotFound,
2468                    format!("file not found: {}", path.display()),
2469                )
2470                .into());
2471            };
2472            MemOrFile::File((file, data_size))
2473        }
2474        DataLocation::External(paths, data_size) => {
2475            if paths.is_empty() {
2476                return Err(ActorError::Inconsistent(
2477                    "external data location must not be empty".into(),
2478                ));
2479            }
2480            let path = &paths[0];
2481            let Ok(file) = std::fs::File::open(path) else {
2482                return Err(io::Error::new(
2483                    io::ErrorKind::NotFound,
2484                    format!("external file not found: {}", path.display()),
2485                )
2486                .into());
2487            };
2488            MemOrFile::File((file, data_size))
2489        }
2490    })
2491}
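
// `MemOrFile`, as returned by load_data above and load_outboard below, keeps
// small entries in memory and large ones on disk behind one interface. A
// stripped-down sketch of the idea (the real type is generic and richer):
#[allow(dead_code)]
enum SmallOrBig {
    Mem(Bytes),
    File { file: std::fs::File, size: u64 },
}

#[allow(dead_code)]
impl SmallOrBig {
    fn size(&self) -> u64 {
        match self {
            SmallOrBig::Mem(data) => data.len() as u64,
            SmallOrBig::File { size, .. } => *size,
        }
    }
}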
2492
2493fn load_outboard(
2494    tables: &impl ReadableTables,
2495    options: &PathOptions,
2496    location: OutboardLocation,
2497    size: u64,
2498    hash: &Hash,
2499) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2500    Ok(match location {
2501        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2502        OutboardLocation::Inline(_) => {
2503            let Some(outboard) = tables.inline_outboard().get(hash)? else {
2504                return Err(ActorError::Inconsistent(format!(
2505                    "inconsistent database state: {} should have inline outboard but does not",
2506                    hash.to_hex()
2507                )));
2508            };
2509            MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
2510        }
2511        OutboardLocation::Owned => {
2512            let outboard_size = raw_outboard_size(size);
2513            let path = options.owned_outboard_path(hash);
2514            let Ok(file) = std::fs::File::open(&path) else {
2515                return Err(io::Error::new(
2516                    io::ErrorKind::NotFound,
2517                    format!("file not found: {} size={}", path.display(), outboard_size),
2518                )
2519                .into());
2520            };
2521            MemOrFile::File((file, outboard_size))
2522        }
2523    })
2524}
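
// Back-of-the-envelope sketch of `raw_outboard_size` as used above: a bao
// outboard stores 64 bytes (two child hashes) per parent node of the tree,
// so with 16 KiB chunk groups (assumed here; the real function is
// authoritative) the raw outboard holds 64 bytes for every chunk group
// beyond the first.
#[allow(dead_code)]
fn approx_outboard_size(data_size: u64) -> u64 {
    const CHUNK_GROUP: u64 = 16 * 1024; // assumed chunk group size
    let groups = ((data_size + CHUNK_GROUP - 1) / CHUNK_GROUP).max(1);
    (groups - 1) * 64
}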
2525
2526/// Take a possibly incomplete storage and turn it into complete
2527fn complete_storage(
2528    storage: BaoFileStorage,
2529    hash: &Hash,
2530    path_options: &PathOptions,
2531    inline_options: &InlineOptions,
2532    delete_after_commit: &mut DeleteSet,
2533) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
2534    let (data, outboard, _sizes) = match storage {
2535        BaoFileStorage::Complete(c) => return Ok(Err(c)),
2536        BaoFileStorage::IncompleteMem(storage) => {
2537            let (data, outboard, sizes) = storage.into_parts();
2538            (
2539                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
2540                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
2541                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
2542            )
2543        }
2544        BaoFileStorage::IncompleteFile(storage) => {
2545            let (data, outboard, sizes) = storage.into_parts();
2546            (
2547                MemOrFile::File(data),
2548                MemOrFile::File(outboard),
2549                MemOrFile::File(sizes),
2550            )
2551        }
2552    };
2553    let data_size = data.size()?.unwrap();
2554    let outboard_size = outboard.size()?.unwrap();
2555    // todo: perform more sanity checks if in debug mode
2556    debug_assert!(raw_outboard_size(data_size) == outboard_size);
2557    // inline the data if it is below the threshold, otherwise make sure it is on disk
2558    let data = if data_size <= inline_options.max_data_inlined {
2559        match data {
2560            MemOrFile::File(data) => {
2561                let mut buf = vec![0; data_size as usize];
2562                data.read_at(0, &mut buf)?;
2563                // mark data for deletion after commit
2564                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
2565                MemOrFile::Mem(Bytes::from(buf))
2566            }
2567            MemOrFile::Mem(data) => MemOrFile::Mem(data),
2568        }
2569    } else {
2570        // protect the data from previous deletions
2571        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
2572        match data {
2573            MemOrFile::Mem(data) => {
2574                let path = path_options.owned_data_path(hash);
2575                let file = overwrite_and_sync(&path, &data)?;
2576                MemOrFile::File((file, data_size))
2577            }
2578            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
2579        }
2580    };
2581    // inline the outboard if it is below the threshold, otherwise make sure it is on disk
2582    let outboard = if outboard_size == 0 {
2583        Default::default()
2584    } else if outboard_size <= inline_options.max_outboard_inlined {
2585        match outboard {
2586            MemOrFile::File(outboard) => {
2587                let mut buf = vec![0; outboard_size as usize];
2588                outboard.read_at(0, &mut buf)?;
2589                drop(outboard);
2590                // mark outboard for deletion after commit
2591                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
2592                MemOrFile::Mem(Bytes::from(buf))
2593            }
2594            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
2595        }
2596    } else {
2597        // protect the outboard from previous deletions
2598        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
2599        match outboard {
2600            MemOrFile::Mem(outboard) => {
2601                let path = path_options.owned_outboard_path(hash);
2602                let file = overwrite_and_sync(&path, &outboard)?;
2603                MemOrFile::File((file, outboard_size))
2604            }
2605            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
2606        }
2607    };
2608    // mark sizes for deletion after commit in any case: a complete entry
2609    // does not need sizes.
2610    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
2611    Ok(Ok(CompleteStorage { data, outboard }))
2612}