pluresdb_sync/
lib.rs

1//! High-level synchronization primitives for PluresDB.
2//!
3//! These types provide a foundational event pipeline that higher-level
4//! replication components can build on top of. For now we expose a lightweight
5//! broadcast hub with typed events that integrates with Tokio tasks.
6
7use anyhow::Result;
8use serde::{Deserialize, Serialize};
9use tokio::sync::broadcast;
10use tracing::instrument;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13pub enum SyncEvent {
14    NodeUpsert { id: String },
15    NodeDelete { id: String },
16    PeerConnected { peer_id: String },
17    PeerDisconnected { peer_id: String },
18}
19
20#[derive(Debug)]
21pub struct SyncBroadcaster {
22    sender: broadcast::Sender<SyncEvent>,
23}
24
25impl Default for SyncBroadcaster {
26    fn default() -> Self {
27        let (sender, _receiver) = broadcast::channel(1024);
28        Self { sender }
29    }
30}
31
32impl SyncBroadcaster {
33    pub fn new(capacity: usize) -> Self {
34        let (sender, _receiver) = broadcast::channel(capacity);
35        Self { sender }
36    }
37
38    pub fn subscribe(&self) -> broadcast::Receiver<SyncEvent> {
39        self.sender.subscribe()
40    }
41
42    #[instrument(skip(self))]
43    pub fn publish(&self, event: SyncEvent) -> Result<()> {
44        self.sender.send(event)?;
45        Ok(())
46    }
47}
48
#[cfg(test)]
mod tests {
    use super::{SyncBroadcaster, SyncEvent};

    /// An event published after subscribing is delivered unchanged to the
    /// subscriber.
    #[tokio::test]
    async fn broadcast_events() {
        let expected = SyncEvent::NodeUpsert {
            id: String::from("node-1"),
        };

        let hub = SyncBroadcaster::default();
        // Subscribe before publishing so the channel has an active receiver.
        let mut rx = hub.subscribe();

        hub.publish(expected.clone()).unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, expected);
    }
}