embeddenator_fs/fs/versioned/
engram.rs1use super::chunk_store::VersionedChunkStore;
7use super::corrections::VersionedCorrectionStore;
8use super::manifest::VersionedManifest;
9use super::transaction::{Transaction, TransactionManager, TransactionStatus};
10use super::types::{VersionMismatch, VersionedResult};
11use crate::SparseVec;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock};
14
15pub struct VersionedEngram {
20 root: Arc<RwLock<Arc<SparseVec>>>,
22
23 root_version: Arc<AtomicU64>,
25
26 pub chunk_store: VersionedChunkStore,
28
29 pub corrections: VersionedCorrectionStore,
31
32 pub manifest: VersionedManifest,
34
35 tx_manager: TransactionManager,
37
38 tx_log: Arc<RwLock<Vec<Transaction>>>,
40
41 global_version: Arc<AtomicU64>,
43}
44
45impl VersionedEngram {
46 pub fn new(_dimensionality: usize) -> Self {
54 Self::with_root(Arc::new(SparseVec::new()))
56 }
57
58 pub fn with_root(root: Arc<SparseVec>) -> Self {
60 Self {
61 root: Arc::new(RwLock::new(root)),
62 root_version: Arc::new(AtomicU64::new(0)),
63 chunk_store: VersionedChunkStore::new(),
64 corrections: VersionedCorrectionStore::new(),
65 manifest: VersionedManifest::new(),
66 tx_manager: TransactionManager::new(),
67 tx_log: Arc::new(RwLock::new(Vec::new())),
68 global_version: Arc::new(AtomicU64::new(0)),
69 }
70 }
71
72 pub fn version(&self) -> u64 {
74 self.global_version.load(Ordering::Acquire)
75 }
76
77 pub fn root_version(&self) -> u64 {
79 self.root_version.load(Ordering::Acquire)
80 }
81
82 pub fn root(&self) -> Arc<SparseVec> {
84 let root_lock = self.root.read().unwrap();
85 Arc::clone(&*root_lock)
86 }
87
88 pub fn update_root(
94 &self,
95 new_root: Arc<SparseVec>,
96 expected_version: u64,
97 ) -> VersionedResult<u64> {
98 let mut root_lock = self.root.write().unwrap();
99
100 let current_version = self.root_version.load(Ordering::Acquire);
102 if current_version != expected_version {
103 return Err(VersionMismatch {
104 expected: expected_version,
105 actual: current_version,
106 });
107 }
108
109 *root_lock = new_root;
111
112 let new_version = self.root_version.fetch_add(1, Ordering::AcqRel) + 1;
114 Ok(new_version)
115 }
116
117 pub fn bundle_chunk(&self, chunk_vec: &SparseVec) -> Result<u64, String> {
136 const MAX_RETRIES: usize = 10;
137
138 for attempt in 0..MAX_RETRIES {
139 let current_root = self.root();
141 let current_version = self.root_version();
142
143 let new_root = Arc::new(current_root.bundle(chunk_vec));
146
147 match self.update_root(new_root, current_version) {
149 Ok(new_version) => return Ok(new_version),
150 Err(_) if attempt < MAX_RETRIES - 1 => {
151 std::thread::sleep(std::time::Duration::from_micros(1 << attempt));
153 continue;
154 }
155 Err(e) => {
156 return Err(format!(
157 "Failed to bundle chunk after {} attempts: {}",
158 MAX_RETRIES, e
159 ))
160 }
161 }
162 }
163
164 Err("Max retries exceeded".to_string())
165 }
166
167 pub fn begin_transaction(&self) -> Transaction {
169 self.tx_manager.begin(self.version())
170 }
171
172 pub fn commit_transaction(&self, mut tx: Transaction) -> Result<(), String> {
174 let current_version = self.version();
176 if current_version > tx.engram_version + 10 {
177 return Err("Engram version drifted too far, transaction may conflict".to_string());
179 }
180
181 tx.commit();
183
184 let mut log = self.tx_log.write().unwrap();
186 log.push(tx);
187
188 self.global_version.fetch_add(1, Ordering::AcqRel);
190
191 Ok(())
192 }
193
194 pub fn abort_transaction(&self, mut tx: Transaction) {
196 tx.abort();
197
198 let mut log = self.tx_log.write().unwrap();
200 log.push(tx);
201 }
202
203 pub fn transaction_stats(&self) -> TransactionStats {
205 let log = self.tx_log.read().unwrap();
206
207 let total = log.len();
208 let committed = log
209 .iter()
210 .filter(|tx| tx.status == TransactionStatus::Committed)
211 .count();
212 let aborted = log
213 .iter()
214 .filter(|tx| tx.status == TransactionStatus::Aborted)
215 .count();
216
217 TransactionStats {
218 total_transactions: total,
219 committed_transactions: committed,
220 aborted_transactions: aborted,
221 success_rate: if total > 0 {
222 committed as f64 / total as f64
223 } else {
224 0.0
225 },
226 }
227 }
228
229 pub fn stats(&self) -> EngramStats {
231 EngramStats {
232 global_version: self.version(),
233 root_version: self.root_version(),
234 chunk_store: self.chunk_store.stats(),
235 corrections: self.corrections.stats(),
236 manifest: self.manifest.stats(),
237 transactions: self.transaction_stats(),
238 }
239 }
240}
241
242impl Default for VersionedEngram {
243 fn default() -> Self {
244 Self::new(10000) }
246}
247
248impl Clone for VersionedEngram {
249 fn clone(&self) -> Self {
250 Self {
251 root: Arc::clone(&self.root),
252 root_version: Arc::clone(&self.root_version),
253 chunk_store: self.chunk_store.clone(),
254 corrections: self.corrections.clone(),
255 manifest: self.manifest.clone(),
256 tx_manager: TransactionManager::new(), tx_log: Arc::new(RwLock::new(Vec::new())), global_version: Arc::clone(&self.global_version),
259 }
260 }
261}
262
/// Summary of a transaction log, as produced by
/// `VersionedEngram::transaction_stats`.
#[derive(Clone, Debug)]
pub struct TransactionStats {
    /// Number of transactions recorded in the log.
    pub total_transactions: usize,
    /// Number of logged transactions whose status is `Committed`.
    pub committed_transactions: usize,
    /// Number of logged transactions whose status is `Aborted`.
    pub aborted_transactions: usize,
    /// `committed_transactions / total_transactions`, or `0.0` for an empty log.
    pub success_rate: f64,
}
271
272#[derive(Debug, Clone)]
274pub struct EngramStats {
275 pub global_version: u64,
276 pub root_version: u64,
277 pub chunk_store: super::chunk_store::CodebookStats,
278 pub corrections: super::corrections::CorrectionStats,
279 pub manifest: super::manifest::ManifestStats,
280 pub transactions: TransactionStats,
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// A new engram starts with both version counters at zero.
    #[test]
    fn test_engram_creation() {
        let fresh = VersionedEngram::new(10000);

        assert_eq!(fresh.version(), 0);
        assert_eq!(fresh.root_version(), 0);
    }

    /// Updating the root with the matching expected version bumps it to 1.
    #[test]
    fn test_root_update() {
        let engram = VersionedEngram::new(10000);
        let replacement = Arc::new(SparseVec::new());

        let reported = engram.update_root(replacement, 0).unwrap();

        assert_eq!(reported, 1);
        assert_eq!(engram.root_version(), 1);
    }

    /// A second update that still expects version 0 must be rejected.
    #[test]
    fn test_root_update_version_mismatch() {
        let engram = VersionedEngram::new(10000);
        let replacement = Arc::new(SparseVec::new());

        // First update succeeds and moves the root version to 1.
        engram.update_root(Arc::clone(&replacement), 0).unwrap();

        // The stale expected version no longer matches.
        let stale_attempt = engram.update_root(replacement, 0);
        assert!(stale_attempt.is_err());
    }

    /// A begun transaction is pending; committing it shows up in the stats.
    #[test]
    fn test_transaction_lifecycle() {
        let engram = VersionedEngram::new(10000);

        let pending = engram.begin_transaction();
        assert_eq!(pending.status, TransactionStatus::Pending);

        engram.commit_transaction(pending).unwrap();

        let summary = engram.transaction_stats();
        assert_eq!(summary.total_transactions, 1);
        assert_eq!(summary.committed_transactions, 1);
        assert_eq!(summary.success_rate, 1.0);
    }
}