// seerdb 0.0.10
//
// Research-grade storage engine with learned data structures
// Documentation
//! Point-in-time consistent snapshots for seerdb

use crate::memtable::Memtable;
use crate::range::RangeIterator;
use crate::sstable::SSTable;
use crate::types::SnapshotHandle;
use crate::MergeOperator;
use bytes::Bytes;
use std::sync::{Arc, Mutex};

/// A point-in-time consistent view of the database.
///
/// Snapshots capture the database state at a specific sequence number, providing
/// consistent reads even as new writes occur. All reads through a snapshot see
/// exactly the state that existed when the snapshot was created.
///
/// # Usage
///
/// ```rust,no_run
/// use seerdb::{DB, DBOptions};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let db = DB::open(DBOptions::default())?;
/// db.put(b"key", b"value1")?;
///
/// // Create snapshot
/// let snapshot = db.snapshot()?;
///
/// // New write doesn't affect snapshot
/// db.put(b"key", b"value2")?;
///
/// // Snapshot still sees old value
/// assert_eq!(snapshot.get(b"key")?, Some(bytes::Bytes::from("value1")));
/// # Ok(())
/// # }
/// ```
///
/// # Automatic Cleanup
///
/// Snapshots automatically unregister from the GC tracker when dropped,
/// allowing compaction to clean up old versions no longer needed.
pub struct Snapshot {
    /// Memtable partitions held by this snapshot. `get` consults these first,
    /// and `range` lists them ahead of the immutable memtables.
    memtables: Vec<Arc<Memtable>>,
    /// Optional shared list of immutable memtables, consulted after
    /// `memtables`. `None` is handled as "no immutable memtables".
    immutable_memtables: Option<Arc<Vec<Arc<Memtable>>>>,
    /// SSTables grouped by LSM level (outer `Vec`), then by table within the
    /// level (inner `Vec`). Each table sits behind a `Mutex` that is locked
    /// for a point lookup or while a range scan over it is being set up.
    sstables: Vec<Vec<Arc<Mutex<SSTable>>>>,
    /// Sequence number of the snapshot; passed to every memtable and SSTable
    /// point lookup performed by `get`.
    sequence_number: u64,
    /// Optional merge operator. It is forwarded to the `RangeIterator` built
    /// by `range`; `get` does not use it.
    merge_operator: Option<Arc<dyn MergeOperator>>,
    /// Handle for automatic unregistration from `SnapshotTracker` on drop.
    /// When this Snapshot is dropped, the handle unregisters from the tracker,
    /// allowing compaction to GC versions older than this snapshot.
    #[allow(dead_code)] // Never read directly; kept so the handle is dropped together with the snapshot
    gc_handle: Option<SnapshotHandle>,
}

impl Snapshot {
    /// Build a snapshot that carries a GC-tracker registration.
    ///
    /// The `gc_handle` is stored inside the snapshot and is released when the
    /// snapshot itself is dropped.
    pub(crate) fn with_gc_handle(
        memtables: Vec<Arc<Memtable>>,
        immutable_memtables: Option<Arc<Vec<Arc<Memtable>>>>,
        sstables: Vec<Vec<Arc<Mutex<SSTable>>>>,
        sequence_number: u64,
        merge_operator: Option<Arc<dyn MergeOperator>>,
        gc_handle: SnapshotHandle,
    ) -> Self {
        let gc_handle = Some(gc_handle);
        Self {
            gc_handle,
            merge_operator,
            sequence_number,
            sstables,
            immutable_memtables,
            memtables,
        }
    }

    /// Look up `key` as of this snapshot's sequence number.
    ///
    /// Sources are consulted in this order, and the first hit wins:
    ///
    /// 1. the memtable partitions,
    /// 2. the immutable memtables (if any),
    /// 3. the SSTables, level by level, walking each level from its last
    ///    table to its first.
    ///
    /// A hit whose value is empty is reported as `Ok(None)` (presumably the
    /// tombstone encoding; confirm against the write path).
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`.
    ///
    /// # Panics
    ///
    /// Panics if an SSTable mutex is poisoned.
    pub fn get(&self, key: &[u8]) -> crate::db::Result<Option<Bytes>> {
        let seq = self.sequence_number;

        // Memtable partitions first, followed by the immutable memtables when present.
        let frozen = self
            .immutable_memtables
            .iter()
            .flat_map(|list| list.iter());
        for memtable in self.memtables.iter().chain(frozen) {
            if let Some((value, _)) = memtable.get(key, seq) {
                return Ok(if value.is_empty() {
                    None
                } else {
                    Some(Bytes::from(value.to_vec()))
                });
            }
        }

        for level in &self.sstables {
            for table in level.iter().rev() {
                // Hold the lock only for the lookup itself.
                let mut guard = table.lock().expect("SSTable lock poisoned");
                // MVCC-aware lookup so entries newer than the snapshot are ignored.
                let lookup = guard.get_mvcc(key, seq);
                drop(guard);

                // NOTE(review): an `Err` from `get_mvcc` is treated like a miss and
                // the search moves on to the next table; confirm this is intended.
                if let Ok(Some(value)) = lookup {
                    return Ok(if value.is_empty() { None } else { Some(value) });
                }
            }
        }

        Ok(None)
    }

    /// Create an iterator over the keys from `start_key` up to the optional
    /// `end_key` bound (bound semantics are defined by `RangeIterator`).
    ///
    /// The iterator is built from every memtable partition, every immutable
    /// memtable, and a range scan of each SSTable whose key range overlaps
    /// the request. The snapshot's merge operator is handed to the iterator.
    ///
    /// NOTE(review): `sequence_number` is not passed to `RangeIterator::new`
    /// here; confirm how range scans obtain snapshot visibility.
    ///
    /// # Errors
    ///
    /// Returns whatever error `RangeIterator::new` reports.
    ///
    /// # Panics
    ///
    /// Panics if an SSTable mutex is poisoned.
    pub fn range(
        &self,
        start_key: &[u8],
        end_key: Option<&[u8]>,
    ) -> crate::db::Result<RangeIterator> {
        // Borrow every memtable: partitions first, then the immutable ones.
        let frozen = self
            .immutable_memtables
            .iter()
            .flat_map(|list| list.iter());
        let partition_refs: Vec<&Memtable> = self
            .memtables
            .iter()
            .chain(frozen)
            .map(|memtable| &**memtable)
            .collect();

        // One scan per overlapping SSTable, in level order then table order.
        let mut table_scans = Vec::new();
        for table in self.sstables.iter().flatten() {
            let guard = table.lock().expect("SSTable lock poisoned");
            if !guard.overlaps_range(start_key, end_key) {
                // Guard is released at the end of this iteration.
                continue;
            }
            let scan = guard.scan_range(start_key, end_key);
            drop(guard);
            table_scans.push(scan);
        }

        RangeIterator::new(
            start_key,
            end_key,
            &partition_refs,
            table_scans,
            self.merge_operator.clone(),
        )
    }

    /// Sequence number this snapshot reads at.
    #[must_use]
    pub const fn sequence_number(&self) -> u64 {
        self.sequence_number
    }
}

impl std::fmt::Debug for Snapshot {
    /// Write a compact summary (counts and flags only) instead of dumping the
    /// memtables and SSTables themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let level_count = self.sstables.len();
        let table_count = self
            .sstables
            .iter()
            .fold(0_usize, |total, level| total + level.len());
        let has_immutables = self.immutable_memtables.is_some();

        let mut out = f.debug_struct("Snapshot");
        out.field("sequence_number", &self.sequence_number);
        out.field("memtable_partitions", &self.memtables.len());
        out.field("has_immutable_memtables", &has_immutables);
        out.field("lsm_levels", &level_count);
        out.field("total_sstables", &table_count);
        out.finish()
    }
}

// Placeholder: this module currently contains no unit tests.
#[cfg(test)]
mod tests {}