dx_forge/sync/
protocol.rs

1use std::sync::Arc;
2use tokio::sync::broadcast;
3
4use crate::crdt::Operation;
5
6/// Lightweight in-process sync manager using a tokio broadcast channel.
7/// Components can `publish` operations and other components can `subscribe`
8/// to receive live updates. Messages are wrapped in `Arc` to make cloning cheap.
9#[derive(Clone)]
10pub struct SyncManager {
11    tx: broadcast::Sender<Arc<Operation>>,
12}
13
14impl SyncManager {
15    /// Create a new SyncManager with a reasonable buffer size.
16    pub fn new() -> Self {
17        let (tx, _) = broadcast::channel(256);
18        Self { tx }
19    }
20
21    /// Subscribe to live operations. The receiver will receive only
22    /// messages published after subscription.
23    pub fn subscribe(&self) -> broadcast::Receiver<Arc<Operation>> {
24        self.tx.subscribe()
25    }
26
27    /// Publish an operation to all subscribers. Returns Err if there are
28    /// no subscribers or the buffer is full.
29    pub fn publish(
30        &self,
31        op: Arc<Operation>,
32    ) -> Result<usize, broadcast::error::SendError<Arc<Operation>>> {
33        self.tx.send(op)
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40
41    #[tokio::test]
42    async fn sync_manager_roundtrip() {
43        let mgr = SyncManager::new();
44        let mut rx = mgr.subscribe();
45
46        let op = Arc::new(Operation::new(
47            "/tmp/x".to_string(),
48            crate::crdt::OperationType::FileCreate {
49                content: "a".into(),
50            },
51            "actor".into(),
52        ));
53        mgr.publish(op.clone()).unwrap();
54
55        let got = rx.recv().await.unwrap();
56        assert_eq!(got.id, op.id);
57    }
58}
59// Future: WebSocket-based sync protocol for real-time collaboration