coordinode-lsm-tree 4.4.0

A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs) — CoordiNode fork
Documentation
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    Slice,
    fs::{Fs, FsFile},
};
use std::{io::Write, path::Path};

pub const MAGIC_BYTES: [u8; 4] = [b'L', b'S', b'M', 3];

pub const TABLES_FOLDER: &str = "tables";
pub const BLOBS_FOLDER: &str = "blobs";
pub const CURRENT_VERSION_FILE: &str = "current";

/// Reads bytes from a file at the given offset without changing the cursor.
///
/// Uses [`FsFile::read_at`] (equivalent to `pread(2)`) so multiple threads
/// can call this concurrently on the same file handle.
pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Result<Slice> {
    // SAFETY: This slice builder starts uninitialized, but we know its length
    //
    // We use FsFile::read_at which gives us the number of bytes read.
    // If that number does not match the slice length, the function errors,
    // so the (partially) uninitialized buffer is discarded.
    //
    // Additionally, generally, block loads furthermore do a checksum check which
    // would likely catch the buffer being wrong somehow.
    #[expect(unsafe_code, reason = "see safety")]
    let mut builder = unsafe { Slice::builder_unzeroed(size) };

    // Single call is correct: FsFile::read_at has fill-or-EOF semantics —
    // implementations handle EINTR/short-read retry internally.
    let bytes_read = file.read_at(&mut builder, offset)?;

    if bytes_read != size {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "read_exact({bytes_read}) at {offset} did not read enough bytes {size}; file has length {}",
                file.metadata()?.len
            ),
        ));
    }

    Ok(builder.freeze().into())
}

/// Atomically rewrites a file via the [`Fs`] trait.
///
/// Writes `content` to a temporary file in the same directory, fsyncs it,
/// then renames over `path`. This ensures readers never see a partial write.
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
    use crate::fs::FsOpenOptions;
    use std::sync::atomic::{AtomicU64, Ordering};

    static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);

    #[expect(
        clippy::expect_used,
        reason = "every file should have a parent directory"
    )]
    let folder = path.parent().expect("should have a parent");

    let pid = std::process::id();

    // Retry with incrementing seq on AlreadyExists — handles leftover temp
    // files from a previous crash (PID can be reused, especially in containers).
    let tmp_path = loop {
        let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
        let candidate = folder.join(format!(".tmp_{pid}_{seq}"));
        match fs.open(
            &candidate,
            &FsOpenOptions::new().write(true).create_new(true),
        ) {
            Ok(mut file) => {
                let write_result = file
                    .write_all(content)
                    .and_then(|()| file.flush())
                    .and_then(|()| FsFile::sync_all(&*file));
                if let Err(e) = write_result {
                    drop(file);
                    let _ = fs.remove_file(&candidate);
                    return Err(e);
                }
                break candidate;
            }
            // Leftover temp file from a previous crash — retry with next seq.
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
            Err(e) => return Err(e),
        }
    };

    // std::fs::rename overwrites existing destinations on all platforms
    // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows).
    if let Err(e) = fs.rename(&tmp_path, path) {
        let _ = fs.remove_file(&tmp_path);
        return Err(e);
    }
    fsync_directory(folder, fs)?;

    Ok(())
}

/// Delegates directory sync to the backend.
///
/// On Windows, `StdFs::sync_directory` already returns `Ok(())` (directory
/// fsync is unsupported), but non-`StdFs` backends (e.g., `MemFs`) may use
/// this call for path validation. Always delegate rather than short-circuiting.
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> {
    fs.sync_directory(path)
}

#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::useless_vec,
    reason = "test code"
)]
mod tests {
    use super::*;
    use crate::fs::StdFs;
    use std::fs::File;
    use std::io::Write;
    use test_log::test;

    #[test]
    fn read_exact_short_read_returns_error() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("short.bin");
        {
            let mut f = File::create(&path)?;
            f.write_all(b"hello")?; // 5 bytes
        }

        let file = File::open(&path)?;
        // Request 10 bytes from a 5-byte file → short read → UnexpectedEof
        let err = read_exact(&file, 0, 10).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);

        Ok(())
    }

    #[test]
    fn atomic_rewrite() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;

        let path = dir.path().join("test.txt");
        {
            let mut file = File::create(&path)?;
            write!(file, "asdasdasdasdasd")?;
        }

        rewrite_atomic(&path, b"newcontent", &StdFs)?;

        let content = std::fs::read_to_string(&path)?;
        assert_eq!("newcontent", content);

        Ok(())
    }

    /// Verifies that `StdFs::rename` atomically replaces an existing
    /// destination file — the contract required by `rewrite_atomic`.
    #[test]
    fn std_fs_rename_replaces_existing_file() -> crate::Result<()> {
        use crate::fs::{Fs, FsOpenOptions};

        let dir = tempfile::tempdir()?;
        let src = dir.path().join("src.txt");
        let dst = dir.path().join("dst.txt");

        // Create both files via Fs trait.
        let opts = FsOpenOptions::new().write(true).create(true);
        let mut f = StdFs.open(&src, &opts)?;
        f.write_all(b"new")?;
        drop(f);

        let mut f = StdFs.open(&dst, &opts)?;
        f.write_all(b"old")?;
        drop(f);

        StdFs.rename(&src, &dst)?;

        // dst now has src content, src is gone.
        let content = std::fs::read_to_string(&dst)?;
        assert_eq!("new", content);
        assert!(!src.exists());

        Ok(())
    }
}