Skip to main content

contextdb_server/
sync_plugin.rs

1use contextdb_engine::plugin::{CommitSource, DatabasePlugin};
2use contextdb_tx::WriteSet;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use tokio::sync::mpsc;
5
6/// Plugin that marks auto-sync as active.
7/// Sends change notifications to the background push task via an mpsc channel.
8pub struct SyncPlugin {
9    tx: std::sync::Mutex<Option<mpsc::UnboundedSender<()>>>,
10    auto_enabled: AtomicBool,
11    pending_lsn: AtomicU64,
12}
13
14impl SyncPlugin {
15    pub fn new(tx: mpsc::UnboundedSender<()>) -> Self {
16        Self {
17            tx: std::sync::Mutex::new(Some(tx)),
18            auto_enabled: AtomicBool::new(false),
19            pending_lsn: AtomicU64::new(0),
20        }
21    }
22
23    /// Enable or disable auto-sync.
24    pub fn set_auto(&self, enabled: bool) {
25        self.auto_enabled.store(enabled, Ordering::SeqCst);
26    }
27
28    /// Check if auto-sync is enabled.
29    pub fn is_auto(&self) -> bool {
30        self.auto_enabled.load(Ordering::SeqCst)
31    }
32
33    pub fn pending_lsn(&self) -> u64 {
34        self.pending_lsn.load(Ordering::SeqCst)
35    }
36
37    /// Signal the background push task that a DML change occurred.
38    pub fn notify_change(&self) -> Result<(), &'static str> {
39        match self.tx.lock() {
40            Ok(guard) => {
41                if let Some(tx) = guard.as_ref() {
42                    if tx.send(()).is_err() {
43                        tracing::warn!("sync plugin receiver dropped; change notification lost");
44                        return Err("auto-sync worker unavailable");
45                    }
46                    Ok(())
47                } else {
48                    Err("auto-sync worker unavailable")
49                }
50            }
51            Err(_) => {
52                tracing::warn!("sync plugin mutex poisoned; skipping change notification");
53                Err("auto-sync worker unavailable")
54            }
55        }
56    }
57
58    /// Shutdown: drop the sender to close the channel and stop the background task.
59    pub fn shutdown(&self) {
60        match self.tx.lock() {
61            Ok(mut guard) => {
62                let _ = guard.take();
63            }
64            Err(_) => tracing::warn!("sync plugin mutex poisoned during shutdown"),
65        }
66    }
67}
68
69impl DatabasePlugin for SyncPlugin {
70    fn post_commit(&self, ws: &WriteSet, source: CommitSource) {
71        if !self.is_auto() || source == CommitSource::SyncPull || ws.is_empty() {
72            return;
73        }
74        if let Some(lsn) = ws.commit_lsn {
75            self.pending_lsn.fetch_max(lsn, Ordering::SeqCst);
76        }
77        let _ = self.notify_change();
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// A poisoned `tx` mutex must make `notify_change` fail without panicking.
    #[test]
    fn sync_03_plugin_survives_poisoned_mutex() {
        let (sender, _receiver) = mpsc::unbounded_channel();
        let plugin = Arc::new(SyncPlugin::new(sender));

        // Poison the mutex by panicking on another thread while holding the guard.
        let poisoner = plugin.clone();
        let handle = std::thread::spawn(move || {
            let _guard = poisoner.tx.lock().unwrap();
            panic!("poison sync_plugin mutex");
        });
        let _ = handle.join();

        let outcome = std::panic::catch_unwind(|| {
            let _ = plugin.notify_change();
        });
        assert!(
            outcome.is_ok(),
            "notify_change should not panic on a poisoned sync plugin mutex"
        );
    }

    /// Each `notify_change` call enqueues its own message on the channel.
    #[test]
    fn sync_04_plugin_queues_multiple_notifications() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(sender);

        plugin.notify_change().unwrap();
        plugin.notify_change().unwrap();

        assert_eq!(receiver.try_recv(), Ok(()));
        assert_eq!(receiver.try_recv(), Ok(()));
    }

    /// With the receiver gone, `notify_change` reports the worker as unavailable.
    #[test]
    fn sync_05_plugin_reports_closed_receiver() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(sender);
        drop(receiver);

        assert_eq!(plugin.notify_change(), Err("auto-sync worker unavailable"));
    }

    /// `post_commit` notifies only when auto-sync is on and the commit is not a sync pull.
    #[test]
    fn sync_06_post_commit_notifies_only_for_local_writes_when_enabled() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(sender);

        // A write set with a single relational insert, so it is not empty.
        let row = contextdb_core::VersionedRow {
            row_id: 1,
            values: std::collections::HashMap::new(),
            created_tx: 1,
            deleted_tx: None,
            lsn: 1,
            created_at: None,
        };
        let mut ws = WriteSet::new();
        ws.relational_inserts.push(("t".to_string(), row));

        // Auto-sync is off by default.
        plugin.post_commit(&ws, CommitSource::AutoCommit);
        assert!(
            receiver.try_recv().is_err(),
            "disabled auto-sync must stay quiet"
        );

        plugin.set_auto(true);
        plugin.post_commit(&ws, CommitSource::SyncPull);
        assert!(
            receiver.try_recv().is_err(),
            "sync-pull commits must not trigger another auto-sync push"
        );

        plugin.post_commit(&ws, CommitSource::AutoCommit);
        assert_eq!(receiver.try_recv(), Ok(()));
    }

    /// `pending_lsn` follows the commit LSN of successive write sets.
    #[test]
    fn sync_07_post_commit_tracks_latest_pending_lsn() {
        let (sender, _receiver) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(sender);
        plugin.set_auto(true);

        let mut first = WriteSet::new();
        first.relational_deletes.push(("t".to_string(), 1, 7));
        first.commit_lsn = Some(7);
        plugin.post_commit(&first, CommitSource::AutoCommit);
        assert_eq!(plugin.pending_lsn(), 7);

        let mut second = WriteSet::new();
        second.relational_deletes.push(("t".to_string(), 2, 11));
        second.commit_lsn = Some(11);
        plugin.post_commit(&second, CommitSource::AutoCommit);
        assert_eq!(plugin.pending_lsn(), 11);
    }
}