// liteboxfs 0.1.0
//
// A modern POSIX filesystem in a SQLite database.
// Documentation
use std::time::{Duration, Instant};

use rusqlite::DropBehavior;

use crate::{Connection, Filesystem, RootId, sql::SqlStore};

/// Idle threshold after which we commit a dirty long-lived transaction.
///
/// Idle time is measured from the most recent `with_fs` call on the held transaction. The idle
/// trigger only ever commits dirty transactions; clean ones are left alone.
pub(super) const DEFAULT_IDLE_THRESHOLD: Duration = Duration::from_secs(1);

/// Hard ceiling on how long a single transaction may stay open while continuously dirty. This caps
/// the maximum data-loss window on crash.
///
/// Age is measured from the moment the transaction was begun, not from its first write, so a
/// transaction that sat clean for a while reaches this ceiling sooner after it becomes dirty.
pub(super) const DEFAULT_AGE_CEILING: Duration = Duration::from_secs(30);

/// Bookkeeping for one open transaction, kept next to the connection it runs on.
#[derive(Debug)]
struct TxState {
    /// When the transaction was opened.
    began_at: Instant,
    /// When the transaction was last used; starts out equal to `began_at`.
    last_activity: Instant,
    /// The connection's cumulative changed-row count, sampled when the transaction began.
    ///
    /// Comparing this baseline with the connection's current total tells whether the transaction
    /// has changed anything yet.
    changes_at_begin: u64,
}

impl TxState {
    /// Start tracking a transaction that begins now, using `changes_at_begin` as the baseline
    /// change count.
    fn new(changes_at_begin: u64) -> Self {
        // A single clock reading feeds both timestamps so they start out exactly equal.
        let opened = Instant::now();

        Self {
            changes_at_begin,
            last_activity: opened,
            began_at: opened,
        }
    }
}

/// A long-lived SQLite transaction owned by the FUSE adapter.
///
/// Holds the [`Connection`] across many FUSE method calls and manages the transaction lifecycle at
/// the SQL level via manual `BEGIN` / `COMMIT` statements. This allows us to have long-lived
/// transactions with more control over when we commit. Otherwise, we would need to commit on every
/// FUSE operation, which would be a performance nightmare.
#[derive(Debug)]
pub(super) struct ConnTx {
    /// The connection on which the `BEGIN` / `COMMIT` statements and savepoints are issued.
    conn: Connection,
    /// State of the currently held transaction.
    ///
    /// This is `None` between transactions (after a commit, before the next `with_fs` re-opens
    /// one).
    tx: Option<TxState>,
    /// The root that every `with_fs` call switches the filesystem to before running its closure.
    root_id: RootId,
}

impl ConnTx {
    /// Wrap `conn`, remembering `root_id` as the root to switch to on every `with_fs` call.
    ///
    /// No transaction is opened here; the first `with_fs` call begins one.
    pub fn new(conn: Connection, root_id: RootId) -> Self {
        Self {
            tx: None,
            conn,
            root_id,
        }
    }

    /// Issue `BEGIN DEFERRED` and record fresh transaction state, unless one is already held.
    fn ensure_tx_started(&mut self) -> crate::Result<()> {
        if self.tx.is_some() {
            return Ok(());
        }

        self.conn.execute_batch("BEGIN DEFERRED")?;

        // Baseline for `is_dirty`: the connection's change counter as of the start of this
        // transaction.
        let baseline = self.conn.total_changes();
        self.tx = Some(TxState::new(baseline));

        Ok(())
    }

    /// Run `f` against a [`Filesystem`] backed by a savepoint inside the held transaction.
    ///
    /// A transaction is begun first if none is held, and its last-activity timestamp is refreshed.
    /// The filesystem is switched to this adapter's root before `f` runs. The savepoint is
    /// configured to commit when dropped, which happens once `f` has returned; the held
    /// transaction itself stays open afterwards.
    ///
    /// NOTE(review): nothing in this method rolls the savepoint back when `f` returns `Err`; the
    /// drop behavior is `Commit` on every path. If rollback-on-error is expected, confirm that
    /// `Filesystem` / `SqlStore` take care of it.
    ///
    /// # Errors
    ///
    /// Returns an error (converted into `E`) if beginning the transaction, opening the savepoint
    /// or switching roots fails. Otherwise returns whatever `f` returns.
    pub fn with_fs<R, E, F>(&mut self, f: F) -> Result<R, E>
    where
        F: FnOnce(&mut Filesystem) -> Result<R, E>,
        E: From<crate::Error>,
    {
        self.ensure_tx_started().map_err(E::from)?;

        let state = self
            .tx
            .as_mut()
            .expect("Transaction state should be present at this point.");
        state.last_activity = Instant::now();

        // Copy everything needed out of the connection before the savepoint borrows it.
        let settings = self.conn.settings().clone();
        let default_root_id = self.conn.default_root_id();
        let root_id = self.root_id;

        let mut savepoint = self.conn.raw_savepoint().map_err(E::from)?;
        savepoint.set_drop_behavior(DropBehavior::Commit);

        let store = SqlStore::new(savepoint, settings.clone());
        let mut fs = Filesystem::new(default_root_id, settings, store);
        fs.switch_root(root_id).map_err(E::from)?;

        let outcome = f(&mut fs);
        // Dropping the filesystem drops the savepoint it owns, which commits it.
        drop(fs);

        outcome
    }

    /// Whether the connection's total change count has moved since the held transaction began.
    ///
    /// Always `false` when no transaction is held.
    pub fn is_dirty(&self) -> bool {
        match &self.tx {
            Some(state) => self.conn.total_changes() != state.changes_at_begin,
            None => false,
        }
    }

    /// Time elapsed since the held transaction was last used, or `None` if none is held.
    pub fn idle_for(&self) -> Option<Duration> {
        let state = self.tx.as_ref()?;
        Some(state.last_activity.elapsed())
    }

    /// Time elapsed since the held transaction was begun, or `None` if none is held.
    pub fn age(&self) -> Option<Duration> {
        let state = self.tx.as_ref()?;
        Some(state.began_at.elapsed())
    }

    /// Commit the held transaction, but only if it has made changes.
    ///
    /// Does nothing when no transaction is held or when the held one is clean; a clean
    /// transaction is deliberately left open. The rationale is that `BEGIN DEFERRED` followed by
    /// no statements holds no SQLite lock.
    ///
    /// NOTE(review): a clean transaction that has executed reads may still hold a read lock;
    /// confirm that keeping it open is acceptable.
    pub fn commit_if_dirty(&mut self) -> crate::Result<()> {
        if !self.is_dirty() {
            return Ok(());
        }

        self.commit_unconditionally()
    }

    /// End the held transaction with `COMMIT`, whether or not it is dirty.
    ///
    /// No-op if no transaction is held. When the transaction is dirty, orphan blocks are cleaned
    /// up before the commit.
    ///
    /// # Errors
    ///
    /// Returns an error if the orphan-block cleanup or the `COMMIT` fails. In that case the
    /// tracked transaction state is left in place.
    pub fn commit_unconditionally(&mut self) -> crate::Result<()> {
        if self.tx.is_none() {
            return Ok(());
        }

        if self.is_dirty() {
            self.conn.clean_orphan_blocks()?;
        }

        self.conn.execute_batch("COMMIT")?;
        self.tx = None;

        Ok(())
    }

    /// Commit a dirty transaction once it has gone unused for at least `threshold`.
    fn commit_if_idle(&mut self, threshold: Duration) -> crate::Result<()> {
        match self.idle_for() {
            Some(idle) if idle >= threshold => self.commit_if_dirty(),
            _ => Ok(()),
        }
    }

    /// Commit a dirty transaction once at least `threshold` has passed since it was begun.
    fn commit_if_aged(&mut self, threshold: Duration) -> crate::Result<()> {
        match self.age() {
            Some(age) if age >= threshold => self.commit_if_dirty(),
            _ => Ok(()),
        }
    }

    /// Run the default time-based commit triggers: the idle check first, then the age check.
    ///
    /// This keeps long-lived dirty transactions within the configured idle / age windows. If the
    /// idle check fails, the age check is not attempted.
    pub fn apply_default_commit_triggers(&mut self) -> crate::Result<()> {
        self.commit_if_idle(DEFAULT_IDLE_THRESHOLD)?;
        self.commit_if_aged(DEFAULT_AGE_CEILING)
    }
}

impl Drop for ConnTx {
    /// Best-effort commit of whatever the held transaction still contains.
    fn drop(&mut self) {
        // A failure cannot be reported from `drop`, so the error is discarded and any pending
        // uncommitted changes are lost. Operations that must be durable should call
        // `commit_unconditionally` explicitly beforehand. With no transaction held, this call is a
        // no-op.
        let _ = self.commit_unconditionally();
    }
}