Skip to main content

oxide_mirror/
sync.rs

1//! Glue layer that drives a [`SyncSource`] against a [`MirrorStore`].
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use futures_util::StreamExt;
7use serde::{Deserialize, Serialize};
8
9use crate::conflict::{ConflictStrategy, LastWriteWins};
10use crate::error::{MirrorError, Result};
11use crate::source::SyncSource;
12use crate::store::MirrorStore;
13
14/// What happened during a single [`Syncer::sync_source`] run.
15#[derive(Debug, Clone, Default, Serialize, Deserialize)]
16pub struct SyncReport {
17    /// Source id that was synced.
18    pub source: String,
19    /// Total deltas pulled.
20    pub pulled: usize,
21    /// Deltas that actually changed `mirror_records`.
22    pub applied: usize,
23    /// Deltas that were rejected by the conflict strategy.
24    pub skipped: usize,
25    /// Cursor returned by the source's final batch.
26    pub final_cursor: Option<String>,
27    /// Per-resource counters (handy for cross-resource sources).
28    pub per_resource: HashMap<String, ResourceCounters>,
29}
30
31/// Per-resource counters inside a [`SyncReport`].
32#[derive(Debug, Clone, Default, Serialize, Deserialize)]
33pub struct ResourceCounters {
34    /// Deltas pulled.
35    pub pulled: usize,
36    /// Deltas applied.
37    pub applied: usize,
38    /// Deltas skipped.
39    pub skipped: usize,
40}
41
42/// Holds the registered [`SyncSource`]s and runs them against a
43/// [`MirrorStore`].
44pub struct Syncer {
45    store: MirrorStore,
46    sources: HashMap<String, Arc<dyn SyncSource>>,
47    strategy: Arc<dyn ConflictStrategy>,
48}
49
50impl Syncer {
51    /// Build a syncer with the [`LastWriteWins`] default strategy.
52    pub fn new(store: MirrorStore) -> Self {
53        Self::with_strategy(store, Arc::new(LastWriteWins))
54    }
55
56    /// Build a syncer with a custom conflict strategy.
57    pub fn with_strategy(store: MirrorStore, strategy: Arc<dyn ConflictStrategy>) -> Self {
58        Self {
59            store,
60            sources: HashMap::new(),
61            strategy,
62        }
63    }
64
65    /// Register a source so it can be referenced by id later.
66    pub fn register_source(&mut self, source: Arc<dyn SyncSource>) {
67        self.sources.insert(source.id().to_string(), source);
68    }
69
70    /// Read-only access to the store, e.g. for queries.
71    pub fn store(&self) -> &MirrorStore {
72        &self.store
73    }
74
75    /// Look up a source by id.
76    pub fn source(&self, id: &str) -> Option<Arc<dyn SyncSource>> {
77        self.sources.get(id).cloned()
78    }
79
80    /// Sync every batch a source is able to produce in one go. Stops when the
81    /// source reports `has_more = false` or returns an empty batch.
82    pub async fn sync_source(&self, source_id: &str) -> Result<SyncReport> {
83        let source = self
84            .sources
85            .get(source_id)
86            .ok_or_else(|| MirrorError::UnknownSource(source_id.to_string()))?;
87        self.sync_with(source.as_ref()).await
88    }
89
90    /// Drain a source's [`SyncSource::subscribe`] stream into the store.
91    ///
92    /// Each delta is applied immediately as it arrives. Returns the count of
93    /// deltas successfully applied. Stops when the stream ends or errors.
94    pub async fn sync_stream(
95        &self,
96        source: &dyn SyncSource,
97        cursor: Option<String>,
98    ) -> Result<u64> {
99        let mut stream = source.subscribe(cursor).await?;
100        let mut applied: u64 = 0;
101        while let Some(item) = stream.next().await {
102            let delta = item?;
103            let outcome = self
104                .store
105                .apply_delta(&delta, self.strategy.as_ref())
106                .await?;
107            if outcome.applied {
108                applied += 1;
109            }
110        }
111        Ok(applied)
112    }
113
114    /// Sync a single concrete [`SyncSource`] (handy when callers manage their
115    /// own registry).
116    pub async fn sync_with(&self, source: &dyn SyncSource) -> Result<SyncReport> {
117        let id = source.id().to_string();
118        let mut report = SyncReport {
119            source: id.clone(),
120            ..SyncReport::default()
121        };
122
123        // Resume from the most recent global cursor for this source. Per-
124        // resource cursors are still tracked in `mirror_cursors`; this is the
125        // "everything" wildcard.
126        let mut cursor = self.store.get_cursor(&id, "*").await?;
127
128        loop {
129            let batch = source.pull(cursor.clone()).await?;
130            if batch.deltas.is_empty() && !batch.has_more {
131                if let Some(next) = batch.next_cursor.clone() {
132                    self.store.set_cursor(&id, "*", Some(&next)).await?;
133                    report.final_cursor = Some(next);
134                }
135                break;
136            }
137            for delta in &batch.deltas {
138                let counters = report
139                    .per_resource
140                    .entry(delta.resource.clone())
141                    .or_default();
142                counters.pulled += 1;
143                report.pulled += 1;
144                let outcome = self
145                    .store
146                    .apply_delta(delta, self.strategy.as_ref())
147                    .await?;
148                if outcome.applied {
149                    counters.applied += 1;
150                    report.applied += 1;
151                    self.store
152                        .set_cursor(&id, &delta.resource, Some(&delta.record_id))
153                        .await?;
154                } else {
155                    counters.skipped += 1;
156                    report.skipped += 1;
157                }
158            }
159            if let Some(next) = batch.next_cursor.clone() {
160                self.store.set_cursor(&id, "*", Some(&next)).await?;
161                cursor = Some(next.clone());
162                report.final_cursor = Some(next);
163            }
164            if !batch.has_more {
165                break;
166            }
167        }
168
169        Ok(report)
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176    use crate::conflict::{HighestConfidence, LastWriteWins};
177    use crate::event::Delta;
178    use crate::source::{PullResult, StaticSource};
179    use serde_json::json;
180
181    fn upsert(id: &str, payload: serde_json::Value) -> Delta {
182        Delta::upsert("pets", id, payload, "static")
183    }
184
185    #[tokio::test]
186    async fn sync_pulls_and_applies_single_batch() {
187        let store = MirrorStore::in_memory().await.unwrap();
188        let source = Arc::new(StaticSource::from_deltas(
189            "static",
190            vec![
191                upsert("1", json!({"name": "Rex"})),
192                upsert("2", json!({"name": "Buddy"})),
193            ],
194        ));
195        let mut syncer = Syncer::new(store.clone());
196        syncer.register_source(source);
197
198        let report = syncer.sync_source("static").await.unwrap();
199        assert_eq!(report.pulled, 2);
200        assert_eq!(report.applied, 2);
201        assert_eq!(report.skipped, 0);
202        assert_eq!(store.list_records("pets").await.unwrap().len(), 2);
203    }
204
205    #[tokio::test]
206    async fn sync_paginates_until_has_more_false() {
207        let store = MirrorStore::in_memory().await.unwrap();
208        let source = Arc::new(StaticSource::new(
209            "src",
210            vec![
211                PullResult {
212                    deltas: vec![upsert("1", json!({"n": 1}))],
213                    next_cursor: Some("page-2".into()),
214                    has_more: true,
215                },
216                PullResult {
217                    deltas: vec![upsert("2", json!({"n": 2}))],
218                    next_cursor: Some("page-3".into()),
219                    has_more: false,
220                },
221            ],
222        ));
223        let mut syncer = Syncer::new(store.clone());
224        syncer.register_source(source);
225
226        let report = syncer.sync_source("src").await.unwrap();
227        assert_eq!(report.pulled, 2);
228        assert_eq!(report.final_cursor.as_deref(), Some("page-3"));
229        assert_eq!(
230            store.get_cursor("src", "*").await.unwrap().as_deref(),
231            Some("page-3")
232        );
233    }
234
235    #[tokio::test]
236    async fn sync_honours_conflict_strategy() {
237        let store = MirrorStore::in_memory().await.unwrap();
238        let high = upsert("1", json!({"v": "good"})).with_confidence(0.9);
239        let low = upsert("1", json!({"v": "bad"})).with_confidence(0.1);
240
241        let source = Arc::new(StaticSource::from_deltas("src", vec![high, low]));
242        let mut syncer = Syncer::with_strategy(store.clone(), Arc::new(HighestConfidence));
243        syncer.register_source(source);
244
245        let report = syncer.sync_source("src").await.unwrap();
246        assert_eq!(report.applied, 1);
247        assert_eq!(report.skipped, 1);
248        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
249        assert_eq!(rec.payload["v"], json!("good"));
250    }
251
252    #[tokio::test]
253    async fn sync_stream_applies_deltas() {
254        let store = MirrorStore::in_memory().await.unwrap();
255        let source = Arc::new(StaticSource::from_deltas(
256            "src",
257            vec![upsert("a", json!({"x": 1})), upsert("b", json!({"x": 2}))],
258        ));
259        let syncer = Syncer::new(store.clone());
260        let applied = syncer.sync_stream(source.as_ref(), None).await.unwrap();
261        assert_eq!(applied, 2);
262        assert_eq!(store.list_records("pets").await.unwrap().len(), 2);
263    }
264
265    #[tokio::test]
266    async fn unknown_source_errors() {
267        let store = MirrorStore::in_memory().await.unwrap();
268        let _last = LastWriteWins;
269        let syncer = Syncer::new(store);
270        let err = syncer.sync_source("missing").await.unwrap_err();
271        assert!(matches!(err, MirrorError::UnknownSource(_)));
272    }
273}