use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use crate::error::EdgestoreError;
use crate::segment::SegmentReader;
use crate::types::{decode_key, encode_key, MemEntry, Operation, SegmentId};
/// Mutable state that [`SnapshotRegistry`] keeps behind its mutex.
struct SnapshotRegistryInner {
    /// Id that the next registered snapshot will be given.
    next_id: u64,
    /// Segment ids pinned by each registered snapshot, keyed by snapshot id.
    pinned: HashMap<u64, Vec<SegmentId>>,
}

impl SnapshotRegistryInner {
    /// Creates an empty state whose first handed-out snapshot id will be `1`.
    fn new() -> Self {
        let pinned = HashMap::new();
        Self { next_id: 1, pinned }
    }
}
/// Cloneable handle to a mutex-guarded table of the segments pinned by registered snapshots.
///
/// Every clone refers to the same table through the inner `Arc`.
#[derive(Clone)]
pub struct SnapshotRegistry(Arc<Mutex<SnapshotRegistryInner>>);

impl SnapshotRegistry {
    /// Creates a registry with nothing pinned.
    pub fn new() -> Self {
        let state = SnapshotRegistryInner::new();
        Self(Arc::new(Mutex::new(state)))
    }

    /// Acquires the lock on the shared table.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn locked(&self) -> std::sync::MutexGuard<'_, SnapshotRegistryInner> {
        self.0.lock().expect("SnapshotRegistry lock poisoned")
    }

    /// Pins `segment_ids` under a newly allocated snapshot id and returns that id.
    ///
    /// Ids are handed out sequentially. The pin stays in place until the id is
    /// passed to [`SnapshotRegistry::release`].
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex has been poisoned.
    pub fn register(&self, segment_ids: &[SegmentId]) -> u64 {
        let mut table = self.locked();
        let snapshot_id = table.next_id;
        table.next_id = snapshot_id + 1;
        table.pinned.insert(snapshot_id, segment_ids.to_vec());
        snapshot_id
    }

    /// Drops every pin held under `snapshot_id`; an unknown id is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex has been poisoned.
    pub fn release(&self, snapshot_id: u64) {
        self.locked().pinned.remove(&snapshot_id);
    }

    /// Reports whether at least one registered snapshot pins `segment_id`.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex has been poisoned.
    pub fn is_pinned(&self, segment_id: SegmentId) -> bool {
        let table = self.locked();
        table
            .pinned
            .values()
            .flatten()
            .any(|pinned| *pinned == segment_id)
    }

    /// Returns the de-duplicated union of all segment ids pinned by any snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex has been poisoned.
    pub fn pinned_ids(&self) -> HashSet<SegmentId> {
        let table = self.locked();
        let mut all = HashSet::new();
        for ids in table.pinned.values() {
            all.extend(ids.iter().copied());
        }
        all
    }
}

impl Default for SnapshotRegistry {
    /// Equivalent to [`SnapshotRegistry::new`].
    fn default() -> Self {
        SnapshotRegistry::new()
    }
}
pub struct Snapshot {
pub snapshot_id: u64,
registry: SnapshotRegistry,
pub segment_ids: Vec<SegmentId>,
pub base_path: PathBuf,
}
impl Snapshot {
pub fn new(
snapshot_id: u64,
registry: SnapshotRegistry,
segment_ids: Vec<SegmentId>,
base_path: PathBuf,
) -> Self {
Snapshot { snapshot_id, registry, segment_ids, base_path }
}
pub fn get(&self, ns: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, EdgestoreError> {
let encoded = encode_key(ns, key);
let mut best: Option<MemEntry> = None;
for &seg_id in &self.segment_ids {
let reader = SegmentReader::open(self.base_path.clone(), seg_id)?;
if let Some(entry) = reader.get(&encoded)? {
let is_better = best.as_ref().is_none_or(|b| entry.lsn > b.lsn);
if is_better {
best = Some(entry);
}
}
}
match best {
Some(e) if e.op == Operation::Put => Ok(e.value),
_ => Ok(None),
}
}
#[allow(clippy::type_complexity)]
pub fn range(
&self,
ns: &[u8],
start: &[u8],
end: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, EdgestoreError> {
let enc_start = encode_key(ns, start);
let enc_end = encode_key(ns, end);
let mut merged: HashMap<Vec<u8>, MemEntry> = HashMap::new();
for &seg_id in &self.segment_ids {
let reader = SegmentReader::open(self.base_path.clone(), seg_id)?;
for (raw_key, entry) in reader.range_scan(&enc_start, &enc_end)? {
let existing_lsn = merged.get(&raw_key).map(|e| e.lsn).unwrap_or(0);
if entry.lsn > existing_lsn {
merged.insert(raw_key, entry);
}
}
}
let mut results: Vec<(Vec<u8>, Vec<u8>)> = merged
.into_iter()
.filter(|(_, e)| e.op == Operation::Put)
.filter_map(|(raw_key, e)| {
let (_, user_key) = decode_key(&raw_key).ok()?;
let value = e.value?;
Some((user_key, value))
})
.collect();
results.sort_by(|(a, _), (b, _)| a.cmp(b));
Ok(results)
}
}
impl Drop for Snapshot {
fn drop(&mut self) {
self.registry.release(self.snapshot_id);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::segment::SegmentWriter;
    use crate::types::{encode_key, MemEntry, Operation};
    use tempfile::TempDir;

    /// Builds a `Put` entry with the fixed timestamp and TTL shared by these tests.
    fn make_put_entry(key: &[u8], value: &[u8], lsn: u64) -> MemEntry {
        MemEntry {
            key: key.to_vec(),
            value: Some(value.to_vec()),
            op: Operation::Put,
            lsn,
            timestamp: 3_600_000_000_000,
            ttl: 0,
        }
    }

    /// Orders `entries` by encoded key, as every test does before flushing a segment.
    fn sort_entries(entries: &mut Vec<(Vec<u8>, MemEntry)>) {
        entries.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    }

    /// Registers `segment_ids` in a fresh registry and wraps them in a snapshot rooted at `dir`.
    fn snapshot_over(dir: &TempDir, segment_ids: &[SegmentId]) -> Snapshot {
        let registry = SnapshotRegistry::new();
        let snapshot_id = registry.register(segment_ids);
        Snapshot::new(
            snapshot_id,
            registry,
            segment_ids.to_vec(),
            dir.path().to_path_buf(),
        )
    }

    #[test]
    fn test_registry_register_pins_segments() {
        let registry = SnapshotRegistry::new();
        registry.register(&[1, 2, 3]);

        let registered: Vec<SegmentId> = vec![1, 2, 3];
        for id in registered {
            assert!(registry.is_pinned(id));
        }
        assert!(!registry.is_pinned(4));
    }

    #[test]
    fn test_registry_release_unpins() {
        let registry = SnapshotRegistry::new();
        let snapshot_id = registry.register(&[10, 20, 30]);
        assert!(registry.is_pinned(10));

        registry.release(snapshot_id);

        let released: Vec<SegmentId> = vec![10, 20, 30];
        for id in released {
            assert!(!registry.is_pinned(id));
        }
    }

    #[test]
    fn test_registry_two_snapshots_overlap() {
        let registry = SnapshotRegistry::new();
        let first = registry.register(&[1, 2]);
        let _second = registry.register(&[2, 3]);
        assert!(registry.is_pinned(1));
        assert!(registry.is_pinned(2));
        assert!(registry.is_pinned(3));

        registry.release(first);

        // Segment 2 is still held by the second snapshot.
        assert!(!registry.is_pinned(1));
        assert!(registry.is_pinned(2));
        assert!(registry.is_pinned(3));
    }

    #[test]
    fn test_snapshot_drop_releases_pins() {
        let registry = SnapshotRegistry::new();
        let snapshot_id = registry.register(&[42]);
        let dir = TempDir::new().unwrap();
        {
            let _snapshot = Snapshot::new(
                snapshot_id,
                registry.clone(),
                vec![42],
                dir.path().to_path_buf(),
            );
            assert!(registry.is_pinned(42));
            // `_snapshot` is dropped at the end of this scope.
        }
        assert!(!registry.is_pinned(42));
    }

    #[test]
    fn test_snapshot_get_reads_pinned_segment() {
        let dir = TempDir::new().unwrap();
        let segment_id: SegmentId = 0;
        let ns = b"ns";
        let user_key = b"key1";
        let value = b"hello-world";

        let encoded_key = encode_key(ns, user_key);
        let entry = make_put_entry(&encoded_key, value, 1);
        let mut entries = vec![(encoded_key.clone(), entry)];
        sort_entries(&mut entries);
        let mut writer = SegmentWriter::new(dir.path().to_path_buf(), segment_id, 3600);
        writer.flush(&entries).unwrap();

        let snapshot = snapshot_over(&dir, &[segment_id]);
        let found = snapshot.get(ns, user_key).unwrap();
        assert_eq!(found, Some(value.to_vec()));
    }

    #[test]
    fn test_snapshot_range_returns_sorted_pairs() {
        let dir = TempDir::new().unwrap();
        let ns = b"ns";

        let mut entries: Vec<(Vec<u8>, MemEntry)> = Vec::with_capacity(10);
        for i in 0..10u64 {
            let user_key = format!("key-{:04}", i);
            let value = format!("val-{}", i);
            let enc = encode_key(ns, user_key.as_bytes());
            let entry = make_put_entry(&enc, value.as_bytes(), i + 1);
            entries.push((enc, entry));
        }
        sort_entries(&mut entries);
        let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
        writer.flush(&entries).unwrap();

        let snapshot = snapshot_over(&dir, &[0]);
        let results = snapshot.range(ns, b"key-0002", b"key-0007").unwrap();
        assert_eq!(results.len(), 5, "range should return 5 entries");

        let raw_keys: Vec<&[u8]> = results.iter().map(|pair| pair.0.as_slice()).collect();
        let mut sorted = raw_keys.clone();
        sorted.sort();
        assert_eq!(raw_keys, sorted, "range results must be sorted");
        assert_eq!(raw_keys[0], &b"key-0002"[..]);
        assert_eq!(raw_keys[4], &b"key-0006"[..]);
    }

    #[test]
    fn test_snapshot_get_absent_key_returns_none() {
        let dir = TempDir::new().unwrap();
        let ns = b"ns";

        let enc = encode_key(ns, b"only-key");
        let entry = make_put_entry(&enc, b"v", 1);
        let mut entries = vec![(enc, entry)];
        sort_entries(&mut entries);
        let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
        writer.flush(&entries).unwrap();

        let snapshot = snapshot_over(&dir, &[0]);
        let found = snapshot.get(ns, b"not-present").unwrap();
        assert!(found.is_none());
    }

    #[test]
    fn test_registry_pinned_ids_flat_set() {
        let registry = SnapshotRegistry::new();
        registry.register(&[1, 2]);
        registry.register(&[2, 3, 4]);

        let ids = registry.pinned_ids();
        let expected: Vec<SegmentId> = vec![1, 2, 3, 4];
        for id in &expected {
            assert!(ids.contains(id));
        }
        assert!(!ids.contains(&5));
    }

    #[test]
    fn test_snapshot_get_highest_lsn_across_segments() {
        let dir = TempDir::new().unwrap();
        let ns = b"ns";
        let enc = encode_key(ns, b"shared_key");

        // Segment 0 holds the older write.
        let old_entry = make_put_entry(&enc, b"old_value", 1);
        let mut entries0 = vec![(enc.clone(), old_entry)];
        sort_entries(&mut entries0);
        let mut writer0 = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
        writer0.flush(&entries0).unwrap();

        // Segment 1 holds the newer write (higher lsn, later timestamp).
        let new_entry = MemEntry {
            key: enc.clone(),
            value: Some(b"new_value".to_vec()),
            op: Operation::Put,
            lsn: 5,
            timestamp: 3_600_000_000_001,
            ttl: 0,
        };
        let mut entries1 = vec![(enc.clone(), new_entry)];
        sort_entries(&mut entries1);
        let mut writer1 = SegmentWriter::new(dir.path().to_path_buf(), 1, 3600);
        writer1.flush(&entries1).unwrap();

        // The winner must not depend on the order in which segments are listed.
        let orderings: Vec<(Vec<SegmentId>, &str)> = vec![
            (vec![0, 1], "Case A [0,1]: expected new_value (lsn=5)"),
            (vec![1, 0], "Case B [1,0]: expected new_value (lsn=5)"),
        ];
        for (order, label) in orderings {
            let snapshot = snapshot_over(&dir, &order);
            let found = snapshot.get(ns, b"shared_key").unwrap();
            assert_eq!(found, Some(b"new_value".to_vec()), "{}", label);
        }
    }
}