dx_forge/sync/
protocol.rs1use std::sync::Arc;
2use tokio::sync::broadcast;
3
4use crate::crdt::Operation;
5
6#[derive(Clone)]
10pub struct SyncManager {
11 tx: broadcast::Sender<Arc<Operation>>,
12}
13
14impl SyncManager {
15 pub fn new() -> Self {
17 let (tx, _) = broadcast::channel(256);
18 Self { tx }
19 }
20
21 pub fn subscribe(&self) -> broadcast::Receiver<Arc<Operation>> {
24 self.tx.subscribe()
25 }
26
27 pub fn publish(
30 &self,
31 op: Arc<Operation>,
32 ) -> Result<usize, broadcast::error::SendError<Arc<Operation>>> {
33 self.tx.send(op)
34 }
35}
36
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::OperationType;

    /// An operation published after subscribing arrives at the subscriber
    /// with the same `id`.
    #[tokio::test]
    async fn sync_manager_roundtrip() {
        let manager = SyncManager::new();
        let mut receiver = manager.subscribe();

        let kind = OperationType::FileCreate {
            content: "a".into(),
        };
        let sent = Arc::new(Operation::new("/tmp/x".to_string(), kind, "actor".into()));
        // Keep our own handle so the id can be compared after publishing.
        manager.publish(Arc::clone(&sent)).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.id, sent.id);
    }
}
59