// varvedb 0.4.2

// A high-performance, embedded, append-only event store for Rust.
// Documentation
// This file is part of VarveDB.
//
// Copyright (C) 2025 Matheus Cardoso <varvedb@matheus.sbs>
//
// This Source Code Form is subject to the terms of the Mozilla Public License
// v. 2.0. If a copy of the MPL was not distributed with this file, You can
// obtain one at http://mozilla.org/MPL/2.0/.

//! Global event reading functionality.
//!
//! This module provides types for reading events across all streams in global order.

use heed::{Env, RoTxn, WithTls};

use crate::error::Result;
use crate::event::GlobalEvent;
use crate::types::{GlobalEventRecord, GlobalEventsDb, GlobalSequence};

#[cfg(feature = "notify")]
use crate::notify::WriteWatcher;

/// A reader for iterating events across all streams in global order.
///
/// This provides a view of all events in the order they were committed,
/// regardless of which stream they belong to.
///
/// Point lookups (`get`, `get_bytes`) take `&mut self` because they copy the
/// stored bytes into the reader's own scratch buffer before decoding. Cloning
/// a reader produces an independent handle with its own (empty) scratch buffer.
pub struct GlobalReader {
    /// Environment handle from which read transactions are opened.
    pub(crate) env: Env,
    /// Database handle for the global event log, keyed by the raw `u64`
    /// global sequence number.
    pub(crate) global_db: GlobalEventsDb,
    /// Reusable aligned buffer that `get` and `get_bytes` copy the stored
    /// bytes into before decoding; it is cleared at the start of each lookup
    /// that finds an entry.
    pub(crate) scratch: rkyv::util::AlignedVec<16>,
    /// Write notification watcher (optional, only when notify feature is enabled)
    #[cfg(feature = "notify")]
    pub(crate) watcher: WriteWatcher,
}

impl GlobalReader {
    /// Get a single event by global sequence.
    ///
    /// Returns `None` if the event does not exist at the given sequence number.
    pub fn get(&mut self, global_seq: GlobalSequence) -> Result<Option<GlobalEvent>> {
        let rtxn = self.env.read_txn()?;
        let bytes = self.global_db.get(&rtxn, &global_seq.0)?;
        match bytes {
            Some(b) => {
                self.scratch.clear();
                self.scratch.extend_from_slice(b);
                match GlobalEventRecord::from_bytes(&self.scratch) {
                    Some(record) => Ok(Some(GlobalEvent {
                        global_seq,
                        stream_name: record.stream_name,
                        stream_id: record.stream_id,
                        stream_seq: record.stream_seq,
                        payload: record.payload,
                    })),
                    None => Ok(None),
                }
            }
            None => Ok(None),
        }
    }

    /// Get raw bytes for an event by global sequence.
    ///
    /// This is more efficient than `get` if you only need the payload bytes.
    pub fn get_bytes(&mut self, global_seq: GlobalSequence) -> Result<Option<&[u8]>> {
        let rtxn = self.env.read_txn()?;
        let bytes = self.global_db.get(&rtxn, &global_seq.0)?;
        match bytes {
            Some(b) => {
                self.scratch.clear();
                self.scratch.extend_from_slice(b);
                Ok(GlobalEventRecord::payload_from_bytes(&self.scratch))
            }
            None => Ok(None),
        }
    }

    /// Iterate all events from a given global sequence.
    ///
    /// Events are returned in global order starting from `from` (inclusive).
    pub fn iter_from(&self, from: GlobalSequence) -> Result<GlobalIterator<'_>> {
        let rtxn = self.env.read_txn()?;
        Ok(GlobalIterator {
            db: self.global_db,
            rtxn,
            from: from.0,
        })
    }

    /// Get a write watcher for async notification of new writes.
    ///
    /// This allows async readers to efficiently wait for new events without polling.
    #[cfg(feature = "notify")]
    pub fn watcher(&self) -> WriteWatcher {
        self.watcher.clone()
    }
}

impl Clone for GlobalReader {
    fn clone(&self) -> Self {
        Self {
            env: self.env.clone(),
            global_db: self.global_db,
            scratch: rkyv::util::AlignedVec::new(),
            #[cfg(feature = "notify")]
            watcher: self.watcher.clone(),
        }
    }
}

/// Iterator over global events.
///
/// Iterates events in global sequence order, starting from the sequence
/// specified when the iterator was created.
///
/// The value is consumed by `collect_all` or `for_each`; it owns the read
/// transaction that the iteration runs in.
pub struct GlobalIterator<'a> {
    /// Database handle for the global event log.
    pub(crate) db: GlobalEventsDb,
    /// Read transaction used for the range scan; `'a` ties it to the
    /// environment it was opened from.
    pub(crate) rtxn: RoTxn<'a, WithTls>,
    /// First global sequence number to yield (inclusive lower bound of the scan).
    pub(crate) from: u64,
}

impl<'a> GlobalIterator<'a> {
    /// Gather every remaining event into a vector, in global sequence order.
    ///
    /// This consumes the iterator. Entries for which
    /// `GlobalEventRecord::from_bytes` yields no record are skipped.
    ///
    /// # Errors
    ///
    /// Propagates any error from starting the range scan or from reading an
    /// entry; events gathered before the error are discarded.
    pub fn collect_all(self) -> Result<Vec<GlobalEvent>> {
        let mut events = Vec::new();
        self.for_each(|event| events.push(event))?;
        Ok(events)
    }

    /// Invoke `f` once per event, in global sequence order.
    ///
    /// This consumes the iterator. Entries for which
    /// `GlobalEventRecord::from_bytes` yields no record are skipped without
    /// calling `f`.
    ///
    /// # Errors
    ///
    /// Propagates any error from starting the range scan or from reading an
    /// entry; `f` has already run for every event that preceded the error.
    pub fn for_each<F>(self, mut f: F) -> Result<()>
    where
        F: FnMut(GlobalEvent),
    {
        for entry in self.db.range(&self.rtxn, &(self.from..))? {
            let (seq, bytes) = entry?;
            let Some(record) = GlobalEventRecord::from_bytes(bytes) else {
                continue;
            };
            let event = GlobalEvent {
                global_seq: GlobalSequence(seq),
                stream_name: record.stream_name,
                stream_id: record.stream_id,
                stream_seq: record.stream_seq,
                payload: record.payload,
            };
            f(event);
        }
        Ok(())
    }
}

#[cfg(all(test, feature = "notify"))]
mod notify_tests {
    use rstest::{fixture, rstest};
    use tempfile::TempDir;

    use crate::{GlobalSequence, StreamId, Varve};

    /// A store bundled with the temporary directory it was opened in.
    struct TestDb {
        varve: Varve,
        // Kept next to the store so the directory handle is not dropped while
        // a test is still using the store.
        _dir: TempDir,
    }

    /// Open a fresh store in a new temporary directory for each test.
    #[fixture]
    fn db() -> TestDb {
        let tmp = tempfile::tempdir().expect("Failed to create temp dir");
        let store = Varve::new(tmp.path()).expect("Failed to create Varve");
        TestDb {
            varve: store,
            _dir: tmp,
        }
    }

    /// A watcher obtained before any write observes the sequence advancing
    /// after an append.
    #[rstest]
    fn global_reader_watcher_sees_new_writes(mut db: TestDb) {
        let reader = db.varve.global_reader();
        let write_watcher = reader.watcher();

        // Nothing has been appended yet.
        assert_eq!(write_watcher.committed_next_global_seq(), GlobalSequence(0));

        let mut events = db
            .varve
            .stream::<u64, 64>("events")
            .expect("Failed to create stream");
        events
            .append(StreamId(1), &123u64)
            .expect("Failed to append");

        // One append later, the next global sequence has moved to 1.
        assert_eq!(write_watcher.committed_next_global_seq(), GlobalSequence(1));
    }

    /// Watchers taken from a reader and from its clone report the same
    /// progress.
    #[rstest]
    fn global_reader_clone_preserves_watcher(mut db: TestDb) {
        let original = db.varve.global_reader();
        let cloned = original.clone();

        let w1 = original.watcher();
        let w2 = cloned.watcher();

        for w in [&w1, &w2] {
            assert_eq!(w.committed_next_global_seq(), GlobalSequence(0));
        }

        let mut events = db
            .varve
            .stream::<u64, 64>("events")
            .expect("Failed to create stream");

        events.append(StreamId(1), &1u64).unwrap();
        events.append(StreamId(1), &2u64).unwrap();

        // Both watchers see the two appends.
        for w in [&w1, &w2] {
            assert_eq!(w.committed_next_global_seq(), GlobalSequence(2));
        }
    }
}