Skip to main content

mentedb_core/
mvcc.rs

1//! MVCC Version Tracking: simple multi-version concurrency control for memories.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use serde::{Deserialize, Serialize};
8
9use crate::types::{AgentId, MemoryId, Timestamp};
10
11/// A single versioned snapshot of a memory.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Version {
14    /// Monotonically increasing version identifier.
15    pub version_id: u64,
16    /// The memory this version belongs to.
17    pub memory_id: MemoryId,
18    /// The agent that created this version.
19    pub agent_id: AgentId,
20    /// Creation timestamp of this version.
21    pub timestamp: Timestamp,
22    /// Hash of the serialized memory data for change detection.
23    pub data_hash: u64,
24}
25
26/// Tracks version history for all memories.
27#[derive(Debug)]
28pub struct VersionStore {
29    versions: HashMap<MemoryId, Vec<Version>>,
30    next_version: AtomicU64,
31}
32
33impl Default for VersionStore {
34    fn default() -> Self {
35        Self {
36            versions: HashMap::new(),
37            next_version: AtomicU64::new(1),
38        }
39    }
40}
41
42fn now_micros() -> Timestamp {
43    SystemTime::now()
44        .duration_since(UNIX_EPOCH)
45        .unwrap_or_default()
46        .as_micros() as Timestamp
47}
48
49impl VersionStore {
50    /// Creates a new empty version store.
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    /// Record a write and return the new version ID.
56    pub fn record_write(&mut self, memory_id: MemoryId, agent_id: AgentId, data_hash: u64) -> u64 {
57        let vid = self.next_version.fetch_add(1, Ordering::Relaxed);
58        let version = Version {
59            version_id: vid,
60            memory_id,
61            agent_id,
62            timestamp: now_micros(),
63            data_hash,
64        };
65        self.versions.entry(memory_id).or_default().push(version);
66        vid
67    }
68
69    /// Get the latest version for a memory.
70    pub fn get_latest(&self, memory_id: MemoryId) -> Option<&Version> {
71        self.versions.get(&memory_id).and_then(|v| v.last())
72    }
73
74    /// Get the full version history for a memory.
75    pub fn get_history(&self, memory_id: MemoryId) -> Vec<&Version> {
76        self.versions
77            .get(&memory_id)
78            .map(|v| v.iter().collect())
79            .unwrap_or_default()
80    }
81
82    /// Get the version that was current at or before `timestamp`.
83    pub fn get_version_at(&self, memory_id: MemoryId, timestamp: Timestamp) -> Option<&Version> {
84        self.versions
85            .get(&memory_id)
86            .and_then(|v| v.iter().rev().find(|ver| ver.timestamp <= timestamp))
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93
94    #[test]
95    fn record_and_get_latest() {
96        let mut store = VersionStore::new();
97        let mid = MemoryId::new();
98        let aid = AgentId::new();
99        store.record_write(mid, aid, 111);
100        store.record_write(mid, aid, 222);
101        assert_eq!(store.get_latest(mid).unwrap().data_hash, 222);
102    }
103
104    #[test]
105    fn version_ids_increment() {
106        let mut store = VersionStore::new();
107        let mid = MemoryId::new();
108        let aid = AgentId::new();
109        let v1 = store.record_write(mid, aid, 1);
110        let v2 = store.record_write(mid, aid, 2);
111        assert_eq!(v2, v1 + 1);
112    }
113
114    #[test]
115    fn get_history() {
116        let mut store = VersionStore::new();
117        let mid = MemoryId::new();
118        let aid = AgentId::new();
119        store.record_write(mid, aid, 10);
120        store.record_write(mid, aid, 20);
121        store.record_write(mid, aid, 30);
122        assert_eq!(store.get_history(mid).len(), 3);
123    }
124
125    #[test]
126    fn get_version_at() {
127        let mut store = VersionStore::new();
128        let mid = MemoryId::new();
129        let aid = AgentId::new();
130        store.record_write(mid, aid, 1);
131        // The latest write should be findable at a very large timestamp.
132        let ver = store.get_version_at(mid, u64::MAX);
133        assert!(ver.is_some());
134    }
135
136    #[test]
137    fn empty_history() {
138        let store = VersionStore::new();
139        assert!(store.get_latest(MemoryId::new()).is_none());
140        assert!(store.get_history(MemoryId::new()).is_empty());
141    }
142}