dx_forge/storage/
oplog.rs

1use anyhow::{anyhow, Result};
2use crossbeam::channel::{self, Sender};
3use dashmap::DashMap;
4use std::sync::Arc;
5use std::thread;
6use uuid::Uuid;
7
8use super::Database;
9use crate::crdt::Operation;
10
11pub struct OperationLog {
12    // In-memory cache for fast lookups and deduplication
13    cache: DashMap<Uuid, Operation>,
14    queue: Sender<Operation>,
15}
16
17impl OperationLog {
18    pub fn new(db: Arc<Database>) -> Self {
19        let (tx, rx) = channel::unbounded::<Operation>();
20        let worker_db = db.clone();
21        thread::Builder::new()
22            .name("forge-oplog-writer".to_string())
23            .spawn(move || {
24                while let Ok(op) = rx.recv() {
25                    if let Err(err) = worker_db.store_operation(&op) {
26                        eprintln!("⚠️  Failed to persist operation {}: {err}", op.id);
27                    }
28                }
29            })
30            .expect("failed to spawn oplog writer thread");
31
32        Self {
33            cache: DashMap::new(),
34            queue: tx,
35        }
36    }
37
38    pub fn append(&self, operation: Operation) -> Result<bool> {
39        let is_new = self.cache.insert(operation.id, operation.clone()).is_none();
40        if !is_new {
41            return Ok(false);
42        }
43
44        self.queue
45            .send(operation)
46            .map_err(|err| anyhow!("failed to enqueue operation for persistence: {err}"))?;
47
48        Ok(true)
49    }
50
51    #[allow(dead_code)]
52    pub fn get(&self, id: &Uuid) -> Option<Operation> {
53        self.cache.get(id).map(|op| op.clone())
54    }
55}