use std::{
    collections::{BTreeMap, BTreeSet},
    io,
    path::{Path, PathBuf},
    sync::{Arc, RwLock},
    time::{Duration, SystemTime},
};
use bao_tree::io::{
    fsm::Outboard,
    sync::{ReadAt, Size},
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::io::AsyncWriteExt;
use tracing::trace_span;
mod import_flat_store;
mod migrate_redb_v1_v2;
mod tables;
#[doc(hidden)]
pub mod test_support;
#[cfg(test)]
mod tests;
mod util;
mod validate;
use crate::{
    store::{
        bao_file::{BaoFileStorage, CompleteStorage},
        fs::{
            tables::BaoFilePart,
            util::{overwrite_and_sync, read_and_remove},
        },
    },
    util::{
        compute_outboard,
        progress::{
            BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
            ProgressSender,
        },
        raw_outboard_size, MemOrFile, TagCounter, TagDrop,
    },
    Tag, TempTag,
};
use tables::{ReadOnlyTables, ReadableTables, Tables};
use self::{tables::DeleteSet, util::PeekableFlumeReceiver};
use self::test_support::EntryData;
use super::{
    bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
    ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
};
/// Location of the data of a complete entry.
///
/// `I` is the payload carried by the inline variant and `E` the extra value
/// carried by the file-backed variants; both default to `()`. `EntryState`
/// instantiates `E` as `u64`, and `union` compares that value as a size.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; reordering the
/// variants presumably changes the serialized form — confirm before editing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is kept inline; carries `I`.
    Inline(I),
    /// Data is owned; carries `E`. Presumably a file in the store's data
    /// directory (see `PathOptions::owned_data_path`) — confirm.
    Owned(E),
    /// Data is referenced at one or more paths; carries `E`.
    External(Vec<PathBuf>, E),
}
impl<X> DataLocation<X, u64> {
    /// Merges two data locations into one.
    ///
    /// * Two `External` locations are combined: their sizes must agree, and
    ///   the path lists are concatenated, sorted and de-duplicated.
    /// * Otherwise `Owned` beats everything and `Inline` beats `External`;
    ///   when both sides have the winning variant, `that` is returned.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::Inconsistent` when both sides are `External` and
    /// their sizes differ.
    fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
        let merged = match (self, that) {
            (
                DataLocation::External(mut all_paths, size),
                DataLocation::External(other_paths, other_size),
            ) => {
                if size != other_size {
                    return Err(ActorError::Inconsistent(format!(
                        "complete size mismatch {} {}",
                        size, other_size
                    )));
                }
                all_paths.extend(other_paths);
                all_paths.sort();
                all_paths.dedup();
                DataLocation::External(all_paths, size)
            }
            // Alternatives are tried left to right, so the second operand is
            // preferred when both sides match.
            (_, winner @ DataLocation::Owned(_)) | (winner @ DataLocation::Owned(_), _) => winner,
            (_, winner @ DataLocation::Inline(_)) | (winner @ DataLocation::Inline(_), _) => {
                winner
            }
        };
        Ok(merged)
    }
}
impl<I, E> DataLocation<I, E> {
    /// Converts to a location without inline payload: the `Inline` payload is
    /// replaced by `()`, the other variants are passed through unchanged.
    fn discard_inline_data(self) -> DataLocation<(), E> {
        match self {
            Self::External(paths, extra) => DataLocation::External(paths, extra),
            Self::Owned(extra) => DataLocation::Owned(extra),
            Self::Inline(_) => DataLocation::Inline(()),
        }
    }
}
/// Location of the outboard of a complete entry.
///
/// `I` is the payload carried by the inline variant and defaults to `()`.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; reordering the
/// variants presumably changes the serialized form — confirm before editing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is kept inline; carries `I`.
    Inline(I),
    /// Outboard is owned. Presumably a file in the store's data directory
    /// (see `PathOptions::owned_outboard_path`) — confirm.
    Owned,
    /// No outboard is needed. Presumably for data too small to require one —
    /// confirm.
    NotNeeded,
}
impl<I> OutboardLocation<I> {
    /// Converts to a location without inline payload: the `Inline` payload is
    /// replaced by `()`, the other variants are passed through unchanged.
    fn discard_extra_data(self) -> OutboardLocation<()> {
        match self {
            OutboardLocation::NotNeeded => OutboardLocation::NotNeeded,
            OutboardLocation::Owned => OutboardLocation::Owned,
            OutboardLocation::Inline(_) => OutboardLocation::Inline(()),
        }
    }
}
/// State of an entry as stored in the database.
///
/// `I` is the payload type used for inline data and inline outboards and
/// defaults to `()`.
///
/// NOTE(review): values are encoded with postcard in the `redb::Value` impl
/// for this type; changing variants or fields presumably changes the stored
/// encoding — confirm before editing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// The entry is complete.
    Complete {
        /// Where the data is; the `u64` is compared as a size by `union`.
        data_location: DataLocation<I, u64>,
        /// Where the outboard is.
        outboard_location: OutboardLocation<I>,
    },
    /// The entry is partial.
    Partial {
        /// The size, if known. `union` reports differing values as a
        /// "validated size mismatch".
        size: Option<u64>,
    },
}
impl Default for EntryState {
    fn default() -> Self {
        Self::Partial { size: None }
    }
}
impl EntryState {
    /// Merges two entry states.
    ///
    /// * complete + complete: data locations are merged with
    ///   `DataLocation::union`; the outboard location of `self` is kept.
    /// * complete + partial (either order): the complete state wins.
    /// * partial + partial: the known size is kept; if both are known they
    ///   must be equal.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::Inconsistent` when two partial states carry
    /// different sizes, or when merging the data locations fails.
    fn union(self, that: Self) -> ActorResult<Self> {
        let merged = match (self, that) {
            (
                Self::Complete {
                    data_location: ours,
                    outboard_location,
                },
                Self::Complete {
                    data_location: theirs,
                    ..
                },
            ) => Self::Complete {
                data_location: ours.union(theirs)?,
                outboard_location,
            },
            (complete @ Self::Complete { .. }, Self::Partial { .. })
            | (Self::Partial { .. }, complete @ Self::Complete { .. }) => complete,
            (Self::Partial { size: ours }, Self::Partial { size: theirs }) => {
                if let (Some(a_size), Some(b_size)) = (ours, theirs) {
                    if a_size != b_size {
                        return Err(ActorError::Inconsistent(format!(
                            "validated size mismatch {} {}",
                            a_size, b_size
                        )));
                    }
                }
                // At this point the sizes are equal or at most one is known.
                Self::Partial {
                    size: ours.or(theirs),
                }
            }
        };
        Ok(merged)
    }
}
/// Stores `EntryState` values in redb using the postcard encoding.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;
    type AsBytes<'a> = SmallVec<[u8; 128]>;

    /// No fixed width is reported for this type.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Decodes a value from its postcard bytes.
    ///
    /// # Panics
    ///
    /// Panics if `data` cannot be decoded.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let decoded: EntryState = postcard::from_bytes(data).unwrap();
        decoded
    }

    /// Encodes a value into a small vector of postcard bytes.
    ///
    /// # Panics
    ///
    /// Panics if encoding fails.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        let buffer: SmallVec<[u8; 128]> = SmallVec::new();
        postcard::to_extend(value, buffer).unwrap()
    }

    /// The type name registered with redb.
    fn type_name() -> redb::TypeName {
        let name = "EntryState";
        redb::TypeName::new(name)
    }
}
/// Limits for inlining data and outboards.
///
/// NOTE(review): presumably sizes in bytes up to which data/outboards are
/// stored inline rather than in files — confirm against the actor code.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Limit applied to data.
    pub max_data_inlined: u64,
    /// Limit applied to outboards.
    pub max_outboard_inlined: u64,
}
impl InlineOptions {
    /// Both limits are zero.
    pub const NO_INLINE: Self = Self::uniform(0);
    /// Both limits are `u64::MAX`.
    pub const ALWAYS_INLINE: Self = Self::uniform(u64::MAX);

    /// Builds options that use the same limit for data and outboards.
    const fn uniform(limit: u64) -> Self {
        Self {
            max_data_inlined: limit,
            max_outboard_inlined: limit,
        }
    }
}
impl Default for InlineOptions {
    /// Both limits default to 16 KiB.
    fn default() -> Self {
        Self::uniform(1024 * 16)
    }
}
/// Directories used by the store.
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Directory in which the per-hash `.data`, `.obao4` and `.sizes4` files
    /// are placed.
    pub data_path: PathBuf,
    /// Directory in which temporary files are placed.
    pub temp_path: PathBuf,
}
impl PathOptions {
    /// Derives the layout from a root directory: `<root>/data` and
    /// `<root>/temp`.
    fn new(root: &Path) -> Self {
        let data_path = root.join("data");
        let temp_path = root.join("temp");
        Self {
            data_path,
            temp_path,
        }
    }
    /// Path of the data file for `hash`: `<data_path>/<hex>.data`.
    fn owned_data_path(&self, hash: &Hash) -> PathBuf {
        let file_name = format!("{}.data", hash.to_hex());
        self.data_path.join(file_name)
    }
    /// Path of the outboard file for `hash`: `<data_path>/<hex>.obao4`.
    fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
        let file_name = format!("{}.obao4", hash.to_hex());
        self.data_path.join(file_name)
    }
    /// Path of the sizes file for `hash`: `<data_path>/<hex>.sizes4`.
    fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
        let file_name = format!("{}.sizes4", hash.to_hex());
        self.data_path.join(file_name)
    }
    /// A path inside `temp_path` whose file name comes from `temp_name()`.
    fn temp_file_name(&self) -> PathBuf {
        let file_name = temp_name();
        self.temp_path.join(file_name)
    }
}
/// Limits for batching of operations.
///
/// NOTE(review): presumably a batch ends when either its count or its
/// duration limit is reached — confirm against the actor's batching loop.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Count limit for a read batch.
    pub max_read_batch: usize,
    /// Duration limit for a read batch.
    pub max_read_duration: Duration,
    /// Count limit for a write batch.
    pub max_write_batch: usize,
    /// Duration limit for a write batch.
    pub max_write_duration: Duration,
}
impl Default for BatchOptions {
    /// Defaults: 10000 reads / 1 s, 1000 writes / 500 ms.
    fn default() -> Self {
        let max_read_duration = Duration::from_millis(1_000);
        let max_write_duration = Duration::from_millis(500);
        Self {
            max_read_batch: 10_000,
            max_read_duration,
            max_write_batch: 1_000,
            max_write_duration,
        }
    }
}
/// Options for the store, grouping path, inlining and batching settings.
#[derive(Debug, Clone)]
pub struct Options {
    /// Directories used by the store.
    pub path: PathOptions,
    /// Inlining limits.
    pub inline: InlineOptions,
    /// Batching limits.
    pub batch: BatchOptions,
}
/// Source of the bytes for an import.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// A temporary file (e.g. the copy made by `import_file_sync` in
    /// `ImportMode::Copy`, or the file written by `import_stream`).
    TempFile(PathBuf),
    /// A file at a caller-provided path (`ImportMode::TryReference`).
    External(PathBuf),
    /// In-memory data; the bytes are skipped in the `Debug` output.
    Memory(#[debug(skip)] Bytes),
}
impl ImportSource {
    /// Borrowed view of the content: a byte slice for in-memory sources, a
    /// path for both kinds of file-backed sources.
    fn content(&self) -> MemOrFile<&[u8], &Path> {
        match self {
            Self::Memory(bytes) => MemOrFile::Mem(bytes.as_ref()),
            Self::TempFile(path) | Self::External(path) => MemOrFile::File(path.as_path()),
        }
    }
    /// Length of the content in bytes.
    ///
    /// For file-backed sources this reads the file metadata and can fail
    /// with an I/O error.
    fn len(&self) -> io::Result<u64> {
        match self {
            Self::Memory(bytes) => Ok(bytes.len() as u64),
            Self::TempFile(path) | Self::External(path) => {
                let metadata = std::fs::metadata(path)?;
                Ok(metadata.len())
            }
        }
    }
}
/// The entry type of this store: an alias for `BaoFileHandle`.
pub type Entry = BaoFileHandle;
/// `MapEntry` implementation that forwards to same-named methods on the
/// handle.
///
/// NOTE(review): `self.hash()`, `self.is_complete()`, `self.outboard()` and
/// `self.data_reader()` presumably resolve to inherent methods of
/// `BaoFileHandle` (defined outside this file); if they resolved to these
/// trait methods the bodies would call themselves — confirm.
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }
    /// Current size of the entry, paired with its completeness flag.
    ///
    /// Panics (via `unwrap`) if `current_size()` does not yield a value.
    fn size(&self) -> BaoBlobSize {
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }
    fn is_complete(&self) -> bool {
        self.is_complete()
    }
    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }
    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
/// `MapEntryMut` implementation for the store's entry type.
impl super::MapEntryMut for Entry {
    /// Returns the value of the handle's `writer()` as the batch writer;
    /// this never fails.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
}
/// Payload of `ActorMessage::Import`, assembled by `finalize_import_sync`.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// Hash and format of the imported content.
    content_id: HashAndFormat,
    /// Where the bytes come from.
    source: ImportSource,
    /// Size of the data in bytes (taken from `ImportSource::len`).
    data_size: u64,
    /// Outboard as returned by `compute_outboard`, if any; `Debug` prints
    /// only its length.
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
/// Payload of `ActorMessage::Export`, assembled by `StoreInner::export`.
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// Temp tag for the hash being exported, created by the caller before the
    /// message is sent. Presumably keeps the entry protected while the export
    /// runs — confirm.
    temp_tag: TempTag,
    /// Target path; `StoreInner::export` only accepts absolute paths.
    target: PathBuf,
    /// Export mode.
    mode: ExportMode,
    /// Progress callback; skipped in the `Debug` output.
    #[debug(skip)]
    progress: ExportProgressCb,
}
/// Messages sent to the actor through the channel held in `StoreInner::tx`.
///
/// Variants with a `tx` field carry a oneshot sender on which the actor
/// delivers its reply. `category` groups the variants into read-only,
/// read-write and top-level messages.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Query the handle for `hash`; the reply is an optional handle.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query the `EntryStatus` for `hash`.
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    /// Query the entry state for `hash`; only compiled for tests.
    #[cfg(test)]
    EntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Query the full entry data for `hash`; the reply is optional.
    GetFullEntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Set the full entry data for `hash`. NOTE(review): `None` presumably
    /// removes the entry — confirm.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Query the handle for `hash`; the reply always carries a handle on
    /// success.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Notification without reply. NOTE(review): presumably sent when the
    /// in-memory part of an entry grew past a limit — confirm.
    OnMemSizeExceeded { hash: Hash },
    /// Notification without reply, sent by `StoreInner::complete`.
    OnComplete { handle: BaoFileHandle },
    /// Import command; the reply is a temp tag and a `u64` (the size, per
    /// `finalize_import_sync`).
    Import {
        cmd: Import,
        tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Export command.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Import a flat store from the given paths; the reply is a `bool`.
    ImportFlatStore {
        paths: FlatStorePaths,
        tx: oneshot::Sender<bool>,
    },
    /// Replace the inline options. NOTE(review): `reapply` presumably
    /// applies the new options to existing entries — confirm.
    UpdateInlineOptions {
        inline_options: InlineOptions,
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// List blob entries selected by `filter`; each row is either a
    /// `(hash, state)` pair or a storage error.
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// List tags selected by `filter`; each row is either a `(tag, value)`
    /// pair or a storage error.
    Tags {
        #[debug(skip)]
        filter: FilterPredicate<Tag, HashAndFormat>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Set `tag` to `value`. NOTE(review): `None` presumably deletes the
    /// tag — confirm.
    SetTag {
        tag: Tag,
        value: Option<HashAndFormat>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Create a tag for `hash`; the reply is the new tag.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Delete the given hashes.
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync request; the reply is an empty acknowledgement.
    Sync { tx: oneshot::Sender<()> },
    /// Dump request without reply.
    Dump,
    /// Consistency check with optional repair, reporting through `progress`.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Notification that gc starts; the reply is an empty acknowledgement.
    GcStart { tx: oneshot::Sender<()> },
    /// Shutdown request; the acknowledgement sender is optional (`Drop` for
    /// `StoreInner` sends `None`).
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
impl ActorMessage {
    /// Classifies the message as read-only, read-write or top-level.
    fn category(&self) -> MessageCategory {
        match self {
            // Messages classified as top-level.
            Self::Sync { .. }
            | Self::Shutdown { .. }
            | Self::Fsck { .. }
            | Self::ImportFlatStore { .. }
            | Self::UpdateInlineOptions { .. } => MessageCategory::TopLevel,
            // Messages classified as read-write.
            Self::Delete { .. }
            | Self::SetFullEntryState { .. }
            | Self::CreateTag { .. }
            | Self::SetTag { .. }
            | Self::OnComplete { .. }
            | Self::OnMemSizeExceeded { .. }
            | Self::Export { .. }
            | Self::Import { .. } => MessageCategory::ReadWrite,
            // Messages classified as read-only.
            #[cfg(test)]
            Self::EntryState { .. } => MessageCategory::ReadOnly,
            Self::Dump
            | Self::GetFullEntryState { .. }
            | Self::GcStart { .. }
            | Self::Tags { .. }
            | Self::Blobs { .. }
            | Self::EntryStatus { .. }
            | Self::GetOrCreate { .. }
            | Self::Get { .. } => MessageCategory::ReadOnly,
        }
    }
}
/// Category of an `ActorMessage`, as returned by `ActorMessage::category`.
///
/// NOTE(review): presumably decides whether the actor handles a message in a
/// read transaction, a write transaction, or outside of any batch — confirm
/// against the actor loop.
enum MessageCategory {
    ReadOnly,
    ReadWrite,
    TopLevel,
}
/// Boxed filter used by the `Blobs` and `Tags` messages.
///
/// Called with a `u64` (ignored by every filter in this file) and the access
/// guards for key and value; returning `None` drops the row, returning
/// `Some((key, value))` keeps it.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
/// Paths of a flat store, passed to `Store::import_flat_store`.
///
/// NOTE(review): the fields presumably name the directories for complete
/// entries, partial entries and metadata of the old layout — confirm.
#[derive(Debug)]
pub struct FlatStorePaths {
    pub complete: PathBuf,
    pub partial: PathBuf,
    pub meta: PathBuf,
}
/// Handle to the store; cloning shares the same `StoreInner` through an `Arc`.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
impl Store {
    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options {
            path: PathOptions::new(path),
            inline: Default::default(),
            batch: Default::default(),
        };
        Self::new(db_path, options).await
    }
    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
        let rt = tokio::runtime::Handle::try_current()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
        let inner =
            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
        Ok(Self(Arc::new(inner)))
    }
    pub async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> io::Result<()> {
        Ok(self
            .0
            .update_inline_options(inline_options, reapply)
            .await?)
    }
    pub async fn dump(&self) -> io::Result<()> {
        Ok(self.0.dump().await?)
    }
    #[deprecated(
        since = "0.23.0",
        note = "Flat stores are deprecated and future versions will not be able to migrate."
    )]
    pub async fn import_flat_store(&self, paths: FlatStorePaths) -> io::Result<bool> {
        Ok(self.0.import_flat_store(paths).await?)
    }
}
/// Shared state behind a `Store`.
#[derive(Debug)]
struct StoreInner {
    /// Channel for sending messages to the actor.
    tx: async_channel::Sender<ActorMessage>,
    /// Temp tag counters, shared with the actor (see `new_sync`).
    temp: Arc<RwLock<TempCounterMap>>,
    /// Join handle of the actor thread; taken and joined in `Drop`.
    handle: Option<std::thread::JoinHandle<()>>,
    /// Path options, used here to create temp file names.
    path_options: Arc<PathOptions>,
}
impl TagDrop for RwLock<TempCounterMap> {
    /// Decrements the counter for `content` under the write lock.
    ///
    /// Panics if the lock is poisoned.
    fn on_drop(&self, content: &HashAndFormat) {
        let mut counters = self.write().unwrap();
        counters.dec(content);
    }
}
impl TagCounter for RwLock<TempCounterMap> {
    /// Increments the counter for `content` under the write lock.
    ///
    /// Panics if the lock is poisoned.
    fn on_create(&self, content: &HashAndFormat) {
        let mut counters = self.write().unwrap();
        counters.inc(content);
    }
}
impl StoreInner {
    /// Creates the directories, opens the database and starts the actor
    /// thread.
    ///
    /// The data directory, the temp directory and the parent directory of the
    /// database file are created if missing. The actor runs on a dedicated
    /// thread named "redb-actor", which blocks on `rt` to drive
    /// `Actor::run_batched`.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be created, if `path` has no
    /// parent directory, if `Actor::new` fails, or if the actor thread cannot
    /// be spawned.
    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
        tracing::trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        std::fs::create_dir_all(&options.path.data_path)?;
        tracing::trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        std::fs::create_dir_all(&options.path.temp_path)?;
        // `parent()` is `None` for a root or empty path; report that as an
        // error instead of panicking, matching what `export` does.
        let db_dir = path.parent().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "database path has no parent directory",
            )
        })?;
        tracing::trace!(
            "creating parent directory for db file: {}",
            db_dir.display()
        );
        std::fs::create_dir_all(db_dir)?;
        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
        // Spawning can fail (e.g. resource exhaustion); propagate the
        // `io::Error` rather than panicking.
        let handle = std::thread::Builder::new()
            .name("redb-actor".to_string())
            .spawn(move || {
                rt.block_on(async move {
                    if let Err(cause) = actor.run_batched().await {
                        tracing::error!("redb actor failed: {}", cause);
                    }
                });
            })?;
        Ok(Self {
            tx,
            temp,
            handle: Some(handle),
            path_options: Arc::new(options.path),
        })
    }
    /// Asks the actor for the handle of `hash` and returns its reply.
    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::Get { hash, tx: reply_tx };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends a `GetOrCreate` request for `hash` and returns the handle from
    /// the actor's reply.
    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::GetOrCreate { hash, tx: reply_tx };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Lists the hashes of all entries whose state is `Complete`.
    ///
    /// Each item is either a hash or the storage error hit while reading that
    /// row, converted to an `io::Error`.
    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let only_complete: FilterPredicate<Hash, EntryState> =
            Box::new(|_index, key, value| match value.value() {
                state @ EntryState::Complete { .. } => Some((key.value(), state)),
                EntryState::Partial { .. } => None,
            });
        let request = ActorMessage::Blobs {
            filter: only_complete,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let rows = reply_rx.await??;
        let hashes = rows
            .into_iter()
            .map(|row| match row {
                Ok((hash, _)) => Ok(hash),
                Err(cause) => Err(ActorError::from(cause).into()),
            })
            .collect::<Vec<_>>();
        Ok(hashes)
    }
    /// Lists the hashes of all entries whose state is `Partial`.
    ///
    /// Each item is either a hash or the storage error hit while reading that
    /// row, converted to an `io::Error`.
    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let only_partial: FilterPredicate<Hash, EntryState> =
            Box::new(|_index, key, value| match value.value() {
                state @ EntryState::Partial { .. } => Some((key.value(), state)),
                EntryState::Complete { .. } => None,
            });
        let request = ActorMessage::Blobs {
            filter: only_partial,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let rows = reply_rx.await??;
        let hashes = rows
            .into_iter()
            .map(|row| match row {
                Ok((hash, _)) => Ok(hash),
                Err(cause) => Err(ActorError::from(cause).into()),
            })
            .collect::<Vec<_>>();
        Ok(hashes)
    }
    /// Lists all tags with their values.
    ///
    /// Each item is either a `(tag, value)` pair or the storage error hit
    /// while reading that row, converted to an `io::Error`.
    async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        // The filter keeps every row.
        let keep_all: FilterPredicate<Tag, HashAndFormat> =
            Box::new(|_index, key, value| Some((key.value(), value.value())));
        let request = ActorMessage::Tags {
            filter: keep_all,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let rows = reply_rx.await??;
        let tags = rows
            .into_iter()
            .map(|row| match row {
                Ok(pair) => Ok(pair),
                Err(cause) => Err(ActorError::from(cause).into()),
            })
            .collect();
        Ok(tags)
    }
    /// Sends a `SetTag` request with `tag` and `value` and waits for the
    /// actor's result.
    async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::SetTag {
            tag,
            value,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends a `CreateTag` request for `hash` and returns the tag from the
    /// actor's reply.
    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::CreateTag { hash, tx: reply_tx };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends a `Delete` request for `hashes` and waits for the actor's
    /// result.
    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::Delete {
            hashes,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends a `GcStart` message and waits for the actor's acknowledgement.
    async fn gc_start(&self) -> OuterResult<()> {
        let (ack_tx, ack_rx) = oneshot::channel();
        self.tx.send(ActorMessage::GcStart { tx: ack_tx }).await?;
        ack_rx.await?;
        Ok(())
    }
    /// Asks the actor for the status of `hash` (async variant).
    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::EntryStatus {
            hash: *hash,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Asks the actor for the status of `hash`, blocking the current thread
    /// for both the send and the reply.
    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::EntryStatus {
            hash: *hash,
            tx: reply_tx,
        };
        self.tx.send_blocking(request)?;
        let reply = reply_rx.recv()?;
        Ok(reply?)
    }
    /// Sends `entry` to the actor in an `OnComplete` message; no reply is
    /// awaited.
    async fn complete(&self, entry: Entry) -> OuterResult<()> {
        let notice = ActorMessage::OnComplete { handle: entry };
        self.tx.send(notice).await?;
        Ok(())
    }
    /// Exports the blob `hash` to `target`.
    ///
    /// `target` must be absolute and have a parent directory; the parent is
    /// created if missing. A temp tag for the hash is created before the
    /// request is sent and travels with the command to the actor.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidInput` I/O error for a relative path or a path
    /// without parent, an I/O error if the parent cannot be created, and
    /// whatever the actor reports for the export itself.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        if !target.is_absolute() {
            let cause = io::Error::new(io::ErrorKind::InvalidInput, "target path must be absolute");
            return Err(cause.into());
        }
        let Some(parent) = target.parent() else {
            let cause = io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            );
            return Err(OuterError::from(cause));
        };
        std::fs::create_dir_all(parent)?;
        let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = Export {
            temp_tag,
            target,
            mode,
            progress,
        };
        self.tx
            .send(ActorMessage::Export { cmd, tx: reply_tx })
            .await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends an `Fsck` request with the given `repair` flag and progress
    /// sender, then waits for the actor's result.
    async fn consistency_check(
        &self,
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> OuterResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::Fsck {
            repair,
            progress,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let reply = reply_rx.await?;
        Ok(reply?)
    }
    /// Sends an `ImportFlatStore` request and returns the actor's `bool`
    /// reply.
    async fn import_flat_store(&self, paths: FlatStorePaths) -> OuterResult<bool> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ActorMessage::ImportFlatStore {
            paths,
            tx: reply_tx,
        };
        self.tx.send(request).await?;
        let imported = reply_rx.await?;
        Ok(imported)
    }
    /// Sends an `UpdateInlineOptions` message and waits for the actor's
    /// acknowledgement.
    async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> OuterResult<()> {
        let (ack_tx, ack_rx) = oneshot::channel();
        let request = ActorMessage::UpdateInlineOptions {
            inline_options,
            reapply,
            tx: ack_tx,
        };
        self.tx.send(request).await?;
        ack_rx.await?;
        Ok(())
    }
    /// Sends a `Dump` message; there is no reply to wait for.
    async fn dump(&self) -> OuterResult<()> {
        let request = ActorMessage::Dump;
        self.tx.send(request).await?;
        Ok(())
    }
    /// Sends a `Sync` message and waits for the actor's acknowledgement.
    async fn sync(&self) -> OuterResult<()> {
        let (ack_tx, ack_rx) = oneshot::channel();
        self.tx.send(ActorMessage::Sync { tx: ack_tx }).await?;
        ack_rx.await?;
        Ok(())
    }
    /// Imports the file at `path` (blocking).
    ///
    /// `path` must be absolute and be a file or a symlink. With
    /// `ImportMode::TryReference` the file is used in place. With
    /// `ImportMode::Copy`, files smaller than 16 KiB are read into memory and
    /// larger ones are reflinked or copied to a temp file first. The import
    /// is then finished by `finalize_import_sync`.
    ///
    /// Returns the temp tag and the size of the imported data.
    fn import_file_sync(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        if !path.is_absolute() {
            let cause = io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute");
            return Err(cause.into());
        }
        if !(path.is_file() || path.is_symlink()) {
            let cause = io::Error::new(
                io::ErrorKind::InvalidInput,
                "path is not a file or symlink",
            );
            return Err(cause.into());
        }
        let id = progress.new_id();
        let name = path.to_string_lossy().into_owned();
        progress.blocking_send(ImportProgress::Found { id, name })?;
        let source = match mode {
            ImportMode::TryReference => ImportSource::External(path),
            ImportMode::Copy => {
                let len = std::fs::metadata(&path)?.len();
                if len < 16 * 1024 {
                    // Small file: keep it in memory instead of making a temp copy.
                    let bytes = std::fs::read(&path)?;
                    ImportSource::Memory(bytes.into())
                } else {
                    let temp_path = self.temp_file_name();
                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
                    // `None` means the file was reflinked, `Some(_)` that it was copied.
                    match reflink_copy::reflink_or_copy(&path, &temp_path)? {
                        None => {
                            tracing::debug!(
                                "reflinked {} to {}",
                                path.display(),
                                temp_path.display()
                            );
                        }
                        Some(_) => {
                            tracing::debug!(
                                "copied {} to {}",
                                path.display(),
                                temp_path.display()
                            );
                        }
                    }
                    ImportSource::TempFile(temp_path)
                }
            }
        };
        self.finalize_import_sync(source, format, id, progress)
    }
    /// Imports in-memory `data` (blocking), with progress id 0 and a progress
    /// sender that ignores all messages. Returns only the temp tag.
    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
        let source = ImportSource::Memory(data);
        let silent = IgnoreProgressSender::default();
        let (tag, _size) = self.finalize_import_sync(source, format, 0, silent)?;
        Ok(tag)
    }
    /// Finishes an import (blocking): determines the size, computes hash and
    /// outboard, creates a temp tag and hands the import to the actor.
    ///
    /// Progress is reported as `Size`, then `OutboardProgress` (file-backed
    /// sources only), then `OutboardDone`. Returns the actor's reply, a temp
    /// tag and a `u64`.
    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        let size_msg = ImportProgress::Size {
            id,
            size: data_size,
        };
        progress.blocking_send(size_msg)?;
        // Separate sender for the outboard callback, which is moved into a closure.
        let outboard_progress = progress.clone();
        let (hash, outboard) = match file.content() {
            MemOrFile::Mem(bytes) => compute_outboard(bytes, data_size, |_| Ok(()))?,
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _entered = span.enter();
                let reader = std::fs::File::open(path)?;
                compute_outboard(reader, data_size, move |offset| {
                    Ok(outboard_progress.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // The tag is created before the actor is involved.
        let tag = self.temp.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = Import {
            content_id: HashAndFormat { hash, format },
            source: file,
            outboard,
            data_size,
        };
        self.tx
            .send_blocking(ActorMessage::Import { cmd, tx: reply_tx })?;
        let reply = reply_rx.recv()?;
        Ok(reply?)
    }
    /// A fresh path inside the temp directory, from
    /// `PathOptions::temp_file_name`.
    fn temp_file_name(&self) -> PathBuf {
        let paths = &self.path_options;
        paths.temp_file_name()
    }
    /// Sends a `Shutdown` message and waits for the acknowledgement.
    ///
    /// Best effort: failures to send or to receive are ignored.
    async fn shutdown(&self) {
        let (ack_tx, ack_rx) = oneshot::channel();
        let request = ActorMessage::Shutdown { tx: Some(ack_tx) };
        let _ = self.tx.send(request).await;
        let _ = ack_rx.await;
    }
}
impl Drop for StoreInner {
    /// Sends a `Shutdown` message without acknowledgement channel and joins
    /// the actor thread. Errors from sending and joining are ignored.
    fn drop(&mut self) {
        let Some(actor_thread) = self.handle.take() else {
            return;
        };
        let _ = self
            .tx
            .send_blocking(ActorMessage::Shutdown { tx: None });
        let _ = actor_thread.join();
    }
}
/// State of the actor apart from the database itself (see `Actor`).
struct ActorState {
    /// Weak handles per hash.
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Set of protected hashes. NOTE(review): presumably hashes that must not
    /// be deleted at the moment — confirm.
    protected: BTreeSet<Hash>,
    /// Temp tag counters, shared with `StoreInner`.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Receiving end of the actor message channel.
    msgs_rx: async_channel::Receiver<ActorMessage>,
    /// Configuration used for bao files.
    create_options: Arc<BaoFileConfig>,
    /// Store options.
    options: Options,
    /// Handle to the tokio runtime.
    rt: tokio::runtime::Handle,
}
/// The actor: the redb database plus the remaining actor state. It is created
/// in `StoreInner::new_sync` and run on a dedicated thread.
struct Actor {
    db: redb::Database,
    state: ActorState,
}
/// Errors produced by the actor.
///
/// Converts into `io::Error`: the `Io` variant is unwrapped, every other
/// variant is wrapped with `io::ErrorKind::Other`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    /// A redb table error.
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    /// A redb database error.
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    /// A redb transaction error.
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    /// A redb commit error.
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    /// A redb storage error.
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    /// An I/O error.
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The database state is inconsistent, e.g. a size mismatch found by the
    /// `union` methods.
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    /// A database migration failed.
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
impl From<ActorError> for io::Error {
    /// An `Io` error is returned as is; every other error is wrapped with
    /// `io::ErrorKind::Other`.
    fn from(e: ActorError) -> Self {
        match e {
            ActorError::Io(io_err) => io_err,
            other => io::Error::new(io::ErrorKind::Other, other),
        }
    }
}
/// Result type with `ActorError` as the error.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
/// Errors seen by callers that talk to the actor: the actor's own error plus
/// failures of the channels and tasks in between.
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
    /// An error reported by the actor.
    #[error("inner error: {0}")]
    Inner(#[from] ActorError),
    /// Sending a message to the actor failed.
    #[error("send error")]
    Send,
    /// Sending a progress message failed.
    #[error("progress send error: {0}")]
    ProgressSend(#[from] ProgressSendError),
    /// Receiving a reply on a oneshot channel failed.
    #[error("recv error: {0}")]
    Recv(#[from] oneshot::RecvError),
    /// Receiving from an async channel failed.
    #[error("recv error: {0}")]
    AsyncChannelRecv(#[from] async_channel::RecvError),
    /// Joining a tokio task failed.
    #[error("join error: {0}")]
    JoinTask(#[from] tokio::task::JoinError),
}
impl From<async_channel::SendError<ActorMessage>> for OuterError {
    /// Maps a failed send to `OuterError::Send`; the unsent message is
    /// dropped.
    fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
        Self::Send
    }
}
/// Result type with `OuterError` as the error.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
impl From<io::Error> for OuterError {
    /// Wraps an I/O error as `Inner(ActorError::Io(..))`.
    fn from(e: io::Error) -> Self {
        let inner = ActorError::Io(e);
        Self::Inner(inner)
    }
}
impl From<OuterError> for io::Error {
    /// An I/O error wrapped as `Inner(ActorError::Io(..))` is returned as is;
    /// every other error is wrapped with `io::ErrorKind::Other`.
    fn from(e: OuterError) -> Self {
        match e {
            OuterError::Inner(ActorError::Io(io_err)) => io_err,
            other => io::Error::new(io::ErrorKind::Other, other),
        }
    }
}
impl super::Map for Store {
    type Entry = Entry;
    /// Looks up `hash` through the actor and returns its optional entry.
    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
        let found = self.0.get(*hash).await?;
        Ok(found.map(From::from))
    }
}
impl super::MapMut for Store {
    type EntryMut = Entry;
    /// Forwards to `StoreInner::get_or_create`; the `_size` argument is not
    /// used.
    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
        let handle = self.0.get_or_create(hash).await?;
        Ok(handle)
    }
    /// Status of `hash`, via the actor.
    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
        let status = self.0.entry_status(hash).await?;
        Ok(status)
    }
    /// Same lookup as `get`; `Entry` serves as both entry types.
    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }
    /// Forwards `entry` to `StoreInner::complete`.
    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
        self.0.complete(entry).await?;
        Ok(())
    }
    /// Blocking variant of `entry_status`.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
        let status = self.0.entry_status_sync(hash)?;
        Ok(status)
    }
}
impl super::ReadableStore for Store {
    /// Iterator over the hashes of all complete blobs.
    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.blobs().await?.into_iter()))
    }
    /// Iterator over the hashes of all partial blobs.
    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
    }
    /// Iterator over all tags and their values.
    async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
        Ok(Box::new(self.0.tags().await?.into_iter()))
    }
    /// Iterator over the keys of the temp tag counter map.
    ///
    /// Panics if the lock on the map is poisoned.
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
        Box::new(self.0.temp.read().unwrap().keys())
    }
    /// Runs a consistency check through the actor, optionally repairing, and
    /// reports progress on `tx`.
    async fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        // `tx` is owned and not used afterwards, so it is moved, not cloned.
        self.0.consistency_check(repair, tx).await?;
        Ok(())
    }
    /// Exports the blob `hash` to `target`; see `StoreInner::export` for the
    /// requirements on `target`.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        Ok(self.0.export(hash, target, mode, progress).await?)
    }
}
/// Import, tagging and maintenance operations of the file-system store.
impl super::Store for Store {
    /// Imports the file at `path`; the synchronous import runs on a blocking task.
    async fn import_file(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(crate::TempTag, u64)> {
        let inner = self.0.clone();
        let task =
            tokio::task::spawn_blocking(move || inner.import_file_sync(path, mode, format, progress));
        // The first `?` surfaces a failed join, the second the import error itself.
        let outcome = task.await?;
        Ok(outcome?)
    }

    /// Imports an in-memory buffer; the synchronous import runs on a blocking task.
    async fn import_bytes(
        &self,
        data: bytes::Bytes,
        format: iroh_base::hash::BlobFormat,
    ) -> io::Result<crate::TempTag> {
        let inner = self.0.clone();
        let task = tokio::task::spawn_blocking(move || inner.import_bytes_sync(data, format));
        let outcome = task.await?;
        Ok(outcome?)
    }

    /// Spools `data` into a temp file, then finalizes the import on a blocking task.
    ///
    /// Sends `ImportProgress::Found` once up front and `ImportProgress::CopyProgress`
    /// after every chunk written.
    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let store = self.clone();
        let id = progress.new_id();
        let temp_data_path = store.0.temp_file_name();
        let name = {
            let file_name = temp_data_path.file_name().expect("just created");
            file_name.to_string_lossy().to_string()
        };
        progress.send(ImportProgress::Found { id, name }).await?;

        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
        let mut offset = 0u64;
        while let Some(next) = data.next().await {
            let bytes = next?;
            writer.write_all(&bytes).await?;
            offset += bytes.len() as u64;
            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
        }
        writer.flush().await?;
        // Close the file before the blocking task takes over the path.
        drop(writer);

        let source = ImportSource::TempFile(temp_data_path);
        let task = tokio::task::spawn_blocking(move || {
            store.0.finalize_import_sync(source, format, id, progress)
        });
        let outcome = task.await?;
        Ok(outcome?)
    }

    /// Sets the tag `name` to `hash`, forwarding both to `self.0.set_tag`.
    async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
        self.0.set_tag(name, hash).await?;
        Ok(())
    }

    /// Creates a tag for `hash` and returns it.
    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        let tag = self.0.create_tag(hash).await?;
        Ok(tag)
    }

    /// Requests deletion of the given hashes.
    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        self.0.delete(hashes).await?;
        Ok(())
    }

    /// Notifies `self.0` that a garbage-collection run starts.
    async fn gc_start(&self) -> io::Result<()> {
        self.0.gc_start().await?;
        Ok(())
    }

    /// Creates a temp tag for `value` in the shared temp-tag map.
    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
        self.0.temp.temp_tag(value)
    }

    /// Forwards to `self.0.sync`.
    async fn sync(&self) -> io::Result<()> {
        self.0.sync().await?;
        Ok(())
    }

    /// Forwards to `self.0.shutdown` and waits for it to finish.
    async fn shutdown(&self) {
        self.0.shutdown().await;
    }
}
impl Actor {
    /// Opens the redb database at `path` and builds the actor together with the
    /// sender side of its message channel.
    ///
    /// # Errors
    ///
    /// Fails if the database cannot be opened or migrated, or if the initial
    /// write transaction cannot be created or committed.
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            // A database that reports "upgrade required" from version 1 is migrated first.
            Err(DatabaseError::UpgradeRequired(1)) => {
                migrate_redb_v1_v2::run(path).map_err(ActorError::Migration)?
            }
            Err(err) => return Err(err.into()),
        };

        // Run `Tables::new` once inside a committed write transaction.
        // NOTE(review): presumably this creates any missing tables — confirm in `tables`.
        let txn = db.begin_write()?;
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;

        let (tx, rx) = async_channel::bounded(1024);
        let tx2 = tx.clone();
        // The callback only enqueues a message; a failed send is deliberately ignored.
        let on_file_create: CreateCb = Arc::new(move |hash| {
            tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs_rx: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }

    /// Main loop of the actor: receives messages and handles them in batches.
    ///
    /// Read-only messages share one read transaction and read-write messages share
    /// one write transaction. A batch ends when the configured message count or
    /// duration is reached, when the channel yields no more messages, or when the
    /// handler hands a message back (it is then pushed back for the next round).
    /// Top-level messages are handled one at a time outside of any batch.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a handler or by the database.
    async fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
        while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // Drop the actor (including the database) before acknowledging.
                drop(self);
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // Put the message back so the batch loop below sees it first.
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                                        // Not handled in this batch: requeue and end the batch.
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("read transaction timed out");
                                break;
                            }
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    // Put the message back so the batch loop below sees it first.
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    // Write batches are bounded by the write duration, not the read duration.
                    let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                                        // Not handled in this batch: requeue and end the batch.
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("write transaction timed out");
                                break;
                            }
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // Files are only removed once the commit has succeeded.
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
}
impl ActorState {
    /// Reports whether the blobs table holds a complete entry, a partial entry,
    /// or no entry at all for `hash`.
    fn entry_status(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<EntryStatus> {
        let Some(guard) = tables.blobs().get(hash)? else {
            return Ok(EntryStatus::NotFound);
        };
        Ok(match guard.value() {
            EntryState::Partial { .. } => EntryStatus::Partial,
            EntryState::Complete { .. } => EntryStatus::Complete,
        })
    }
    /// Returns a handle for `hash`, or `None` if the blobs table has no entry for it.
    ///
    /// A still-alive handle from the weak-handle cache is reused; otherwise a new
    /// handle is built from the table entry and cached.
    fn get(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<Option<BaoFileHandle>> {
        if let Some(weak) = self.handles.get(&hash) {
            if let Some(live) = weak.upgrade() {
                return Ok(Some(live));
            }
        }
        let Some(guard) = tables.blobs().get(hash)? else {
            return Ok(None);
        };
        let config = self.create_options.clone();
        let handle = match guard.value() {
            EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
            EntryState::Complete {
                data_location,
                outboard_location,
            } => {
                let paths = &self.options.path;
                let data = load_data(tables, paths, data_location, &hash)?;
                // The outboard loader needs the data size, so data is loaded first.
                let outboard = load_outboard(tables, paths, outboard_location, data.size(), &hash)?;
                BaoFileHandle::new_complete(config, hash, data, outboard)
            }
        };
        self.handles.insert(hash, handle.downgrade());
        Ok(Some(handle))
    }
    /// Exports the blob referenced by `cmd.temp_tag` to `cmd.target`.
    ///
    /// The outcome of the export itself is reported through `tx`; for copies this
    /// happens from a blocking task after this function has returned.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::Inconsistent` if the blobs table has no entry (or no
    /// inline data / external path where one is expected), an
    /// `io::ErrorKind::Unsupported` error for partial entries, and any table error.
    fn export(
        &mut self,
        tables: &mut Tables,
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    ) -> ActorResult<()> {
        let Export {
            temp_tag,
            target,
            mode,
            progress,
        } = cmd;
        let guard = tables
            .blobs
            .get(temp_tag.hash())?
            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
        let entry = guard.value();
        match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => match data_location {
                DataLocation::Inline(()) => {
                    // Inline data lives in the database; write it out synchronously.
                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
                        ActorError::Inconsistent("inline data not found".to_owned())
                    })?;
                    tracing::trace!("exporting inline data to {}", target.display());
                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
                        .ok();
                }
                DataLocation::Owned(size) => {
                    let path = self.options.path.owned_data_path(temp_tag.hash());
                    match mode {
                        ExportMode::Copy => {
                            // Copy on a blocking task; `temp_tag` moves into the task.
                            self.rt.spawn_blocking(move || {
                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                    .ok();
                            });
                        }
                        // Try to move the owned file to the target and reference it from there.
                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
                            Ok(()) => {
                                let entry = EntryState::Complete {
                                    data_location: DataLocation::External(vec![target], size),
                                    outboard_location,
                                };
                                // Release the read guard before writing to the same table.
                                drop(guard);
                                tables.blobs.insert(temp_tag.hash(), entry)?;
                                drop(temp_tag);
                                tx.send(Ok(())).ok();
                            }
                            Err(e) => {
                                // NOTE(review): 18 looks like EXDEV (cross-device link) on
                                // Linux/macOS; other platforms use different codes — confirm.
                                const ERR_CROSS: i32 = 18;
                                if e.raw_os_error() == Some(ERR_CROSS) {
                                    // Rename failed with that code: fall back to copying.
                                    match std::fs::copy(&path, &target) {
                                        Ok(_) => {
                                            let entry = EntryState::Complete {
                                                data_location: DataLocation::External(
                                                    vec![target],
                                                    size,
                                                ),
                                                outboard_location,
                                            };
                                            drop(guard);
                                            tables.blobs.insert(temp_tag.hash(), entry)?;
                                            // The owned data file is now redundant; schedule
                                            // it for deletion after the commit.
                                            tables
                                                .delete_after_commit
                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
                                            drop(temp_tag);
                                            tx.send(Ok(())).ok();
                                        }
                                        Err(e) => {
                                            drop(temp_tag);
                                            tx.send(Err(e.into())).ok();
                                        }
                                    }
                                } else {
                                    drop(temp_tag);
                                    tx.send(Err(e.into())).ok();
                                }
                            }
                        },
                    }
                }
                DataLocation::External(paths, size) => {
                    // Only the first recorded external path is used as the copy source.
                    let path = paths
                        .first()
                        .ok_or_else(|| {
                            ActorError::Inconsistent("external path missing".to_owned())
                        })?
                        .to_owned();
                    if path == target {
                        // Source and target are the same path: nothing to copy.
                        tx.send(Ok(())).ok();
                    } else {
                        self.rt.spawn_blocking(move || {
                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                .ok();
                        });
                    }
                }
            },
            EntryState::Partial { .. } => {
                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
            }
        }
        Ok(())
    }
    /// Stores an imported blob and records it as a complete entry in the database.
    ///
    /// Data and outboard are inlined into the database or written to owned files,
    /// depending on the inline thresholds in `self.options.inline`. Returns a temp
    /// tag for the content together with `cmd.data_size`.
    ///
    /// # Errors
    ///
    /// Fails on file-system errors, table errors, or if merging with the existing
    /// entry via `union` fails.
    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
        let Import {
            content_id,
            source: file,
            outboard,
            data_size,
        } = cmd;
        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
        let inline_data = data_size <= self.options.inline.max_data_inlined;
        // An empty outboard is never inlined; it ends up as `NotNeeded` or `Owned` below.
        let inline_outboard =
            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
        // Create the temp tag first and add the hash to the protected set.
        let tag = self.temp.temp_tag(content_id);
        let hash = *tag.hash();
        self.protected.insert(hash);
        // Decide where the data lives and move/read/write it accordingly.
        let data_location = match file {
            ImportSource::External(external_path) => {
                tracing::debug!("stored external reference {}", external_path.display());
                if inline_data {
                    tracing::debug!(
                        "reading external data to inline it: {}",
                        external_path.display()
                    );
                    let data = Bytes::from(std::fs::read(&external_path)?);
                    DataLocation::Inline(data)
                } else {
                    DataLocation::External(vec![external_path], data_size)
                }
            }
            ImportSource::TempFile(temp_data_path) => {
                if inline_data {
                    tracing::debug!(
                        "reading and deleting temp file to inline it: {}",
                        temp_data_path.display()
                    );
                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
                    DataLocation::Inline(data)
                } else {
                    // Move the temp file to the owned data path for this hash.
                    let data_path = self.options.path.owned_data_path(&hash);
                    std::fs::rename(&temp_data_path, &data_path)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
            ImportSource::Memory(data) => {
                if inline_data {
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    overwrite_and_sync(&data_path, &data)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
        };
        let outboard_location = if let Some(outboard) = outboard {
            if inline_outboard {
                OutboardLocation::Inline(Bytes::from(outboard))
            } else {
                let outboard_path = self.options.path.owned_outboard_path(&hash);
                overwrite_and_sync(&outboard_path, &outboard)?;
                OutboardLocation::Owned
            }
        } else {
            OutboardLocation::NotNeeded
        };
        // Persist inline payloads in their dedicated tables.
        if let DataLocation::Inline(data) = &data_location {
            tables.inline_data.insert(hash, data.as_ref())?;
        }
        if let OutboardLocation::Inline(outboard) = &outboard_location {
            tables.inline_outboard.insert(hash, outboard.as_ref())?;
        }
        // Files that were just written must not be removed by a deletion that was
        // scheduled earlier in the same transaction.
        if let DataLocation::Owned(_) = &data_location {
            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
        }
        if let OutboardLocation::Owned = &outboard_location {
            tables
                .delete_after_commit
                .remove(hash, [BaoFilePart::Outboard]);
        }
        // Merge the new complete state with whatever entry already exists.
        let entry = tables.blobs.get(hash)?;
        let entry = entry.map(|x| x.value()).unwrap_or_default();
        let data_location = data_location.discard_inline_data();
        let outboard_location = outboard_location.discard_extra_data();
        let entry = entry.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, entry)?;
        Ok((tag, data_size))
    }
    fn get_or_create(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<BaoFileHandle> {
        self.protected.insert(hash);
        if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
            return Ok(handle);
        }
        let entry = tables.blobs().get(hash)?;
        let handle = if let Some(entry) = entry {
            let entry = entry.value();
            match entry {
                EntryState::Complete {
                    data_location,
                    outboard_location,
                    ..
                } => {
                    let data = load_data(tables, &self.options.path, data_location, &hash)?;
                    let outboard = load_outboard(
                        tables,
                        &self.options.path,
                        outboard_location,
                        data.size(),
                        &hash,
                    )?;
                    println!("creating complete entry for {}", hash.to_hex());
                    BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
                }
                EntryState::Partial { .. } => {
                    println!("creating partial entry for {}", hash.to_hex());
                    BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
                }
            }
        } else {
            BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
        };
        self.handles.insert(hash, handle.downgrade());
        Ok(handle)
    }
    /// Collects the rows of the blobs table that `filter` accepts.
    ///
    /// `filter` receives the zero-based position of each row; rows that fail to
    /// read are kept as `Err` items and still advance the position.
    fn blobs(
        &mut self,
        tables: &impl ReadableTables,
        filter: FilterPredicate<Hash, EntryState>,
    ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
        let mut out = Vec::new();
        for (position, row) in (0u64..).zip(tables.blobs().iter()?) {
            let kept = match row {
                Ok((key, value)) => filter(position, key, value).map(Ok),
                Err(cause) => Some(Err(cause)),
            };
            out.extend(kept);
        }
        Ok(out)
    }
    /// Collects the rows of the tags table that `filter` accepts.
    ///
    /// `filter` receives the zero-based position of each row; rows that fail to
    /// read are kept as `Err` items and still advance the position.
    fn tags(
        &mut self,
        tables: &impl ReadableTables,
        filter: FilterPredicate<Tag, HashAndFormat>,
    ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
        let mut out = Vec::new();
        for (position, row) in (0u64..).zip(tables.tags().iter()?) {
            let kept = match row {
                Ok((key, value)) => filter(position, key, value).map(Ok),
                Err(cause) => Some(Err(cause)),
            };
            out.extend(kept);
        }
        Ok(out)
    }
    /// Generates a tag via `Tag::auto`, stores `content` under it and returns it.
    ///
    /// The closure passed to `Tag::auto` reports whether a candidate name is
    /// already present in the tags table; lookup errors count as "not present".
    fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
        let fresh = Tag::auto(SystemTime::now(), |candidate| {
            let probe = Tag(Bytes::copy_from_slice(candidate));
            matches!(tables.tags.get(probe), Ok(Some(_)))
        });
        tables.tags.insert(fresh.clone(), content)?;
        Ok(fresh)
    }
    /// Points `tag` at `value`, or removes `tag` when `value` is `None`.
    fn set_tag(
        &self,
        tables: &mut Tables,
        tag: Tag,
        value: Option<HashAndFormat>,
    ) -> ActorResult<()> {
        if let Some(target) = value {
            tables.tags.insert(tag, target)?;
        } else {
            tables.tags.remove(tag)?;
        }
        Ok(())
    }
    /// Records `hash` as (at least) a partial entry of unknown size.
    ///
    /// The existing entry, or the default one if none exists, is merged with
    /// `EntryState::Partial { size: None }`. All three file parts of the hash are
    /// then taken off the delete-after-commit set.
    fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
        let current = match tables.blobs.get(hash)? {
            Some(guard) => guard.value(),
            None => Default::default(),
        };
        let merged = current.union(EntryState::Partial { size: None })?;
        tables.blobs.insert(hash, merged)?;
        let parts = [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes];
        tables.delete_after_commit.remove(hash, parts);
        Ok(())
    }
    /// Replaces the inline options and, if `reapply` is set, re-evaluates the
    /// storage location of every complete entry against the new thresholds.
    ///
    /// Reapplying runs in its own write transaction: owned files that now fit the
    /// thresholds are read into the inline tables (and scheduled for deletion after
    /// the commit), while inline payloads that exceed them are written out to owned
    /// files. External data and partial entries are left untouched.
    ///
    /// # Errors
    ///
    /// Fails on database or file-system errors, or with `ActorError::Inconsistent`
    /// if an expected row is missing.
    fn update_inline_options(
        &mut self,
        db: &redb::Database,
        options: InlineOptions,
        reapply: bool,
    ) -> ActorResult<()> {
        self.options.inline = options;
        if reapply {
            let mut delete_after_commit = Default::default();
            let tx = db.begin_write()?;
            {
                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
                // Collect all hashes up front so the table can be modified while looping.
                let hashes = tables
                    .blobs
                    .iter()?
                    .map(|x| x.map(|(k, _)| k.value()))
                    .collect::<Result<Vec<_>, _>>()?;
                for hash in hashes {
                    let guard = tables
                        .blobs
                        .get(hash)?
                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
                    let entry = guard.value();
                    if let EntryState::Complete {
                        data_location,
                        outboard_location,
                    } = entry
                    {
                        // Yields (new location, data size, whether the location changed).
                        let (data_location, data_size, data_location_changed) = match data_location
                        {
                            DataLocation::Owned(size) => {
                                if size <= self.options.inline.max_data_inlined {
                                    // Small enough now: move the file content into the database.
                                    let path = self.options.path.owned_data_path(&hash);
                                    let data = std::fs::read(&path)?;
                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                                    tables.inline_data.insert(hash, data.as_slice())?;
                                    (DataLocation::Inline(()), size, true)
                                } else {
                                    (DataLocation::Owned(size), size, false)
                                }
                            }
                            DataLocation::Inline(()) => {
                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline data missing".to_owned())
                                })?;
                                let data = guard.value();
                                let size = data.len() as u64;
                                if size > self.options.inline.max_data_inlined {
                                    // Too large now: write it to an owned file and drop the row.
                                    let path = self.options.path.owned_data_path(&hash);
                                    std::fs::write(&path, data)?;
                                    drop(guard);
                                    tables.inline_data.remove(hash)?;
                                    (DataLocation::Owned(size), size, true)
                                } else {
                                    (DataLocation::Inline(()), size, false)
                                }
                            }
                            DataLocation::External(paths, size) => {
                                (DataLocation::External(paths, size), size, false)
                            }
                        };
                        // The outboard size is derived from the data size, not measured.
                        let outboard_size = raw_outboard_size(data_size);
                        let (outboard_location, outboard_location_changed) = match outboard_location
                        {
                            OutboardLocation::Owned
                                if outboard_size <= self.options.inline.max_outboard_inlined =>
                            {
                                let path = self.options.path.owned_outboard_path(&hash);
                                let outboard = std::fs::read(&path)?;
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
                                (OutboardLocation::Inline(()), true)
                            }
                            OutboardLocation::Inline(())
                                if outboard_size > self.options.inline.max_outboard_inlined =>
                            {
                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline outboard missing".to_owned())
                                })?;
                                let outboard = guard.value();
                                let path = self.options.path.owned_outboard_path(&hash);
                                std::fs::write(&path, outboard)?;
                                drop(guard);
                                tables.inline_outboard.remove(hash)?;
                                (OutboardLocation::Owned, true)
                            }
                            // Every other combination stays where it is.
                            x => (x, false),
                        };
                        // Release the read guard before writing to the blobs table.
                        drop(guard);
                        if data_location_changed || outboard_location_changed {
                            tables.blobs.insert(
                                hash,
                                EntryState::Complete {
                                    data_location,
                                    outboard_location,
                                },
                            )?;
                        }
                    }
                }
            }
            tx.commit()?;
            // Remove the now-inlined files only after the commit has succeeded.
            delete_after_commit.apply_and_clear(&self.options.path);
        }
        Ok(())
    }
    /// Deletes the given hashes from the database and schedules their owned files
    /// for removal after the commit.
    ///
    /// Hashes contained in the temp-tag map or in the protected set are skipped.
    /// External data files are never scheduled for deletion.
    ///
    /// # Panics
    ///
    /// Panics if the lock around the temp-tag map is poisoned.
    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>) -> ActorResult<()> {
        for hash in hashes {
            if self.temp.as_ref().read().unwrap().contains(&hash) {
                continue;
            }
            if self.protected.contains(&hash) {
                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
                continue;
            }
            tracing::debug!("deleting {}", &hash.to_hex()[..8]);
            self.handles.remove(&hash);
            let Some(removed) = tables.blobs.remove(hash)? else {
                continue;
            };
            match removed.value() {
                EntryState::Partial { .. } => {
                    // A partial entry may have any of the three files on disk.
                    let parts = [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes];
                    tables.delete_after_commit.insert(hash, parts);
                }
                EntryState::Complete {
                    data_location,
                    outboard_location,
                } => {
                    match data_location {
                        DataLocation::External(_, _) => {}
                        DataLocation::Owned(_) => {
                            tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                        }
                        DataLocation::Inline(_) => {
                            tables.inline_data.remove(hash)?;
                        }
                    }
                    match outboard_location {
                        OutboardLocation::NotNeeded => {}
                        OutboardLocation::Owned => {
                            tables
                                .delete_after_commit
                                .insert(hash, [BaoFilePart::Outboard]);
                        }
                        OutboardLocation::Inline(_) => {
                            tables.inline_outboard.remove(hash)?;
                        }
                    }
                }
            }
        }
        Ok(())
    }
    /// Turns the storage behind `entry` into complete storage and records the
    /// result in the database.
    ///
    /// The database is only touched when `complete_storage` yields its `Ok`
    /// variant; for the `Err` variant the returned storage is kept as is.
    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
        let hash = entry.hash();
        let mut info = None;
        tracing::trace!("on_complete({})", hash.to_hex());
        entry.transform(|state| {
            tracing::trace!("on_complete transform {:?}", state);
            let completed = complete_storage(
                state,
                &hash,
                &self.options.path,
                &self.options.inline,
                tables.delete_after_commit,
            )?;
            let storage = match completed {
                Err(existing) => existing,
                Ok(fresh) => {
                    // Remember sizes and in-memory payloads for the table update below.
                    info = Some((
                        fresh.data_size(),
                        fresh.data.mem().cloned(),
                        fresh.outboard_size(),
                        fresh.outboard.mem().cloned(),
                    ));
                    fresh
                }
            };
            Ok(BaoFileStorage::Complete(storage))
        })?;

        let Some((data_size, data, outboard_size, outboard)) = info else {
            return Ok(());
        };
        // Payloads that are held in memory are stored inline, everything else is owned.
        let data_location = match &data {
            Some(_) => DataLocation::Inline(()),
            None => DataLocation::Owned(data_size),
        };
        let outboard_location = match (outboard_size, &outboard) {
            (0, _) => OutboardLocation::NotNeeded,
            (_, Some(_)) => OutboardLocation::Inline(()),
            (_, None) => OutboardLocation::Owned,
        };
        tracing::debug!(
            "inserting complete entry for {}, {} bytes",
            hash.to_hex(),
            data_size,
        );
        let previous = tables
            .blobs()
            .get(hash)?
            .map(|x| x.value())
            .unwrap_or_default();
        let merged = previous.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, merged)?;
        if let Some(bytes) = data {
            tables.inline_data.insert(hash, bytes.as_ref())?;
        }
        if let Some(bytes) = outboard {
            tables.inline_outboard.insert(hash, bytes.as_ref())?;
        }
        Ok(())
    }
    /// Handles the messages that operate directly on the database handle
    /// rather than on a set of already opened tables.
    ///
    /// `ImportFlatStore` and `UpdateInlineOptions` propagate a failure to the
    /// caller of this function and only send a reply on success; `Fsck` sends
    /// its result as-is; `Sync` is acknowledged immediately. Any other message
    /// yields [`ActorError::Inconsistent`].
    ///
    /// A failed send on a reply channel is ignored.
    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
        match msg {
            ActorMessage::ImportFlatStore { paths, tx } => {
                let imported = self.import_flat_store(db, paths)?;
                tx.send(imported).ok();
            }
            ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            } => {
                let updated = self.update_inline_options(db, inline_options, reapply)?;
                tx.send(updated).ok();
            }
            ActorMessage::Fsck {
                repair,
                progress,
                tx,
            } => {
                tx.send(self.consistency_check(db, repair, progress)).ok();
            }
            ActorMessage::Sync { tx } => {
                tx.send(()).ok();
            }
            unexpected => {
                let reason = format!(
                    "unexpected message for handle_toplevel: {:?}",
                    unexpected
                );
                return Err(ActorError::Inconsistent(reason));
            }
        }
        Ok(())
    }
    /// Handles the messages that only need read access to the tables.
    ///
    /// Returns `Ok(Ok(()))` when the message was handled here and
    /// `Ok(Err(msg))`, handing the message back untouched, when it is not one
    /// of the messages this function knows about.
    ///
    /// A failed send on a reply channel is ignored.
    fn handle_readonly(
        &mut self,
        tables: &impl ReadableTables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Get { hash, tx } => {
                tx.send(self.get(tables, hash)).ok();
            }
            ActorMessage::GetOrCreate { hash, tx } => {
                tx.send(self.get_or_create(tables, hash)).ok();
            }
            ActorMessage::EntryStatus { hash, tx } => {
                tx.send(self.entry_status(tables, hash)).ok();
            }
            ActorMessage::Blobs { filter, tx } => {
                tx.send(self.blobs(tables, filter)).ok();
            }
            ActorMessage::Tags { filter, tx } => {
                tx.send(self.tags(tables, filter)).ok();
            }
            ActorMessage::GcStart { tx } => {
                // Reset the protected set and forget handles that are no longer live.
                self.protected.clear();
                self.handles.retain(|_, handle| handle.is_live());
                tx.send(()).ok();
            }
            ActorMessage::Dump => {
                // Diagnostic output only; a failure while dumping is ignored.
                dump(tables).ok();
            }
            #[cfg(test)]
            ActorMessage::EntryState { hash, tx } => {
                let state = self.entry_state(tables, hash);
                tx.send(state).ok();
            }
            ActorMessage::GetFullEntryState { hash, tx } => {
                tx.send(self.get_full_entry_state(tables, hash)).ok();
            }
            unhandled => return Ok(Err(unhandled)),
        }
        Ok(Ok(()))
    }
    /// Handles the messages that need write access to the tables.
    ///
    /// Messages not matched here are forwarded to [`Self::handle_readonly`];
    /// its result, including an unhandled message handed back as
    /// `Ok(Err(msg))`, is returned unchanged.
    ///
    /// The results of `OnComplete`, `OnMemSizeExceeded` and `Dump` are
    /// discarded, since these messages carry no reply channel. A failed send
    /// on a reply channel is ignored as well.
    fn handle_readwrite(
        &mut self,
        tables: &mut Tables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Import { cmd, tx } => {
                tx.send(self.import(tables, cmd)).ok();
            }
            ActorMessage::SetTag { tag, value, tx } => {
                tx.send(self.set_tag(tables, tag, value)).ok();
            }
            ActorMessage::CreateTag { hash, tx } => {
                tx.send(self.create_tag(tables, hash)).ok();
            }
            ActorMessage::Delete { hashes, tx } => {
                tx.send(self.delete(tables, hashes)).ok();
            }
            ActorMessage::OnComplete { handle } => {
                self.on_complete(tables, handle).ok();
            }
            ActorMessage::Export { cmd, tx } => {
                self.export(tables, cmd, tx)?;
            }
            ActorMessage::OnMemSizeExceeded { hash } => {
                self.on_mem_size_exceeded(tables, hash).ok();
            }
            ActorMessage::Dump => {
                dump(tables).ok();
            }
            ActorMessage::SetFullEntryState { hash, entry, tx } => {
                tx.send(self.set_full_entry_state(tables, hash, entry)).ok();
            }
            // Everything else might still be a read-only message.
            other => return self.handle_readonly(tables, other),
        }
        Ok(Ok(()))
    }
}
/// Exports a file by copying `path` to `target`, using a reflink when one is
/// possible and a regular copy otherwise.
///
/// `progress` is invoked with `0` before the copy and with `size` after it;
/// an error from either call, or from the copy itself, aborts the export.
/// `temp_tag` is held for the duration of the call and dropped at the end.
fn export_file_copy(
    temp_tag: TempTag,
    path: PathBuf,
    size: u64,
    target: PathBuf,
    progress: ExportProgressCb,
) -> ActorResult<()> {
    // No intermediate progress is reported: only the start and the end.
    progress(0)?;
    let _copied = reflink_copy::reflink_or_copy(&path, &target)?;
    progress(size)?;
    // The tag has to outlive the copy, so release it only now.
    drop(temp_tag);
    Ok(())
}
/// Prints the content of the `blobs`, `tags`, `inline_data` and
/// `inline_outboard` tables to stdout, one line per row.
///
/// For the two inline tables only the length of the stored value is printed.
/// Stops at, and returns, the first error encountered while iterating.
fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
    for row in tables.blobs().iter()? {
        let (hash, state) = row?;
        println!("blobs: {} -> {:?}", hash.value().to_hex(), state.value());
    }
    for row in tables.tags().iter()? {
        let (tag, target) = row?;
        println!("tags: {} -> {:?}", tag.value(), target.value());
    }
    for row in tables.inline_data().iter()? {
        let (hash, bytes) = row?;
        println!(
            "inline_data: {} -> {:?}",
            hash.value().to_hex(),
            bytes.value().len()
        );
    }
    for row in tables.inline_outboard().iter()? {
        let (hash, bytes) = row?;
        println!(
            "inline_outboard: {} -> {:?}",
            hash.value().to_hex(),
            bytes.value().len()
        );
    }
    Ok(())
}
/// Loads the data of a blob from the place described by `location`.
///
/// - `Inline`: the bytes are copied out of the `inline_data` table; a missing
///   row is reported as [`ActorError::Inconsistent`].
/// - `Owned`: the file at `options.owned_data_path(hash)` is opened.
/// - `External`: the first of the recorded paths is opened; an empty path
///   list is reported as [`ActorError::Inconsistent`].
///
/// For the file cases the recorded size is returned alongside the open file.
/// Any failure to open a file is reported as an [`io::ErrorKind::NotFound`]
/// error naming the path.
fn load_data(
    tables: &impl ReadableTables,
    options: &PathOptions,
    location: DataLocation<(), u64>,
    hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
    let loaded = match location {
        DataLocation::Inline(()) => {
            let guard = tables.inline_data().get(hash)?.ok_or_else(|| {
                ActorError::Inconsistent(format!(
                    "inconsistent database state: {} should have inline data but does not",
                    hash.to_hex()
                ))
            })?;
            MemOrFile::Mem(Bytes::copy_from_slice(guard.value()))
        }
        DataLocation::Owned(data_size) => {
            let path = options.owned_data_path(hash);
            let file = std::fs::File::open(&path).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("file not found: {}", path.display()),
                )
            })?;
            MemOrFile::File((file, data_size))
        }
        DataLocation::External(paths, data_size) => {
            // Only the first external path is ever consulted.
            let Some(path) = paths.first() else {
                return Err(ActorError::Inconsistent(
                    "external data location must not be empty".into(),
                ));
            };
            let file = std::fs::File::open(path).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("external file not found: {}", path.display()),
                )
            })?;
            MemOrFile::File((file, data_size))
        }
    };
    Ok(loaded)
}
/// Loads the outboard of a blob from the place described by `location`.
///
/// - `NotNeeded`: an empty in-memory outboard is returned.
/// - `Inline`: the bytes are copied out of the `inline_outboard` table; a
///   missing row is reported as [`ActorError::Inconsistent`].
/// - `Owned`: the file at `options.owned_outboard_path(hash)` is opened and
///   returned together with `raw_outboard_size(size)`.
///
/// `size` is the size of the blob's data. A failure to open the outboard
/// file is reported as an [`io::ErrorKind::NotFound`] error.
fn load_outboard(
    tables: &impl ReadableTables,
    options: &PathOptions,
    location: OutboardLocation,
    size: u64,
    hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
    let loaded = match location {
        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
        OutboardLocation::Inline(_) => {
            let guard = tables.inline_outboard().get(hash)?.ok_or_else(|| {
                ActorError::Inconsistent(format!(
                    "inconsistent database state: {} should have inline outboard but does not",
                    hash.to_hex()
                ))
            })?;
            MemOrFile::Mem(Bytes::copy_from_slice(guard.value()))
        }
        OutboardLocation::Owned => {
            // The outboard size is derived from the data size, not read from disk.
            let outboard_size = raw_outboard_size(size);
            let path = options.owned_outboard_path(hash);
            let file = std::fs::File::open(&path).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("file not found: {} size={}", path.display(), outboard_size),
                )
            })?;
            MemOrFile::File((file, outboard_size))
        }
    };
    Ok(loaded)
}
/// Converts the storage of an entry into a [`CompleteStorage`], moving data
/// and outboard between memory and disk according to `inline_options`.
///
/// # Returns
///
/// - `Ok(Err(storage))` if `storage` was already complete; nothing is changed.
/// - `Ok(Ok(storage))` if an incomplete storage was converted.
///
/// # Placement rules
///
/// - Data of at most `inline_options.max_data_inlined` bytes is kept in
///   memory; if it currently lives in a file, the file is read fully and its
///   data part is added to `delete_after_commit`.
/// - Larger data is kept in a file; if it currently lives in memory it is
///   written to `path_options.owned_data_path(hash)`, and the data part is
///   removed from `delete_after_commit`.
/// - The outboard follows the same rules with
///   `inline_options.max_outboard_inlined`, except that an outboard of size
///   zero is replaced by the default (empty) value.
/// - The sizes part is always added to `delete_after_commit`.
///
/// # Errors
///
/// Propagates I/O errors from querying sizes, reading files or writing files.
/// Reading a file that turns out to be shorter than its reported size fails
/// with an error instead of yielding a partially filled buffer.
///
/// # Panics
///
/// Panics if the size of the data or outboard cannot be determined
/// (NOTE(review): presumably unreachable for memory and regular files — confirm).
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    let (data, outboard, _sizes) = match storage {
        // Already complete: hand it back unchanged.
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    // The outboard size is fully determined by the data size.
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    let data = if data_size <= inline_options.max_data_inlined {
        match data {
            MemOrFile::File(data) => {
                let mut buf = vec![0; data_size as usize];
                // `read_at` may return after a short read; `read_exact_at`
                // either fills the whole buffer or fails.
                data.read_exact_at(0, &mut buf)?;
                // The content now lives in memory, so the file is obsolete.
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        // The data file is needed: make sure it is not scheduled for deletion.
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            MemOrFile::Mem(data) => {
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    let outboard = if outboard_size == 0 {
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            MemOrFile::File(outboard) => {
                let mut buf = vec![0; outboard_size as usize];
                // Same reasoning as for the data: never accept a short read.
                outboard.read_exact_at(0, &mut buf)?;
                drop(outboard);
                // The content now lives in memory, so the file is obsolete.
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        // The outboard file is needed: make sure it is not scheduled for deletion.
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            MemOrFile::Mem(outboard) => {
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // A complete entry has no use for the sizes part anymore.
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}