Skip to main content

embeddenator_fs/fs/versioned/
engram.rs

1//! Top-level versioned engram with CAS-based root updates
2//!
3//! This module provides the main VersionedEngram struct that coordinates
4//! all versioned components and provides high-level read/write operations.
5
6use super::chunk_store::VersionedChunkStore;
7use super::corrections::VersionedCorrectionStore;
8use super::manifest::VersionedManifest;
9use super::transaction::{Transaction, TransactionManager, TransactionStatus};
10use super::types::{VersionMismatch, VersionedResult};
11use crate::SparseVec;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock};
14
15/// A fully versioned engram with optimistic locking
16///
17/// This is the top-level structure that coordinates all versioned components.
18/// It provides high-level read/write operations with ACID-like properties.
19pub struct VersionedEngram {
20    /// Root VSA vector (wrapped in Arc for immutable sharing)
21    root: Arc<RwLock<Arc<SparseVec>>>,
22
23    /// Version of the root vector
24    root_version: Arc<AtomicU64>,
25
26    /// Versioned chunk store (NOT the VSA codebook - that's in embeddenator-vsa)
27    pub chunk_store: VersionedChunkStore,
28
29    /// Versioned corrections
30    pub corrections: VersionedCorrectionStore,
31
32    /// Versioned manifest
33    pub manifest: VersionedManifest,
34
35    /// Transaction manager
36    tx_manager: TransactionManager,
37
38    /// Transaction log
39    tx_log: Arc<RwLock<Vec<Transaction>>>,
40
41    /// Global engram version (coordinates all components)
42    global_version: Arc<AtomicU64>,
43}
44
45impl VersionedEngram {
46    /// Create a new versioned engram with default dimensionality
47    ///
48    /// # Arguments
49    /// * `dimensionality` - Reserved for future use. The actual dimensionality is
50    ///   determined by `SparseVec::new()` which uses the system's configured default.
51    ///   This parameter is preserved in the API for future VSA codebook integration
52    ///   when the dimensionality becomes configurable.
53    pub fn new(_dimensionality: usize) -> Self {
54        // SparseVec::new() doesn't take dimensionality parameter
55        Self::with_root(Arc::new(SparseVec::new()))
56    }
57
58    /// Create a versioned engram with an existing root vector
59    pub fn with_root(root: Arc<SparseVec>) -> Self {
60        Self {
61            root: Arc::new(RwLock::new(root)),
62            root_version: Arc::new(AtomicU64::new(0)),
63            chunk_store: VersionedChunkStore::new(),
64            corrections: VersionedCorrectionStore::new(),
65            manifest: VersionedManifest::new(),
66            tx_manager: TransactionManager::new(),
67            tx_log: Arc::new(RwLock::new(Vec::new())),
68            global_version: Arc::new(AtomicU64::new(0)),
69        }
70    }
71
72    /// Get the current global version
73    pub fn version(&self) -> u64 {
74        self.global_version.load(Ordering::Acquire)
75    }
76
77    /// Get the current root version
78    pub fn root_version(&self) -> u64 {
79        self.root_version.load(Ordering::Acquire)
80    }
81
82    /// Get a reference to the root vector
83    pub fn root(&self) -> Arc<SparseVec> {
84        let root_lock = self.root.read().unwrap();
85        Arc::clone(&*root_lock)
86    }
87
88    /// Update the root vector with Compare-And-Swap (CAS)
89    ///
90    /// This is the core operation for optimistic locking on the root.
91    /// It attempts to update the root only if the current version matches
92    /// the expected version.
93    pub fn update_root(
94        &self,
95        new_root: Arc<SparseVec>,
96        expected_version: u64,
97    ) -> VersionedResult<u64> {
98        let mut root_lock = self.root.write().unwrap();
99
100        // Check version
101        let current_version = self.root_version.load(Ordering::Acquire);
102        if current_version != expected_version {
103            return Err(VersionMismatch {
104                expected: expected_version,
105                actual: current_version,
106            });
107        }
108
109        // Update root
110        *root_lock = new_root;
111
112        // Increment version
113        let new_version = self.root_version.fetch_add(1, Ordering::AcqRel) + 1;
114        Ok(new_version)
115    }
116
117    /// Bundle a chunk into the root with automatic retry
118    ///
119    /// This operation superimposes a new chunk vector into the root using the
120    /// VSA bundle operation (XOR for binary vectors). The bundling is performed
121    /// with optimistic locking and automatic retry on version conflicts.
122    ///
123    /// # Algorithm
124    /// 1. Read current root and its version
125    /// 2. Create new root by bundling: `new_root = current_root ⊕ chunk_vec`
126    /// 3. Attempt CAS update with optimistic locking
127    /// 4. Retry with exponential backoff if version conflict detected
128    ///
129    /// # Arguments
130    /// * `chunk_vec` - The chunk vector to bundle into the root
131    ///
132    /// # Returns
133    /// * `Ok(new_version)` - The new root version after successful bundle
134    /// * `Err(e)` - Error after max retries exceeded or other failure
135    pub fn bundle_chunk(&self, chunk_vec: &SparseVec) -> Result<u64, String> {
136        const MAX_RETRIES: usize = 10;
137
138        for attempt in 0..MAX_RETRIES {
139            // Read current root
140            let current_root = self.root();
141            let current_version = self.root_version();
142
143            // Create new root by bundling the chunk with current root
144            // Bundle operation: new_root = current_root ⊕ chunk_vec
145            let new_root = Arc::new(current_root.bundle(chunk_vec));
146
147            // Try to update with CAS
148            match self.update_root(new_root, current_version) {
149                Ok(new_version) => return Ok(new_version),
150                Err(_) if attempt < MAX_RETRIES - 1 => {
151                    // Retry with exponential backoff
152                    std::thread::sleep(std::time::Duration::from_micros(1 << attempt));
153                    continue;
154                }
155                Err(e) => {
156                    return Err(format!(
157                        "Failed to bundle chunk after {} attempts: {}",
158                        MAX_RETRIES, e
159                    ))
160                }
161            }
162        }
163
164        Err("Max retries exceeded".to_string())
165    }
166
167    /// Begin a new transaction
168    pub fn begin_transaction(&self) -> Transaction {
169        self.tx_manager.begin(self.version())
170    }
171
172    /// Commit a transaction
173    pub fn commit_transaction(&self, mut tx: Transaction) -> Result<(), String> {
174        // Verify engram version hasn't changed too much
175        let current_version = self.version();
176        if current_version > tx.engram_version + 10 {
177            // Allow some version drift, but not too much
178            return Err("Engram version drifted too far, transaction may conflict".to_string());
179        }
180
181        // Mark as committed
182        tx.commit();
183
184        // Add to log
185        let mut log = self.tx_log.write().unwrap();
186        log.push(tx);
187
188        // Increment global version
189        self.global_version.fetch_add(1, Ordering::AcqRel);
190
191        Ok(())
192    }
193
194    /// Abort a transaction
195    pub fn abort_transaction(&self, mut tx: Transaction) {
196        tx.abort();
197
198        // Add to log
199        let mut log = self.tx_log.write().unwrap();
200        log.push(tx);
201    }
202
203    /// Get transaction statistics
204    pub fn transaction_stats(&self) -> TransactionStats {
205        let log = self.tx_log.read().unwrap();
206
207        let total = log.len();
208        let committed = log
209            .iter()
210            .filter(|tx| tx.status == TransactionStatus::Committed)
211            .count();
212        let aborted = log
213            .iter()
214            .filter(|tx| tx.status == TransactionStatus::Aborted)
215            .count();
216
217        TransactionStats {
218            total_transactions: total,
219            committed_transactions: committed,
220            aborted_transactions: aborted,
221            success_rate: if total > 0 {
222                committed as f64 / total as f64
223            } else {
224                0.0
225            },
226        }
227    }
228
229    /// Get comprehensive engram statistics
230    pub fn stats(&self) -> EngramStats {
231        EngramStats {
232            global_version: self.version(),
233            root_version: self.root_version(),
234            chunk_store: self.chunk_store.stats(),
235            corrections: self.corrections.stats(),
236            manifest: self.manifest.stats(),
237            transactions: self.transaction_stats(),
238        }
239    }
240}
241
242impl Default for VersionedEngram {
243    fn default() -> Self {
244        Self::new(10000) // Default VSA dimensionality
245    }
246}
247
248impl Clone for VersionedEngram {
249    fn clone(&self) -> Self {
250        Self {
251            root: Arc::clone(&self.root),
252            root_version: Arc::clone(&self.root_version),
253            chunk_store: self.chunk_store.clone(),
254            corrections: self.corrections.clone(),
255            manifest: self.manifest.clone(),
256            tx_manager: TransactionManager::new(), // New manager for clone
257            tx_log: Arc::new(RwLock::new(Vec::new())), // Fresh log
258            global_version: Arc::clone(&self.global_version),
259        }
260    }
261}
262
/// Summary of the transactions recorded in an engram's transaction log.
#[derive(Debug, Clone)]
pub struct TransactionStats {
    /// Number of entries in the transaction log.
    pub total_transactions: usize,
    /// Number of logged transactions whose status is `Committed`.
    pub committed_transactions: usize,
    /// Number of logged transactions whose status is `Aborted`.
    pub aborted_transactions: usize,
    /// `committed_transactions / total_transactions`, or `0.0` when the log
    /// is empty.
    pub success_rate: f64,
}
271
272/// Comprehensive engram statistics
273#[derive(Debug, Clone)]
274pub struct EngramStats {
275    pub global_version: u64,
276    pub root_version: u64,
277    pub chunk_store: super::chunk_store::CodebookStats,
278    pub corrections: super::corrections::CorrectionStats,
279    pub manifest: super::manifest::ManifestStats,
280    pub transactions: TransactionStats,
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created engram reports version 0 for both counters.
    #[test]
    fn test_engram_creation() {
        let fresh = VersionedEngram::new(10000);
        assert_eq!(fresh.version(), 0);
        assert_eq!(fresh.root_version(), 0);
    }

    /// A CAS update against the current version succeeds and bumps the
    /// root version by one.
    #[test]
    fn test_root_update() {
        let engram = VersionedEngram::new(10000);

        let bumped = engram
            .update_root(Arc::new(SparseVec::new()), 0)
            .unwrap();

        assert_eq!(bumped, 1);
        assert_eq!(engram.root_version(), 1);
    }

    /// A CAS update that names an outdated version is rejected.
    #[test]
    fn test_root_update_version_mismatch() {
        let engram = VersionedEngram::new(10000);
        let replacement = Arc::new(SparseVec::new());

        // First update moves the root version from 0 to 1.
        engram.update_root(Arc::clone(&replacement), 0).unwrap();

        // Version 0 is now stale, so this attempt must fail.
        let stale_attempt = engram.update_root(replacement, 0);
        assert!(stale_attempt.is_err());
    }

    /// Begin -> commit is reflected in the transaction statistics.
    #[test]
    fn test_transaction_lifecycle() {
        let engram = VersionedEngram::new(10000);

        let pending = engram.begin_transaction();
        assert_eq!(pending.status, TransactionStatus::Pending);

        engram.commit_transaction(pending).unwrap();

        let summary = engram.transaction_stats();
        assert_eq!(summary.total_transactions, 1);
        assert_eq!(summary.committed_transactions, 1);
        assert_eq!(summary.success_rate, 1.0);
    }
}