contextdb_cli/
auto_sync.rs1use std::future::Future;
2use std::time::Duration;
3use tokio::sync::mpsc;
4
/// Timing knobs for [`run_loop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AutoSyncConfig {
    /// Quiet period `run_loop` waits for after a change notification before
    /// it calls `push`.
    pub debounce: Duration,
    /// Delay `run_loop` schedules before retrying after `push` returns an error.
    pub retry_backoff: Duration,
}
10
/// Result of one successful `push` call, as consumed by [`run_loop`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PushOutcome {
    /// Conflict reasons; `run_loop` reports each one as `"sync conflict: {reason}"`.
    pub conflicts: Vec<String>,
    /// When `false`, `run_loop` schedules another push with no delay
    /// (provided the notification channel is still open).
    pub caught_up: bool,
}
16
17impl Default for AutoSyncConfig {
18 fn default() -> Self {
19 Self {
20 debounce: Duration::from_millis(500),
21 retry_backoff: Duration::from_millis(500),
22 }
23 }
24}
25
26pub async fn run_loop<P, Fut, R>(
27 mut rx: mpsc::UnboundedReceiver<()>,
28 config: AutoSyncConfig,
29 mut push: P,
30 mut report: R,
31) where
32 P: FnMut() -> Fut,
33 Fut: Future<Output = Result<PushOutcome, String>>,
34 R: FnMut(String),
35{
36 let mut next_delay = None;
37
38 loop {
39 let delay = match next_delay.take() {
40 Some(delay) => delay,
41 None => match rx.recv().await {
42 Some(()) => config.debounce,
43 None => break,
44 },
45 };
46
47 let sleep = tokio::time::sleep(delay);
48 tokio::pin!(sleep);
49
50 loop {
51 tokio::select! {
52 _ = &mut sleep => break,
53 maybe = rx.recv() => {
54 match maybe {
55 Some(()) => {
56 sleep.as_mut().reset(tokio::time::Instant::now() + config.debounce);
57 }
58 None => return,
59 }
60 }
61 }
62 }
63
64 match push().await {
65 Ok(outcome) => {
66 for reason in outcome.conflicts {
67 report(format!("sync conflict: {reason}"));
68 }
69 if !outcome.caught_up && !rx.is_closed() {
70 next_delay = Some(Duration::ZERO);
71 }
72 }
73 Err(err) => {
74 report(format!("auto-sync push failed: {err}"));
75 if rx.is_closed() {
76 break;
77 }
78 next_delay = Some(config.retry_backoff);
79 }
80 }
81 }
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    /// Upper bound on how long a test waits for the loop to reach a state.
    const WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// Polls `done` every millisecond until it returns `true` or `WAIT_LIMIT`
    /// elapses.
    ///
    /// Waiting on the observable state (instead of sleeping a fixed amount
    /// before dropping the sender) keeps the tests from racing the loop's own
    /// timers. A timeout is deliberately not a panic here: each test asserts
    /// its condition afterwards so failures carry the test's own message.
    async fn wait_until(mut done: impl FnMut() -> bool) {
        let _ = tokio::time::timeout(WAIT_LIMIT, async {
            while !done() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        })
        .await;
    }

    /// A failed push is reported and followed by another attempt.
    #[tokio::test]
    async fn auto_sync_retries_until_success_after_error() {
        let (tx, rx) = mpsc::unbounded_channel();
        let attempts = Arc::new(AtomicUsize::new(0));
        let reports = Arc::new(Mutex::new(Vec::<String>::new()));
        let attempts_for_task = attempts.clone();
        let reports_for_task = reports.clone();

        let handle = tokio::spawn(run_loop(
            rx,
            AutoSyncConfig {
                debounce: Duration::from_millis(1),
                retry_backoff: Duration::from_millis(1),
            },
            move || {
                let attempts = attempts_for_task.clone();
                async move {
                    let attempt = attempts.fetch_add(1, Ordering::SeqCst);
                    if attempt == 0 {
                        Err("transient".to_string())
                    } else {
                        Ok(PushOutcome {
                            conflicts: Vec::new(),
                            caught_up: true,
                        })
                    }
                }
            },
            move |msg| {
                reports_for_task.lock().unwrap().push(msg);
            },
        ));

        tx.send(()).unwrap();
        wait_until(|| attempts.load(Ordering::SeqCst) >= 2).await;
        drop(tx);
        handle.await.unwrap();

        assert!(
            attempts.load(Ordering::SeqCst) >= 2,
            "auto-sync should retry after a failed push"
        );
        let reports = reports.lock().unwrap();
        assert!(
            reports
                .iter()
                .any(|msg| msg.contains("auto-sync push failed")),
            "auto-sync should surface push failures"
        );
    }

    /// Several notifications sent back-to-back result in exactly one push.
    #[tokio::test]
    async fn auto_sync_coalesces_burst_notifications() {
        let (tx, rx) = mpsc::unbounded_channel();
        let attempts = Arc::new(AtomicUsize::new(0));
        let attempts_for_task = attempts.clone();

        let handle = tokio::spawn(run_loop(
            rx,
            AutoSyncConfig {
                debounce: Duration::from_millis(10),
                retry_backoff: Duration::from_millis(1),
            },
            move || {
                let attempts = attempts_for_task.clone();
                async move {
                    attempts.fetch_add(1, Ordering::SeqCst);
                    Ok(PushOutcome {
                        conflicts: Vec::new(),
                        caught_up: true,
                    })
                }
            },
            |_msg| {},
        ));

        tx.send(()).unwrap();
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        wait_until(|| attempts.load(Ordering::SeqCst) >= 1).await;
        // Settle window (3x debounce) so an erroneous extra push would be
        // counted before the channel is closed.
        tokio::time::sleep(Duration::from_millis(30)).await;
        drop(tx);
        handle.await.unwrap();

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "burst notifications should collapse into a single background push"
        );
    }

    /// Conflict reasons returned by a push are forwarded to the reporter.
    #[tokio::test]
    async fn auto_sync_reports_conflicts() {
        let (tx, rx) = mpsc::unbounded_channel();
        let reports = Arc::new(Mutex::new(Vec::<String>::new()));
        let reports_for_task = reports.clone();

        let handle = tokio::spawn(run_loop(
            rx,
            AutoSyncConfig {
                debounce: Duration::from_millis(1),
                retry_backoff: Duration::from_millis(1),
            },
            || async {
                Ok(PushOutcome {
                    conflicts: vec!["edge_wins".to_string()],
                    caught_up: true,
                })
            },
            move |msg| {
                reports_for_task.lock().unwrap().push(msg);
            },
        ));

        tx.send(()).unwrap();
        wait_until(|| {
            reports
                .lock()
                .unwrap()
                .iter()
                .any(|msg| msg.contains("sync conflict: edge_wins"))
        })
        .await;
        drop(tx);
        handle.await.unwrap();

        let reports = reports.lock().unwrap();
        assert!(
            reports
                .iter()
                .any(|msg| msg.contains("sync conflict: edge_wins")),
            "auto-sync should surface conflict reasons"
        );
    }

    /// A push that reports `caught_up: false` is followed by another push.
    #[tokio::test]
    async fn auto_sync_retries_until_caught_up() {
        let (tx, rx) = mpsc::unbounded_channel();
        let attempts = Arc::new(AtomicUsize::new(0));
        let attempts_for_task = attempts.clone();

        let handle = tokio::spawn(run_loop(
            rx,
            AutoSyncConfig {
                debounce: Duration::from_millis(1),
                retry_backoff: Duration::from_millis(1),
            },
            move || {
                let attempts = attempts_for_task.clone();
                async move {
                    let attempt = attempts.fetch_add(1, Ordering::SeqCst);
                    Ok(PushOutcome {
                        conflicts: Vec::new(),
                        caught_up: attempt > 0,
                    })
                }
            },
            |_msg| {},
        ));

        tx.send(()).unwrap();
        wait_until(|| attempts.load(Ordering::SeqCst) >= 2).await;
        drop(tx);
        handle.await.unwrap();

        assert!(
            attempts.load(Ordering::SeqCst) >= 2,
            "auto-sync must keep pushing until the latest local LSN is covered"
        );
    }
}