// uni-store 2.0.6
//
// Storage layer for the Uni graph database: Lance datasets, LSM deltas, and WAL.
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Per-fork [`WriteAheadLog`] path conventions and factory.
//!
//! Each fork has its own WAL stream rooted at
//! `wal_forks/{fork_id}/`. **This prefix is intentionally NOT under
//! primary's `wal/` directory** — `ObjectStore::list` is recursive,
//! so anything under `wal/` would surface to primary's WAL recovery
//! and break the spec §10 isolation invariant. The plan originally
//! proposed `wal/forks/{fork_id}/` but the prefix-collision bug
//! forced the move; documented here to prevent revert.
//!
//! Properties:
//!
//! - A fork's WAL append never blocks primary's WAL append.
//! - Primary's WAL listing under `wal/` never sees fork segments, because
//!   they live in a separate top-level directory.
//! - Recovery can replay each fork's WAL independently into the
//!   matching fork's L0 (Day 6 wires this).
//! - GC of fork WAL segments is local to the fork's directory; on
//!   `drop_fork`, the entire directory can be removed in one
//!   list-and-delete.
//!
//! Phase 2's plan §D1 chose option A (per-fork L0 + WAL) over option
//! B (single tagged L0) precisely so that fork WAL is structurally
//! isolated. The cost is N WAL streams instead of one tagged stream;
//! the win is that primary's WAL invariants don't change.

// Rust guideline compliant

use std::sync::Arc;

use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use uni_common::core::fork::ForkId;

use crate::runtime::wal::WriteAheadLog;

/// Build the canonical WAL prefix for a fork.
///
/// All segment files for the fork land under
/// `wal_forks/{fork_id}/segment-{lsn}.wal`. The final filename is
/// generated by `WriteAheadLog` itself; this helper just returns the
/// directory prefix. The leading directory is `wal_forks` (with
/// underscore) — see module docs for why this differs from the
/// `wal/` prefix.
#[must_use]
pub fn wal_prefix(fork_id: &ForkId) -> ObjectStorePath {
    ObjectStorePath::from(format!("wal_forks/{fork_id}"))
}

/// Create a new [`WriteAheadLog`] scoped to a single fork.
///
/// The log writes through the same `ObjectStore` as primary. Its root,
/// however, is the directory returned by [`wal_prefix`], so segment
/// listing and GC only ever touch this fork's files.
#[must_use]
pub fn new_for_fork(store: Arc<dyn ObjectStore>, fork_id: &ForkId) -> WriteAheadLog {
    let prefix = wal_prefix(fork_id);
    WriteAheadLog::new(store, prefix)
}

/// Like [`new_for_fork`], but returns the log already wrapped in an
/// [`Arc`], matching the `Option<Arc<WriteAheadLog>>` parameter of the
/// Day 4 Writer factory.
#[must_use]
pub fn new_for_fork_arc(store: Arc<dyn ObjectStore>, fork_id: &ForkId) -> Arc<WriteAheadLog> {
    let wal = new_for_fork(store, fork_id);
    Arc::from(wal)
}

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;

    use crate::runtime::wal::Mutation;
    use uni_common::Vid;

    /// Create a local-filesystem object store rooted in a fresh temp dir.
    ///
    /// The `TempDir` is returned alongside the store so the caller can keep
    /// it alive (bind it to `_dir`, not `_`) for the whole test; dropping it
    /// removes the directory.
    async fn fresh_store() -> (TempDir, Arc<dyn ObjectStore>) {
        let dir = TempDir::new().unwrap();
        let store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        (dir, store)
    }

    #[test]
    fn prefix_includes_fork_id_under_wal_forks() {
        let id = ForkId::new();
        let p = wal_prefix(&id);
        let s = p.to_string();
        assert!(s.starts_with("wal_forks/"), "got {s}");
        assert!(s.contains(&id.to_string()));
        // Critically NOT under `wal/` — primary list would see it.
        assert!(
            !s.starts_with("wal/"),
            "fork WAL must not nest under primary WAL"
        );
    }

    #[tokio::test]
    async fn fork_wal_initialize_yields_zero_lsn_on_empty_dir() {
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        let wal = new_for_fork(store, &id);
        let max = wal.initialize().await.unwrap();
        assert_eq!(max, 0, "empty fork WAL has max LSN 0");
    }

    #[tokio::test]
    async fn append_flush_replay_roundtrip_on_fork_wal() {
        // `_dir` (not `_`) keeps the temp directory alive until the end
        // of the test.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        let wal = new_for_fork(store.clone(), &id);
        wal.initialize().await.unwrap();

        // Append two mutations and flush. `DeleteVertex` is used only
        // because it is simple to construct; the variant is irrelevant
        // to the path-scoping behavior under test.
        wal.append(&Mutation::DeleteVertex {
            vid: Vid::new(7),
            labels: vec![],
        })
        .unwrap();
        wal.append(&Mutation::DeleteVertex {
            vid: Vid::new(8),
            labels: vec![],
        })
        .unwrap();
        let lsn = wal.flush().await.unwrap();
        assert!(lsn >= 1, "flush returned LSN {lsn}");

        // Reopen the same fork's WAL on a fresh handle and confirm
        // initialize sees the persisted segment.
        let wal2 = new_for_fork(store, &id);
        let max = wal2.initialize().await.unwrap();
        assert_eq!(max, lsn);

        let replayed = wal2.replay().await.unwrap();
        assert_eq!(replayed.len(), 2);
        assert!(matches!(&replayed[0], Mutation::DeleteVertex { vid, .. } if *vid == Vid::new(7)));
        assert!(matches!(&replayed[1], Mutation::DeleteVertex { vid, .. } if *vid == Vid::new(8)));
    }

    #[tokio::test]
    async fn primary_wal_unaffected_by_fork_wal_writes() {
        // Path isolation: the fork's WAL writes go to `wal_forks/<id>/`,
        // a sibling of `wal/`. Primary's WAL on the same store sees nothing.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        let fork_wal = new_for_fork(store.clone(), &id);
        fork_wal.initialize().await.unwrap();
        fork_wal
            .append(&Mutation::DeleteVertex {
                vid: Vid::new(99),
                labels: vec![],
            })
            .unwrap();
        fork_wal.flush().await.unwrap();

        // Primary WAL prefix is just "wal".
        let primary_wal = WriteAheadLog::new(store, ObjectStorePath::from("wal"));
        let max = primary_wal.initialize().await.unwrap();
        assert_eq!(max, 0, "primary WAL must not see fork-side segments");
    }

    #[tokio::test]
    async fn two_forks_have_independent_wal_prefixes() {
        let (_dir, store) = fresh_store().await;
        let id_a = ForkId::new();
        let id_b = ForkId::new();
        assert_ne!(
            wal_prefix(&id_a),
            wal_prefix(&id_b),
            "distinct forks must map to distinct WAL prefixes"
        );

        let wal_a = new_for_fork(store.clone(), &id_a);
        let wal_b = new_for_fork(store.clone(), &id_b);

        wal_a.initialize().await.unwrap();
        wal_b.initialize().await.unwrap();
        wal_a
            .append(&Mutation::DeleteVertex {
                vid: Vid::new(1),
                labels: vec![],
            })
            .unwrap();
        let lsn_a = wal_a.flush().await.unwrap();

        // Fork A's data really was persisted, so B seeing nothing below is
        // meaningful rather than vacuous.
        let max_a = new_for_fork(store.clone(), &id_a)
            .initialize()
            .await
            .unwrap();
        assert_eq!(max_a, lsn_a);

        // A fresh handle on B's prefix sees nothing — its prefix is different.
        let max_b = WriteAheadLog::new(store, wal_prefix(&id_b))
            .initialize()
            .await
            .unwrap();
        assert_eq!(max_b, 0);
    }
}