use super::chunk_store::VersionedChunkStore;
use super::corrections::VersionedCorrectionStore;
use super::manifest::VersionedManifest;
use super::transaction::{Transaction, TransactionManager, TransactionStatus};
use super::types::{VersionMismatch, VersionedResult};
use crate::SparseVec;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
pub struct VersionedEngram {
root: Arc<RwLock<Arc<SparseVec>>>,
root_version: Arc<AtomicU64>,
pub chunk_store: VersionedChunkStore,
pub corrections: VersionedCorrectionStore,
pub manifest: VersionedManifest,
tx_manager: TransactionManager,
tx_log: Arc<RwLock<Vec<Transaction>>>,
global_version: Arc<AtomicU64>,
}
impl VersionedEngram {
pub fn new(_dimensionality: usize) -> Self {
Self::with_root(Arc::new(SparseVec::new()))
}
pub fn with_root(root: Arc<SparseVec>) -> Self {
Self {
root: Arc::new(RwLock::new(root)),
root_version: Arc::new(AtomicU64::new(0)),
chunk_store: VersionedChunkStore::new(),
corrections: VersionedCorrectionStore::new(),
manifest: VersionedManifest::new(),
tx_manager: TransactionManager::new(),
tx_log: Arc::new(RwLock::new(Vec::new())),
global_version: Arc::new(AtomicU64::new(0)),
}
}
pub fn version(&self) -> u64 {
self.global_version.load(Ordering::Acquire)
}
pub fn root_version(&self) -> u64 {
self.root_version.load(Ordering::Acquire)
}
pub fn root(&self) -> Arc<SparseVec> {
let root_lock = self.root.read().unwrap();
Arc::clone(&*root_lock)
}
pub fn update_root(
&self,
new_root: Arc<SparseVec>,
expected_version: u64,
) -> VersionedResult<u64> {
let mut root_lock = self.root.write().unwrap();
let current_version = self.root_version.load(Ordering::Acquire);
if current_version != expected_version {
return Err(VersionMismatch {
expected: expected_version,
actual: current_version,
});
}
*root_lock = new_root;
let new_version = self.root_version.fetch_add(1, Ordering::AcqRel) + 1;
Ok(new_version)
}
pub fn bundle_chunk(&self, chunk_vec: &SparseVec) -> Result<u64, String> {
const MAX_RETRIES: usize = 10;
for attempt in 0..MAX_RETRIES {
let current_root = self.root();
let current_version = self.root_version();
let new_root = Arc::new(current_root.bundle(chunk_vec));
match self.update_root(new_root, current_version) {
Ok(new_version) => return Ok(new_version),
Err(_) if attempt < MAX_RETRIES - 1 => {
std::thread::sleep(std::time::Duration::from_micros(1 << attempt));
continue;
}
Err(e) => {
return Err(format!(
"Failed to bundle chunk after {} attempts: {}",
MAX_RETRIES, e
))
}
}
}
Err("Max retries exceeded".to_string())
}
pub fn begin_transaction(&self) -> Transaction {
self.tx_manager.begin(self.version())
}
pub fn commit_transaction(&self, mut tx: Transaction) -> Result<(), String> {
let current_version = self.version();
if current_version > tx.engram_version + 10 {
return Err("Engram version drifted too far, transaction may conflict".to_string());
}
tx.commit();
let mut log = self.tx_log.write().unwrap();
log.push(tx);
self.global_version.fetch_add(1, Ordering::AcqRel);
Ok(())
}
pub fn abort_transaction(&self, mut tx: Transaction) {
tx.abort();
let mut log = self.tx_log.write().unwrap();
log.push(tx);
}
pub fn transaction_stats(&self) -> TransactionStats {
let log = self.tx_log.read().unwrap();
let total = log.len();
let committed = log
.iter()
.filter(|tx| tx.status == TransactionStatus::Committed)
.count();
let aborted = log
.iter()
.filter(|tx| tx.status == TransactionStatus::Aborted)
.count();
TransactionStats {
total_transactions: total,
committed_transactions: committed,
aborted_transactions: aborted,
success_rate: if total > 0 {
committed as f64 / total as f64
} else {
0.0
},
}
}
pub fn stats(&self) -> EngramStats {
EngramStats {
global_version: self.version(),
root_version: self.root_version(),
chunk_store: self.chunk_store.stats(),
corrections: self.corrections.stats(),
manifest: self.manifest.stats(),
transactions: self.transaction_stats(),
}
}
}
impl Default for VersionedEngram {
fn default() -> Self {
Self::new(10000) }
}
impl Clone for VersionedEngram {
fn clone(&self) -> Self {
Self {
root: Arc::clone(&self.root),
root_version: Arc::clone(&self.root_version),
chunk_store: self.chunk_store.clone(),
corrections: self.corrections.clone(),
manifest: self.manifest.clone(),
tx_manager: TransactionManager::new(), tx_log: Arc::new(RwLock::new(Vec::new())), global_version: Arc::clone(&self.global_version),
}
}
}
#[derive(Debug, Clone)]
pub struct TransactionStats {
pub total_transactions: usize,
pub committed_transactions: usize,
pub aborted_transactions: usize,
pub success_rate: f64,
}
#[derive(Debug, Clone)]
pub struct EngramStats {
pub global_version: u64,
pub root_version: u64,
pub chunk_store: super::chunk_store::CodebookStats,
pub corrections: super::corrections::CorrectionStats,
pub manifest: super::manifest::ManifestStats,
pub transactions: TransactionStats,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_engram_creation() {
let engram = VersionedEngram::new(10000);
assert_eq!(engram.version(), 0);
assert_eq!(engram.root_version(), 0);
}
#[test]
fn test_root_update() {
let engram = VersionedEngram::new(10000);
let new_root = Arc::new(SparseVec::new());
let version = engram.update_root(new_root, 0).unwrap();
assert_eq!(version, 1);
assert_eq!(engram.root_version(), 1);
}
#[test]
fn test_root_update_version_mismatch() {
let engram = VersionedEngram::new(10000);
let new_root = Arc::new(SparseVec::new());
engram.update_root(Arc::clone(&new_root), 0).unwrap();
let result = engram.update_root(new_root, 0);
assert!(result.is_err());
}
#[test]
fn test_transaction_lifecycle() {
let engram = VersionedEngram::new(10000);
let tx = engram.begin_transaction();
assert_eq!(tx.status, TransactionStatus::Pending);
engram.commit_transaction(tx).unwrap();
let stats = engram.transaction_stats();
assert_eq!(stats.total_transactions, 1);
assert_eq!(stats.committed_transactions, 1);
assert_eq!(stats.success_rate, 1.0);
}
}