dx_forge/storage/
oplog.rs1use anyhow::{anyhow, Result};
2use crossbeam::channel::{self, Sender};
3use dashmap::DashMap;
4use std::sync::Arc;
5use std::thread;
6use uuid::Uuid;
7
8use super::Database;
9use crate::crdt::Operation;
10
11pub struct OperationLog {
12 cache: DashMap<Uuid, Operation>,
14 queue: Sender<Operation>,
15}
16
17impl OperationLog {
18 pub fn new(db: Arc<Database>) -> Self {
19 let (tx, rx) = channel::unbounded::<Operation>();
20 let worker_db = db.clone();
21 thread::Builder::new()
22 .name("forge-oplog-writer".to_string())
23 .spawn(move || {
24 while let Ok(op) = rx.recv() {
25 if let Err(err) = worker_db.store_operation(&op) {
26 eprintln!("⚠️ Failed to persist operation {}: {err}", op.id);
27 }
28 }
29 })
30 .expect("failed to spawn oplog writer thread");
31
32 Self {
33 cache: DashMap::new(),
34 queue: tx,
35 }
36 }
37
38 pub fn append(&self, operation: Operation) -> Result<bool> {
39 let is_new = self.cache.insert(operation.id, operation.clone()).is_none();
40 if !is_new {
41 return Ok(false);
42 }
43
44 self.queue
45 .send(operation)
46 .map_err(|err| anyhow!("failed to enqueue operation for persistence: {err}"))?;
47
48 Ok(true)
49 }
50
51 #[allow(dead_code)]
52 pub fn get(&self, id: &Uuid) -> Option<Operation> {
53 self.cache.get(id).map(|op| op.clone())
54 }
55}