lucisearch 0.8.0

Embeddable, in-process search engine — the SQLite/DuckDB of Elasticsearch
Documentation
//! Five-state file locking protocol for cross-process coordination.
//!
//! Implements a SQLite-inspired lock progression:
//! UNLOCKED → SHARED → RESERVED → PENDING → EXCLUSIVE
//!
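//! Uses `fcntl(2)` byte-range locks on specific offsets within the
//! Luci file header (bytes 49–560, which the header format otherwise leaves
//! unused). Because `fcntl` locks belong to the process rather than to the
//! file descriptor, they coordinate separate processes only; threads within
//! one process must be serialized by other means.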
//!
//! See [[architecture-cross-process-locking]].

use std::fs::File;
use std::os::unix::io::AsRawFd;

use crate::core::{LuciError, Result};

/// Byte offsets within the Luci file header used for locking.
///
/// Every offset lies in the header's reserved/unused region (bytes 49–4095):
/// PENDING is byte 49, RESERVED is the byte right after it, and the SHARED
/// range covers the 510 bytes that follow (51 through 560 inclusive).
/// PENDING and RESERVED are deliberately adjacent so both can be released
/// with a single two-byte unlock.
const PENDING_BYTE: u64 = 49;
const RESERVED_BYTE: u64 = PENDING_BYTE + 1;
const SHARED_FIRST: u64 = RESERVED_BYTE + 1;
const SHARED_SIZE: u64 = 510;

/// Lock level for the five-state protocol.
///
/// Variants are declared in escalation order, so the derived `Ord` ranks a
/// stronger lock above a weaker one (`Unlocked < Shared < ... < Exclusive`).
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum LockLevel {
    /// No lock held.
    Unlocked = 0,
    /// Read access; many processes may hold it at once.
    Shared = 1,
    /// Declared intent to write; only one holder at a time.
    Reserved = 2,
    /// Writer waiting for existing readers to drain; new readers are held off.
    Pending = 3,
    /// Sole access for writing.
    Exclusive = 4,
}

/// Lock-state machine for one open handle on a Luci file.
///
/// Stores the raw descriptor the `fcntl` byte-range locks are placed on,
/// together with the level this handle currently holds. Legal transitions:
/// - UNLOCKED → SHARED (the only way out of UNLOCKED)
/// - SHARED → RESERVED
/// - RESERVED → EXCLUSIVE (passing through PENDING internally)
/// - EXCLUSIVE/RESERVED → SHARED (downgrade)
/// - SHARED → UNLOCKED
///
/// NOTE(review): only the raw fd is kept, with no lifetime tying it to the
/// `File` it came from, so that `File` must stay open while this value lives.
pub struct FileLock {
    /// Descriptor on which the byte-range locks are taken.
    fd: std::os::unix::io::RawFd,
    /// Lock level currently held through `fd`.
    level: LockLevel,
}

impl FileLock {
    /// Create a new lock state for the given file. Starts UNLOCKED.
    pub fn new(file: &File) -> Self {
        Self {
            fd: file.as_raw_fd(),
            level: LockLevel::Unlocked,
        }
    }

    /// Current lock level.
    pub fn level(&self) -> LockLevel {
        self.level
    }

    /// Acquire SHARED lock (for readers and initial open).
    ///
    /// Multiple processes can hold SHARED simultaneously.
    /// Blocks if another process holds PENDING or EXCLUSIVE.
    pub fn lock_shared(&mut self) -> Result<()> {
        assert_eq!(
            self.level,
            LockLevel::Unlocked,
            "lock_shared requires UNLOCKED state"
        );

        // Step 1: Acquire read-lock on PENDING_BYTE (gate check).
        // If a writer holds PENDING (write-lock on this byte), we block.
        fcntl_lock(self.fd, libc::F_RDLCK, PENDING_BYTE, 1)?;

        // Step 2: Acquire read-lock on the shared range.
        let result = fcntl_lock(self.fd, libc::F_RDLCK, SHARED_FIRST, SHARED_SIZE);

        // Step 3: Release the temporary PENDING_BYTE lock.
        let _ = fcntl_lock(self.fd, libc::F_UNLCK, PENDING_BYTE, 1);

        result?;
        self.level = LockLevel::Shared;
        Ok(())
    }

    /// Acquire RESERVED lock (write intent — one at a time).
    ///
    /// Signals that this process intends to write. Readers continue
    /// unimpeded. Only one RESERVED lock can exist at a time.
    ///
    /// Retries with exponential backoff until the timeout expires,
    /// then returns `WriterLocked`.
    pub fn lock_reserved(&mut self, timeout: std::time::Duration) -> Result<()> {
        assert_eq!(
            self.level,
            LockLevel::Shared,
            "lock_reserved requires SHARED state"
        );

        let deadline = std::time::Instant::now() + timeout;
        let mut backoff = std::time::Duration::from_millis(1);
        let max_backoff = std::time::Duration::from_millis(100);

        loop {
            match fcntl_try_lock(self.fd, libc::F_WRLCK, RESERVED_BYTE, 1) {
                Ok(()) => break,
                Err(LuciError::WriterLocked) => {
                    if std::time::Instant::now() >= deadline {
                        return Err(LuciError::WriterLocked);
                    }
                    std::thread::sleep(backoff);
                    backoff = (backoff * 2).min(max_backoff);
                }
                Err(e) => return Err(e),
            }
        }

        self.level = LockLevel::Reserved;
        Ok(())
    }

    /// Escalate from RESERVED to EXCLUSIVE.
    ///
    /// First acquires PENDING (blocks new readers), then waits for
    /// existing readers to drain, then acquires EXCLUSIVE.
    ///
    /// This is a blocking operation — it waits until all existing
    /// SHARED locks are released.
    pub fn lock_exclusive(&mut self) -> Result<()> {
        assert!(
            self.level == LockLevel::Reserved || self.level == LockLevel::Pending,
            "lock_exclusive requires RESERVED or PENDING state"
        );

        if self.level == LockLevel::Reserved {
            // Step 1: Acquire write-lock on PENDING_BYTE.
            // This blocks new SHARED acquisitions (their step 1 read-lock
            // on PENDING_BYTE will conflict with our write-lock).
            fcntl_lock(self.fd, libc::F_WRLCK, PENDING_BYTE, 1)?;
            self.level = LockLevel::Pending;
        }

        // Step 2: Acquire write-lock on the shared range (blocking).
        // This waits until all existing SHARED holders release their
        // read-locks on this range.
        fcntl_lock(self.fd, libc::F_WRLCK, SHARED_FIRST, SHARED_SIZE)?;
        self.level = LockLevel::Exclusive;
        Ok(())
    }

    /// Downgrade from EXCLUSIVE or RESERVED to SHARED.
    pub fn downgrade_to_shared(&mut self) -> Result<()> {
        assert!(
            self.level >= LockLevel::Reserved,
            "downgrade_to_shared requires RESERVED or higher"
        );

        // Replace write-lock with read-lock on the shared range.
        fcntl_lock(self.fd, libc::F_RDLCK, SHARED_FIRST, SHARED_SIZE)?;

        // Release PENDING_BYTE and RESERVED_BYTE (2 consecutive bytes).
        fcntl_lock(self.fd, libc::F_UNLCK, PENDING_BYTE, 2)?;

        self.level = LockLevel::Shared;
        Ok(())
    }

    /// Release all locks (SHARED → UNLOCKED).
    pub fn unlock(&mut self) -> Result<()> {
        if self.level == LockLevel::Unlocked {
            return Ok(());
        }

        // Release everything: offset 0, length 0 means "all locks".
        // This is safe because we use a dedicated fd for locking.
        fcntl_lock(self.fd, libc::F_UNLCK, 0, 0)?;
        self.level = LockLevel::Unlocked;
        Ok(())
    }
}

impl Drop for FileLock {
    fn drop(&mut self) {
        let _ = self.unlock();
    }
}

/// Build an `flock` request covering `len` bytes from absolute offset `start`.
///
/// Per `fcntl(2)`, a `len` of 0 extends the range to the largest possible
/// file offset. `l_pid` is left at 0.
fn flock_request(lock_type: i16, start: u64, len: u64) -> libc::flock {
    // SAFETY: `libc::flock` consists solely of integer fields, so the all-zero
    // bit pattern is a valid value. Zero-initialising (instead of a struct
    // literal) keeps this compiling on platforms whose `flock` has extra
    // fields, e.g. `l_sysid` on FreeBSD.
    let mut fl: libc::flock = unsafe { std::mem::zeroed() };
    fl.l_type = lock_type;
    fl.l_whence = libc::SEEK_SET as libc::c_short;
    // Cast to the platform's `off_t` rather than hard-coding `i64`; the lock
    // offsets used by this module are small constants.
    fl.l_start = start as libc::off_t;
    fl.l_len = len as libc::off_t;
    fl
}

/// Issue `fcntl(fd, cmd, fl)`, retrying while the call is interrupted by a
/// signal (`EINTR`). `F_SETLKW` reports `EINTR` when a signal arrives while
/// it is waiting, and that is not a real lock failure.
fn fcntl_apply(fd: i32, cmd: libc::c_int, fl: &libc::flock) -> std::io::Result<()> {
    loop {
        // SAFETY: `fl` points to a fully initialised `flock` that outlives the
        // call; F_SETLK/F_SETLKW only read through this pointer.
        let ret = unsafe { libc::fcntl(fd, cmd, fl as *const libc::flock) };
        if ret != -1 {
            return Ok(());
        }
        let err = std::io::Error::last_os_error();
        if err.kind() != std::io::ErrorKind::Interrupted {
            return Err(err);
        }
    }
}

/// Blocking fcntl lock operation (`F_SETLKW`): waits until the range can be locked.
fn fcntl_lock(fd: i32, lock_type: i16, start: u64, len: u64) -> Result<()> {
    fcntl_apply(fd, libc::F_SETLKW, &flock_request(lock_type, start, len))
        .map_err(LuciError::Io)
}

/// Non-blocking fcntl lock attempt (`F_SETLK`). Returns `WriterLocked` on conflict.
///
/// POSIX allows either `EAGAIN` or `EACCES` to signal a conflicting lock, so
/// both map to `WriterLocked`; any other error is returned as `LuciError::Io`.
fn fcntl_try_lock(fd: i32, lock_type: i16, start: u64, len: u64) -> Result<()> {
    fcntl_apply(fd, libc::F_SETLK, &flock_request(lock_type, start, len)).map_err(|err| {
        match err.raw_os_error() {
            Some(libc::EAGAIN) | Some(libc::EACCES) => LuciError::WriterLocked,
            _ => LuciError::Io(err),
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::OpenOptions;

    /// Create (or truncate) a per-process temp file large enough to cover the
    /// lock byte ranges, returning its path and a read/write handle.
    fn test_file(name: &str) -> (std::path::PathBuf, File) {
        let path =
            std::env::temp_dir().join(format!("luci_lock_test_{}_{name}", std::process::id()));
        let _ = std::fs::remove_file(&path);
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .unwrap();
        // Ensure the file is large enough for lock byte ranges
        file.set_len(4096).unwrap();
        (path, file)
    }

    /// Report whether a *different* process could take a write lock on
    /// `RESERVED_BYTE` of `file` right now.
    ///
    /// `fcntl` locks never conflict with other locks of the same process, so no
    /// in-process check can tell whether RESERVED is held. The probe therefore
    /// runs in a forked child, which exits 0 if `F_SETLK` succeeds and 1
    /// otherwise. Any lock the child takes dies with the child.
    fn reserved_free_in_other_process(file: &File) -> bool {
        let fd = file.as_raw_fd();
        // SAFETY: between `fork` and `_exit` the child performs only plain
        // memory writes and the async-signal-safe `fcntl`, so forking from the
        // multi-threaded test harness is sound. The parent only waits.
        unsafe {
            let pid = libc::fork();
            assert!(pid >= 0, "fork failed: {}", std::io::Error::last_os_error());
            if pid == 0 {
                let mut fl: libc::flock = std::mem::zeroed();
                fl.l_type = libc::F_WRLCK;
                fl.l_whence = libc::SEEK_SET as libc::c_short;
                fl.l_start = RESERVED_BYTE as libc::off_t;
                fl.l_len = 1;
                let acquired = libc::fcntl(fd, libc::F_SETLK, &fl as *const libc::flock) != -1;
                libc::_exit(if acquired { 0 } else { 1 });
            }
            let mut status: libc::c_int = 0;
            assert_eq!(libc::waitpid(pid, &mut status, 0), pid, "waitpid failed");
            libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 0
        }
    }

    #[test]
    fn shared_lock_roundtrip() {
        let (path, file) = test_file("shared");
        let mut lock = FileLock::new(&file);
        assert_eq!(lock.level(), LockLevel::Unlocked);

        lock.lock_shared().unwrap();
        assert_eq!(lock.level(), LockLevel::Shared);

        lock.unlock().unwrap();
        assert_eq!(lock.level(), LockLevel::Unlocked);

        std::fs::remove_file(path).ok();
    }

    #[test]
    fn full_escalation_and_downgrade() {
        let (path, file) = test_file("escalation");
        let mut lock = FileLock::new(&file);

        lock.lock_shared().unwrap();
        lock.lock_reserved(std::time::Duration::from_secs(5))
            .unwrap();
        assert_eq!(lock.level(), LockLevel::Reserved);
        // Another process must now be kept out of RESERVED.
        assert!(!reserved_free_in_other_process(&file));

        lock.lock_exclusive().unwrap();
        assert_eq!(lock.level(), LockLevel::Exclusive);

        lock.downgrade_to_shared().unwrap();
        assert_eq!(lock.level(), LockLevel::Shared);
        // Downgrading must hand RESERVED back to other processes.
        assert!(reserved_free_in_other_process(&file));

        lock.unlock().unwrap();
        std::fs::remove_file(path).ok();
    }

    #[test]
    fn same_process_fds_share_locks() {
        // fcntl locks are per-(process, inode), so two fds in the
        // same process share lock state. This is expected — within-process
        // serialization is handled by the Mutex, not file locks.
        let (path, file1) = test_file("same_process");
        let file2 = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();

        let mut lock1 = FileLock::new(&file1);
        let mut lock2 = FileLock::new(&file2);

        lock1.lock_shared().unwrap();
        lock1
            .lock_reserved(std::time::Duration::from_secs(5))
            .unwrap();

        // Same process: second fd succeeds (shared lock state)
        lock2.lock_shared().unwrap();
        lock2
            .lock_reserved(std::time::Duration::from_secs(5))
            .unwrap();

        lock1.downgrade_to_shared().unwrap();
        lock1.unlock().unwrap();
        lock2.downgrade_to_shared().unwrap();
        lock2.unlock().unwrap();
        std::fs::remove_file(path).ok();
    }

    #[test]
    fn drop_releases_locks() {
        let (path, file1) = test_file("drop_release");
        let file2 = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();

        {
            let mut lock1 = FileLock::new(&file1);
            lock1.lock_shared().unwrap();
            lock1
                .lock_reserved(std::time::Duration::from_secs(5))
                .unwrap();
            // Sanity check: RESERVED really is held against other processes.
            assert!(!reserved_free_in_other_process(&file1));
            // lock1 dropped here — should release all locks
        }

        // Locks never conflict within one process (see
        // `same_process_fds_share_locks`), so only a foreign process can
        // observe whether the drop actually released RESERVED.
        assert!(reserved_free_in_other_process(&file1));

        // Second lock should now succeed
        let mut lock2 = FileLock::new(&file2);
        lock2.lock_shared().unwrap();
        lock2
            .lock_reserved(std::time::Duration::from_secs(5))
            .unwrap();
        lock2.downgrade_to_shared().unwrap();
        lock2.unlock().unwrap();
        std::fs::remove_file(path).ok();
    }
}