1use crate::delta::StateDelta;
36use crate::serde_ga3;
37use crate::VectorClock;
38use cliffy_core::GA3;
39use serde::{Deserialize, Serialize};
40use std::collections::VecDeque;
41
/// A point-in-time copy of the geometric state together with the vector
/// clock it corresponds to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// The captured state; (de)serialized through the crate's `serde_ga3` module.
    #[serde(with = "serde_ga3")]
    pub state: GA3,
    /// Vector clock that was supplied alongside `state` when the snapshot was saved.
    pub clock: VectorClock,
    /// Identifier of this snapshot. `MemoryStore` hands these out from an
    /// incrementing counter.
    pub id: u64,
    /// Time the snapshot was saved. `MemoryStore` fills this with milliseconds
    /// since the Unix epoch (0 if the system clock reads before the epoch).
    pub timestamp: u64,
}
55
/// A delta as kept in a store's operation log, with bookkeeping metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredOperation {
    /// The state change that was appended.
    pub delta: StateDelta,
    /// Position of the operation in the log. `MemoryStore` assigns these from
    /// an incrementing counter that starts at 0.
    pub sequence: u64,
    /// Compaction marker. `MemoryStore` always writes `false` when appending;
    /// no code visible in this file sets it to `true`.
    pub compacted: bool,
}
66
/// Storage interface for state snapshots plus a log of [`StateDelta`] operations.
///
/// The method descriptions below follow the signatures and the behaviour of
/// the in-memory implementation (`MemoryStore`) in this file; other
/// implementors are not visible from here and may differ in detail.
pub trait GeometricStore {
    /// Records a snapshot of `state` as of `clock`.
    fn save_snapshot(&mut self, state: &GA3, clock: &VectorClock);

    /// Returns the most recently saved snapshot, or `None` if none is stored.
    fn load_latest_snapshot(&self) -> Option<Snapshot>;

    /// Returns the stored snapshot whose `id` field equals `id`, or `None` if
    /// there is no such snapshot.
    fn load_snapshot(&self, id: u64) -> Option<Snapshot>;

    /// Appends `delta` to the operation log.
    fn append_operation(&mut self, delta: StateDelta);

    /// Returns the logged operations that come after `clock`.
    ///
    /// `MemoryStore` selects the operations for which
    /// `clock.happens_before(&op.delta.to_clock)` holds.
    fn operations_since(&self, clock: &VectorClock) -> Vec<StoredOperation>;

    /// Returns the logged operations starting from `sequence`.
    ///
    /// In `MemoryStore` the bound is inclusive: operations with
    /// `op.sequence >= sequence` are returned.
    fn operations_since_sequence(&self, sequence: u64) -> Vec<StoredOperation>;

    /// Compacts the operation log and returns the resulting snapshot, if any.
    ///
    /// `MemoryStore` saves a snapshot of its tracked state, empties the log,
    /// and returns `None` without doing anything when it has no tracked state.
    fn compact(&mut self) -> Option<Snapshot>;

    /// Returns counters describing what the store currently holds.
    fn stats(&self) -> StorageStats;

    /// Removes all stored snapshots and operations.
    fn clear(&mut self);
}
103
/// Counters reported by [`GeometricStore::stats`].
///
/// Field descriptions reflect how `MemoryStore` populates them.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Number of snapshots currently retained.
    pub snapshot_count: usize,
    /// Number of operations currently in the log.
    pub operation_count: usize,
    /// Estimated storage footprint in bytes. `MemoryStore` derives this from
    /// fixed per-item figures rather than measuring real memory use.
    pub total_size_bytes: usize,
    /// Operations that come after the latest snapshot's clock; when there is
    /// no snapshot, every logged operation counts as pending.
    pub pending_operations: usize,
}
116
/// In-memory implementation of [`GeometricStore`].
#[derive(Debug, Default)]
pub struct MemoryStore {
    // Retained snapshots, oldest first: new ones are pushed at the back and
    // pruning removes from the front.
    snapshots: Vec<Snapshot>,
    // Operation log in append order; emptied by compaction and by `clear`.
    operations: VecDeque<StoredOperation>,
    // Id that the next saved snapshot will receive.
    next_snapshot_id: u64,
    // Sequence number that the next appended operation will receive.
    next_sequence: u64,
    // Running state: set whenever a snapshot is saved and advanced by every
    // appended delta. `None` until the first snapshot and after `clear`.
    current_state: Option<GA3>,
    // Running clock: replaced on snapshot save and updated with each appended
    // operation's `to_clock`.
    current_clock: VectorClock,
    // Retention and auto-compaction settings.
    config: MemoryStoreConfig,
}
133
/// Tuning knobs for [`MemoryStore`].
#[derive(Debug, Clone)]
pub struct MemoryStoreConfig {
    /// Maximum number of snapshots kept; older ones are dropped first once
    /// the count exceeds this.
    pub max_snapshots: usize,
    /// Log length at which an append triggers compaction (the check is
    /// `len >= max_operations_before_compact`). Only used when `auto_compact`
    /// is `true`.
    pub max_operations_before_compact: usize,
    /// Whether appending operations may trigger compaction automatically.
    pub auto_compact: bool,
}
144
145impl Default for MemoryStoreConfig {
146 fn default() -> Self {
147 Self {
148 max_snapshots: 10,
149 max_operations_before_compact: 1000,
150 auto_compact: true,
151 }
152 }
153}
154
155impl MemoryStore {
156 pub fn new() -> Self {
158 Self::default()
159 }
160
161 pub fn with_config(config: MemoryStoreConfig) -> Self {
163 Self {
164 config,
165 ..Default::default()
166 }
167 }
168
169 pub fn get_current_state(&self) -> Option<GA3> {
171 let snapshot = self.load_latest_snapshot()?;
172 let mut state = snapshot.state;
173
174 for op in self.operations_since(&snapshot.clock) {
175 crate::delta::apply_delta(&mut state, &op.delta);
176 }
177
178 Some(state)
179 }
180
181 fn prune_snapshots(&mut self) {
183 while self.snapshots.len() > self.config.max_snapshots {
184 self.snapshots.remove(0);
185 }
186 }
187
188 fn should_compact(&self) -> bool {
190 self.config.auto_compact
191 && self.operations.len() >= self.config.max_operations_before_compact
192 }
193}
194
195impl GeometricStore for MemoryStore {
196 fn save_snapshot(&mut self, state: &GA3, clock: &VectorClock) {
197 let snapshot = Snapshot {
198 state: state.clone(),
199 clock: clock.clone(),
200 id: self.next_snapshot_id,
201 timestamp: current_timestamp_ms(),
202 };
203 self.next_snapshot_id += 1;
204 self.snapshots.push(snapshot);
205 self.current_state = Some(state.clone());
206 self.current_clock = clock.clone();
207 self.prune_snapshots();
208 }
209
210 fn load_latest_snapshot(&self) -> Option<Snapshot> {
211 self.snapshots.last().cloned()
212 }
213
214 fn load_snapshot(&self, id: u64) -> Option<Snapshot> {
215 self.snapshots.iter().find(|s| s.id == id).cloned()
216 }
217
218 fn append_operation(&mut self, delta: StateDelta) {
219 if let Some(ref mut state) = self.current_state {
221 crate::delta::apply_delta(state, &delta);
222 }
223 self.current_clock.update(&delta.to_clock);
224
225 let op = StoredOperation {
226 delta,
227 sequence: self.next_sequence,
228 compacted: false,
229 };
230 self.next_sequence += 1;
231 self.operations.push_back(op);
232
233 if self.should_compact() {
235 self.compact();
236 }
237 }
238
239 fn operations_since(&self, clock: &VectorClock) -> Vec<StoredOperation> {
240 self.operations
241 .iter()
242 .filter(|op| clock.happens_before(&op.delta.to_clock))
243 .cloned()
244 .collect()
245 }
246
247 fn operations_since_sequence(&self, sequence: u64) -> Vec<StoredOperation> {
248 self.operations
249 .iter()
250 .filter(|op| op.sequence >= sequence)
251 .cloned()
252 .collect()
253 }
254
255 fn compact(&mut self) -> Option<Snapshot> {
256 let state = self.current_state.clone()?;
257
258 self.save_snapshot(&state, &self.current_clock.clone());
260
261 self.operations.clear();
263
264 self.load_latest_snapshot()
265 }
266
267 fn stats(&self) -> StorageStats {
268 let pending = if let Some(snapshot) = self.snapshots.last() {
269 self.operations_since(&snapshot.clock).len()
270 } else {
271 self.operations.len()
272 };
273
274 StorageStats {
275 snapshot_count: self.snapshots.len(),
276 operation_count: self.operations.len(),
277 total_size_bytes: self.estimate_size(),
278 pending_operations: pending,
279 }
280 }
281
282 fn clear(&mut self) {
283 self.snapshots.clear();
284 self.operations.clear();
285 self.current_state = None;
286 self.current_clock = VectorClock::new();
287 self.next_snapshot_id = 0;
288 self.next_sequence = 0;
289 }
290}
291
292impl MemoryStore {
293 fn estimate_size(&self) -> usize {
294 self.snapshots.len() * 64 + self.operations.len() * 96
296 }
297}
298
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 when the system clock reads earlier than the epoch.
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
306
/// Outcome of [`recover_state`].
#[derive(Debug)]
pub struct RecoveryResult {
    /// State after replaying the pending operations onto the base snapshot.
    pub state: GA3,
    /// The base snapshot's clock updated with each replayed operation's `to_clock`.
    pub clock: VectorClock,
    /// Number of operations that were replayed.
    pub operations_replayed: usize,
    /// Id of the snapshot the recovery started from. `recover_state` always
    /// sets this to `Some`.
    pub base_snapshot_id: Option<u64>,
}
319
320pub fn recover_state(store: &impl GeometricStore) -> Option<RecoveryResult> {
324 let snapshot = store.load_latest_snapshot()?;
325 let mut state = snapshot.state.clone();
326 let mut clock = snapshot.clock.clone();
327
328 let ops = store.operations_since(&snapshot.clock);
329 let ops_count = ops.len();
330
331 for op in ops {
332 crate::delta::apply_delta(&mut state, &op.delta);
333 clock.update(&op.delta.to_clock);
334 }
335
336 Some(RecoveryResult {
337 state,
338 clock,
339 operations_replayed: ops_count,
340 base_snapshot_id: Some(snapshot.id),
341 })
342}
343
// Unit tests for `MemoryStore` and `recover_state`.
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    // A saved snapshot can be loaded back with its state intact.
    #[test]
    fn test_memory_store_snapshot() {
        let mut store = MemoryStore::new();

        let state = GA3::scalar(42.0);
        let clock = VectorClock::new();

        store.save_snapshot(&state, &clock);

        let loaded = store.load_latest_snapshot().unwrap();
        assert!((loaded.state.scalar_part() - 42.0).abs() < 1e-10);
    }

    // An operation appended after a tick is reported as newer than an empty clock.
    #[test]
    fn test_memory_store_operations() {
        let mut store = MemoryStore::new();
        let node_id = Uuid::new_v4();

        let state = GA3::scalar(10.0);
        let mut clock = VectorClock::new();
        store.save_snapshot(&state, &clock);

        clock.tick(node_id);
        let delta =
            StateDelta::additive(GA3::scalar(5.0), VectorClock::new(), clock.clone(), node_id);
        store.append_operation(delta);

        let ops = store.operations_since(&VectorClock::new());
        assert_eq!(ops.len(), 1);
    }

    // Recovery replays both additive deltas on top of the snapshot: 10 + 5 + 3.
    #[test]
    fn test_recovery() {
        let mut store = MemoryStore::new();
        let node_id = Uuid::new_v4();

        let state = GA3::scalar(10.0);
        let mut clock = VectorClock::new();
        store.save_snapshot(&state, &clock);

        clock.tick(node_id);
        store.append_operation(StateDelta::additive(
            GA3::scalar(5.0),
            VectorClock::new(),
            clock.clone(),
            node_id,
        ));

        clock.tick(node_id);
        store.append_operation(StateDelta::additive(
            GA3::scalar(3.0),
            VectorClock::new(),
            clock.clone(),
            node_id,
        ));

        let result = recover_state(&store).unwrap();

        assert!((result.state.scalar_part() - 18.0).abs() < 1e-10);
        assert_eq!(result.operations_replayed, 2);
    }

    // With a threshold of 3, appending 5 operations must trigger at least one
    // automatic compaction: the log shrinks and an extra snapshot appears.
    #[test]
    fn test_compaction() {
        let config = MemoryStoreConfig {
            max_snapshots: 5,
            max_operations_before_compact: 3,
            auto_compact: true,
        };
        let mut store = MemoryStore::with_config(config);
        let node_id = Uuid::new_v4();

        let state = GA3::scalar(0.0);
        let mut clock = VectorClock::new();
        store.save_snapshot(&state, &clock);

        for i in 1..=5 {
            clock.tick(node_id);
            store.append_operation(StateDelta::additive(
                GA3::scalar(i as f64),
                VectorClock::new(),
                clock.clone(),
                node_id,
            ));
        }

        assert!(store.operations.len() < 5);
        assert!(store.snapshots.len() >= 2);
    }

    // Stats report one snapshot and one logged operation.
    #[test]
    fn test_stats() {
        let mut store = MemoryStore::new();
        let node_id = Uuid::new_v4();

        let state = GA3::scalar(0.0);
        let mut clock = VectorClock::new();
        store.save_snapshot(&state, &clock);

        clock.tick(node_id);
        store.append_operation(StateDelta::additive(
            GA3::scalar(1.0),
            VectorClock::new(),
            clock.clone(),
            node_id,
        ));

        let stats = store.stats();
        assert_eq!(stats.snapshot_count, 1);
        assert_eq!(stats.operation_count, 1);
    }

    // Clearing the store removes previously saved snapshots.
    #[test]
    fn test_clear() {
        let mut store = MemoryStore::new();

        store.save_snapshot(&GA3::scalar(1.0), &VectorClock::new());
        assert!(store.load_latest_snapshot().is_some());

        store.clear();
        assert!(store.load_latest_snapshot().is_none());
    }

    // Saving 5 snapshots with a limit of 3 leaves exactly 3 retained.
    #[test]
    fn test_snapshot_pruning() {
        let config = MemoryStoreConfig {
            max_snapshots: 3,
            max_operations_before_compact: 1000,
            auto_compact: false,
        };
        let mut store = MemoryStore::with_config(config);

        for i in 0..5 {
            store.save_snapshot(&GA3::scalar(i as f64), &VectorClock::new());
        }

        assert_eq!(store.snapshots.len(), 3);
    }

    // The current state is the snapshot plus the appended delta: 10 + 5.
    #[test]
    fn test_get_current_state() {
        let mut store = MemoryStore::new();
        let node_id = Uuid::new_v4();

        let state = GA3::scalar(10.0);
        let mut clock = VectorClock::new();
        store.save_snapshot(&state, &clock);

        clock.tick(node_id);
        store.append_operation(StateDelta::additive(
            GA3::scalar(5.0),
            VectorClock::new(),
            clock.clone(),
            node_id,
        ));

        let current = store.get_current_state().unwrap();
        assert!((current.scalar_part() - 15.0).abs() < 1e-10);
    }

    // Five operations get sequences 0..=4; asking from sequence 3 (inclusive)
    // yields the last two.
    #[test]
    fn test_operations_since_sequence() {
        let mut store = MemoryStore::new();
        let node_id = Uuid::new_v4();

        store.save_snapshot(&GA3::scalar(0.0), &VectorClock::new());

        let mut clock = VectorClock::new();
        for _ in 0..5 {
            clock.tick(node_id);
            store.append_operation(StateDelta::additive(
                GA3::scalar(1.0),
                VectorClock::new(),
                clock.clone(),
                node_id,
            ));
        }

        let ops = store.operations_since_sequence(3);
        assert_eq!(ops.len(), 2);
    }
}