contextdb_server/
sync_plugin.rs1use contextdb_engine::plugin::{CommitSource, DatabasePlugin};
2use contextdb_tx::WriteSet;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use tokio::sync::mpsc;
5
6pub struct SyncPlugin {
9 tx: std::sync::Mutex<Option<mpsc::UnboundedSender<()>>>,
10 auto_enabled: AtomicBool,
11 pending_lsn: AtomicU64,
12}
13
14impl SyncPlugin {
15 pub fn new(tx: mpsc::UnboundedSender<()>) -> Self {
16 Self {
17 tx: std::sync::Mutex::new(Some(tx)),
18 auto_enabled: AtomicBool::new(false),
19 pending_lsn: AtomicU64::new(0),
20 }
21 }
22
23 pub fn set_auto(&self, enabled: bool) {
25 self.auto_enabled.store(enabled, Ordering::SeqCst);
26 }
27
28 pub fn is_auto(&self) -> bool {
30 self.auto_enabled.load(Ordering::SeqCst)
31 }
32
33 pub fn pending_lsn(&self) -> u64 {
34 self.pending_lsn.load(Ordering::SeqCst)
35 }
36
37 pub fn notify_change(&self) -> Result<(), &'static str> {
39 match self.tx.lock() {
40 Ok(guard) => {
41 if let Some(tx) = guard.as_ref() {
42 if tx.send(()).is_err() {
43 tracing::warn!("sync plugin receiver dropped; change notification lost");
44 return Err("auto-sync worker unavailable");
45 }
46 Ok(())
47 } else {
48 Err("auto-sync worker unavailable")
49 }
50 }
51 Err(_) => {
52 tracing::warn!("sync plugin mutex poisoned; skipping change notification");
53 Err("auto-sync worker unavailable")
54 }
55 }
56 }
57
58 pub fn shutdown(&self) {
60 match self.tx.lock() {
61 Ok(mut guard) => {
62 let _ = guard.take();
63 }
64 Err(_) => tracing::warn!("sync plugin mutex poisoned during shutdown"),
65 }
66 }
67}
68
69impl DatabasePlugin for SyncPlugin {
70 fn post_commit(&self, ws: &WriteSet, source: CommitSource) {
71 if !self.is_auto() || source == CommitSource::SyncPull || ws.is_empty() {
72 return;
73 }
74 if let Some(lsn) = ws.commit_lsn {
75 self.pending_lsn.fetch_max(lsn, Ordering::SeqCst);
76 }
77 let _ = self.notify_change();
78 }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Error string `notify_change` returns on every failure path.
    const UNAVAILABLE: &str = "auto-sync worker unavailable";

    #[test]
    fn sync_03_plugin_survives_poisoned_mutex() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let plugin = Arc::new(SyncPlugin::new(tx));

        // Poison the mutex by panicking on another thread while holding it.
        let poison_plugin = plugin.clone();
        let _ = std::thread::spawn(move || {
            let _guard = poison_plugin.tx.lock().unwrap();
            panic!("poison sync_plugin mutex");
        })
        .join();

        let outcome = std::panic::catch_unwind(|| plugin.notify_change());
        assert!(
            outcome.is_ok(),
            "notify_change should not panic on a poisoned sync plugin mutex"
        );
        // Beyond not panicking, the failure must be reported to the caller.
        assert_eq!(outcome.ok(), Some(Err(UNAVAILABLE)));
    }

    #[test]
    fn sync_04_plugin_queues_multiple_notifications() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(tx);

        plugin.notify_change().unwrap();
        plugin.notify_change().unwrap();

        assert_eq!(rx.try_recv(), Ok(()));
        assert_eq!(rx.try_recv(), Ok(()));
    }

    #[test]
    fn sync_05_plugin_reports_closed_receiver() {
        let (tx, rx) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(tx);
        drop(rx);

        assert_eq!(plugin.notify_change(), Err(UNAVAILABLE));
    }

    #[test]
    fn sync_06_post_commit_notifies_only_for_local_writes_when_enabled() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(tx);
        let mut ws = WriteSet::new();
        ws.relational_inserts.push((
            "t".to_string(),
            contextdb_core::VersionedRow {
                row_id: 1,
                values: std::collections::HashMap::new(),
                created_tx: 1,
                deleted_tx: None,
                lsn: 1,
                created_at: None,
            },
        ));

        plugin.post_commit(&ws, CommitSource::AutoCommit);
        assert!(rx.try_recv().is_err(), "disabled auto-sync must stay quiet");

        plugin.set_auto(true);
        plugin.post_commit(&ws, CommitSource::SyncPull);
        assert!(
            rx.try_recv().is_err(),
            "sync-pull commits must not trigger another auto-sync push"
        );

        plugin.post_commit(&ws, CommitSource::AutoCommit);
        assert_eq!(rx.try_recv(), Ok(()));
    }

    #[test]
    fn sync_07_post_commit_tracks_latest_pending_lsn() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(tx);
        plugin.set_auto(true);

        let mut ws = WriteSet::new();
        ws.commit_lsn = Some(7);
        ws.relational_deletes.push(("t".to_string(), 1, 7));
        plugin.post_commit(&ws, CommitSource::AutoCommit);
        assert_eq!(plugin.pending_lsn(), 7);

        let mut newer = WriteSet::new();
        newer.commit_lsn = Some(11);
        newer.relational_deletes.push(("t".to_string(), 2, 11));
        plugin.post_commit(&newer, CommitSource::AutoCommit);
        assert_eq!(plugin.pending_lsn(), 11);

        // A commit carrying an older LSN must not lower the high-water mark.
        let mut older = WriteSet::new();
        older.commit_lsn = Some(9);
        older.relational_deletes.push(("t".to_string(), 3, 9));
        plugin.post_commit(&older, CommitSource::AutoCommit);
        assert_eq!(plugin.pending_lsn(), 11);
    }

    #[test]
    fn sync_08_shutdown_closes_channel() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let plugin = SyncPlugin::new(tx);

        plugin.shutdown();

        // With the sender gone, notifications fail and the receiver sees the
        // channel as closed rather than merely empty.
        assert_eq!(plugin.notify_change(), Err(UNAVAILABLE));
        assert_eq!(
            rx.try_recv(),
            Err(mpsc::error::TryRecvError::Disconnected)
        );
    }
}