Skip to main content

contextdb_cli/
auto_sync.rs

1use std::future::Future;
2use std::time::Duration;
3use tokio::sync::mpsc;
4
5#[derive(Clone, Copy, Debug, PartialEq, Eq)]
6pub struct AutoSyncConfig {
7    pub debounce: Duration,
8    pub retry_backoff: Duration,
9}
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct PushOutcome {
13    pub conflicts: Vec<String>,
14    pub caught_up: bool,
15}
16
17impl Default for AutoSyncConfig {
18    fn default() -> Self {
19        Self {
20            debounce: Duration::from_millis(500),
21            retry_backoff: Duration::from_millis(500),
22        }
23    }
24}
25
26pub async fn run_loop<P, Fut, R>(
27    mut rx: mpsc::UnboundedReceiver<()>,
28    config: AutoSyncConfig,
29    mut push: P,
30    mut report: R,
31) where
32    P: FnMut() -> Fut,
33    Fut: Future<Output = Result<PushOutcome, String>>,
34    R: FnMut(String),
35{
36    let mut next_delay = None;
37
38    loop {
39        let delay = match next_delay.take() {
40            Some(delay) => delay,
41            None => match rx.recv().await {
42                Some(()) => config.debounce,
43                None => break,
44            },
45        };
46
47        let sleep = tokio::time::sleep(delay);
48        tokio::pin!(sleep);
49
50        loop {
51            tokio::select! {
52                _ = &mut sleep => break,
53                maybe = rx.recv() => {
54                    match maybe {
55                        Some(()) => {
56                            sleep.as_mut().reset(tokio::time::Instant::now() + config.debounce);
57                        }
58                        None => return,
59                    }
60                }
61            }
62        }
63
64        match push().await {
65            Ok(outcome) => {
66                for reason in outcome.conflicts {
67                    report(format!("sync conflict: {reason}"));
68                }
69                if !outcome.caught_up && !rx.is_closed() {
70                    next_delay = Some(Duration::ZERO);
71                }
72            }
73            Err(err) => {
74                report(format!("auto-sync push failed: {err}"));
75                if rx.is_closed() {
76                    break;
77                }
78                next_delay = Some(config.retry_backoff);
79            }
80        }
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87    use std::sync::atomic::{AtomicUsize, Ordering};
88    use std::sync::{Arc, Mutex};
89
90    #[tokio::test]
91    async fn auto_sync_retries_until_success_after_error() {
92        let (tx, rx) = mpsc::unbounded_channel();
93        let attempts = Arc::new(AtomicUsize::new(0));
94        let reports = Arc::new(Mutex::new(Vec::<String>::new()));
95        let attempts_for_task = attempts.clone();
96        let reports_for_task = reports.clone();
97
98        let handle = tokio::spawn(run_loop(
99            rx,
100            AutoSyncConfig {
101                debounce: Duration::from_millis(1),
102                retry_backoff: Duration::from_millis(1),
103            },
104            move || {
105                let attempts = attempts_for_task.clone();
106                async move {
107                    let attempt = attempts.fetch_add(1, Ordering::SeqCst);
108                    if attempt == 0 {
109                        Err("transient".to_string())
110                    } else {
111                        Ok(PushOutcome {
112                            conflicts: Vec::new(),
113                            caught_up: true,
114                        })
115                    }
116                }
117            },
118            move |msg| {
119                reports_for_task.lock().unwrap().push(msg);
120            },
121        ));
122
123        tx.send(()).unwrap();
124        tokio::time::sleep(Duration::from_millis(25)).await;
125        drop(tx);
126        handle.await.unwrap();
127
128        assert!(
129            attempts.load(Ordering::SeqCst) >= 2,
130            "auto-sync should retry after a failed push"
131        );
132        let reports = reports.lock().unwrap();
133        assert!(
134            reports
135                .iter()
136                .any(|msg| msg.contains("auto-sync push failed")),
137            "auto-sync should surface push failures"
138        );
139    }
140
141    #[tokio::test]
142    async fn auto_sync_coalesces_burst_notifications() {
143        let (tx, rx) = mpsc::unbounded_channel();
144        let attempts = Arc::new(AtomicUsize::new(0));
145        let attempts_for_task = attempts.clone();
146
147        let handle = tokio::spawn(run_loop(
148            rx,
149            AutoSyncConfig {
150                debounce: Duration::from_millis(10),
151                retry_backoff: Duration::from_millis(1),
152            },
153            move || {
154                let attempts = attempts_for_task.clone();
155                async move {
156                    attempts.fetch_add(1, Ordering::SeqCst);
157                    Ok(PushOutcome {
158                        conflicts: Vec::new(),
159                        caught_up: true,
160                    })
161                }
162            },
163            |_msg| {},
164        ));
165
166        tx.send(()).unwrap();
167        tx.send(()).unwrap();
168        tx.send(()).unwrap();
169        tokio::time::sleep(Duration::from_millis(30)).await;
170        drop(tx);
171        handle.await.unwrap();
172
173        assert_eq!(
174            attempts.load(Ordering::SeqCst),
175            1,
176            "burst notifications should collapse into a single background push"
177        );
178    }
179
180    #[tokio::test]
181    async fn auto_sync_reports_conflicts() {
182        let (tx, rx) = mpsc::unbounded_channel();
183        let reports = Arc::new(Mutex::new(Vec::<String>::new()));
184        let reports_for_task = reports.clone();
185
186        let handle = tokio::spawn(run_loop(
187            rx,
188            AutoSyncConfig {
189                debounce: Duration::from_millis(1),
190                retry_backoff: Duration::from_millis(1),
191            },
192            || async {
193                Ok(PushOutcome {
194                    conflicts: vec!["edge_wins".to_string()],
195                    caught_up: true,
196                })
197            },
198            move |msg| {
199                reports_for_task.lock().unwrap().push(msg);
200            },
201        ));
202
203        tx.send(()).unwrap();
204        tokio::time::sleep(Duration::from_millis(10)).await;
205        drop(tx);
206        handle.await.unwrap();
207
208        let reports = reports.lock().unwrap();
209        assert!(
210            reports
211                .iter()
212                .any(|msg| msg.contains("sync conflict: edge_wins")),
213            "auto-sync should surface conflict reasons"
214        );
215    }
216
217    #[tokio::test]
218    async fn auto_sync_retries_until_caught_up() {
219        let (tx, rx) = mpsc::unbounded_channel();
220        let attempts = Arc::new(AtomicUsize::new(0));
221        let attempts_for_task = attempts.clone();
222
223        let handle = tokio::spawn(run_loop(
224            rx,
225            AutoSyncConfig {
226                debounce: Duration::from_millis(1),
227                retry_backoff: Duration::from_millis(1),
228            },
229            move || {
230                let attempts = attempts_for_task.clone();
231                async move {
232                    let attempt = attempts.fetch_add(1, Ordering::SeqCst);
233                    Ok(PushOutcome {
234                        conflicts: Vec::new(),
235                        caught_up: attempt > 0,
236                    })
237                }
238            },
239            |_msg| {},
240        ));
241
242        tx.send(()).unwrap();
243        tokio::time::sleep(Duration::from_millis(25)).await;
244        drop(tx);
245        handle.await.unwrap();
246
247        assert!(
248            attempts.load(Ordering::SeqCst) >= 2,
249            "auto-sync must keep pushing until the latest local LSN is covered"
250        );
251    }
252}