1use std::collections::HashMap;
4use std::sync::Arc;
5
6use futures_util::StreamExt;
7use serde::{Deserialize, Serialize};
8
9use crate::conflict::{ConflictStrategy, LastWriteWins};
10use crate::error::{MirrorError, Result};
11use crate::source::SyncSource;
12use crate::store::MirrorStore;
13
14#[derive(Debug, Clone, Default, Serialize, Deserialize)]
16pub struct SyncReport {
17 pub source: String,
19 pub pulled: usize,
21 pub applied: usize,
23 pub skipped: usize,
25 pub final_cursor: Option<String>,
27 pub per_resource: HashMap<String, ResourceCounters>,
29}
30
31#[derive(Debug, Clone, Default, Serialize, Deserialize)]
33pub struct ResourceCounters {
34 pub pulled: usize,
36 pub applied: usize,
38 pub skipped: usize,
40}
41
42pub struct Syncer {
45 store: MirrorStore,
46 sources: HashMap<String, Arc<dyn SyncSource>>,
47 strategy: Arc<dyn ConflictStrategy>,
48}
49
50impl Syncer {
51 pub fn new(store: MirrorStore) -> Self {
53 Self::with_strategy(store, Arc::new(LastWriteWins))
54 }
55
56 pub fn with_strategy(store: MirrorStore, strategy: Arc<dyn ConflictStrategy>) -> Self {
58 Self {
59 store,
60 sources: HashMap::new(),
61 strategy,
62 }
63 }
64
65 pub fn register_source(&mut self, source: Arc<dyn SyncSource>) {
67 self.sources.insert(source.id().to_string(), source);
68 }
69
70 pub fn store(&self) -> &MirrorStore {
72 &self.store
73 }
74
75 pub fn source(&self, id: &str) -> Option<Arc<dyn SyncSource>> {
77 self.sources.get(id).cloned()
78 }
79
80 pub async fn sync_source(&self, source_id: &str) -> Result<SyncReport> {
83 let source = self
84 .sources
85 .get(source_id)
86 .ok_or_else(|| MirrorError::UnknownSource(source_id.to_string()))?;
87 self.sync_with(source.as_ref()).await
88 }
89
90 pub async fn sync_stream(
95 &self,
96 source: &dyn SyncSource,
97 cursor: Option<String>,
98 ) -> Result<u64> {
99 let mut stream = source.subscribe(cursor).await?;
100 let mut applied: u64 = 0;
101 while let Some(item) = stream.next().await {
102 let delta = item?;
103 let outcome = self
104 .store
105 .apply_delta(&delta, self.strategy.as_ref())
106 .await?;
107 if outcome.applied {
108 applied += 1;
109 }
110 }
111 Ok(applied)
112 }
113
114 pub async fn sync_with(&self, source: &dyn SyncSource) -> Result<SyncReport> {
117 let id = source.id().to_string();
118 let mut report = SyncReport {
119 source: id.clone(),
120 ..SyncReport::default()
121 };
122
123 let mut cursor = self.store.get_cursor(&id, "*").await?;
127
128 loop {
129 let batch = source.pull(cursor.clone()).await?;
130 if batch.deltas.is_empty() && !batch.has_more {
131 if let Some(next) = batch.next_cursor.clone() {
132 self.store.set_cursor(&id, "*", Some(&next)).await?;
133 report.final_cursor = Some(next);
134 }
135 break;
136 }
137 for delta in &batch.deltas {
138 let counters = report
139 .per_resource
140 .entry(delta.resource.clone())
141 .or_default();
142 counters.pulled += 1;
143 report.pulled += 1;
144 let outcome = self
145 .store
146 .apply_delta(delta, self.strategy.as_ref())
147 .await?;
148 if outcome.applied {
149 counters.applied += 1;
150 report.applied += 1;
151 self.store
152 .set_cursor(&id, &delta.resource, Some(&delta.record_id))
153 .await?;
154 } else {
155 counters.skipped += 1;
156 report.skipped += 1;
157 }
158 }
159 if let Some(next) = batch.next_cursor.clone() {
160 self.store.set_cursor(&id, "*", Some(&next)).await?;
161 cursor = Some(next.clone());
162 report.final_cursor = Some(next);
163 }
164 if !batch.has_more {
165 break;
166 }
167 }
168
169 Ok(report)
170 }
171}
172
#[cfg(test)]
mod tests {
    // `super::*` already brings `Arc`, `LastWriteWins`, `MirrorError`,
    // `MirrorStore` and `Syncer` into scope; only test-only names are imported
    // explicitly.
    use super::*;
    use crate::conflict::HighestConfidence;
    use crate::event::Delta;
    use crate::source::{PullResult, StaticSource};
    use serde_json::json;

    /// Builds an upsert delta for the `pets` resource with the given record id.
    fn upsert(id: &str, payload: serde_json::Value) -> Delta {
        Delta::upsert("pets", id, payload, "static")
    }

    /// A single batch of two upserts is fully pulled, applied and stored.
    #[tokio::test]
    async fn sync_pulls_and_applies_single_batch() {
        let store = MirrorStore::in_memory().await.unwrap();
        let source = Arc::new(StaticSource::from_deltas(
            "static",
            vec![
                upsert("1", json!({"name": "Rex"})),
                upsert("2", json!({"name": "Buddy"})),
            ],
        ));
        let mut syncer = Syncer::new(store.clone());
        syncer.register_source(source);

        let report = syncer.sync_source("static").await.unwrap();
        assert_eq!(report.pulled, 2);
        assert_eq!(report.applied, 2);
        assert_eq!(report.skipped, 0);
        assert_eq!(store.list_records("pets").await.unwrap().len(), 2);
    }

    /// Pagination continues until `has_more` is false, and the last cursor is
    /// both reported and persisted under the `"*"` resource.
    #[tokio::test]
    async fn sync_paginates_until_has_more_false() {
        let store = MirrorStore::in_memory().await.unwrap();
        let source = Arc::new(StaticSource::new(
            "src",
            vec![
                PullResult {
                    deltas: vec![upsert("1", json!({"n": 1}))],
                    next_cursor: Some("page-2".into()),
                    has_more: true,
                },
                PullResult {
                    deltas: vec![upsert("2", json!({"n": 2}))],
                    next_cursor: Some("page-3".into()),
                    has_more: false,
                },
            ],
        ));
        let mut syncer = Syncer::new(store.clone());
        syncer.register_source(source);

        let report = syncer.sync_source("src").await.unwrap();
        assert_eq!(report.pulled, 2);
        assert_eq!(report.final_cursor.as_deref(), Some("page-3"));
        assert_eq!(
            store.get_cursor("src", "*").await.unwrap().as_deref(),
            Some("page-3")
        );
    }

    /// With `HighestConfidence`, a later low-confidence delta for the same
    /// record is skipped and the high-confidence payload is kept.
    #[tokio::test]
    async fn sync_honours_conflict_strategy() {
        let store = MirrorStore::in_memory().await.unwrap();
        let high = upsert("1", json!({"v": "good"})).with_confidence(0.9);
        let low = upsert("1", json!({"v": "bad"})).with_confidence(0.1);

        let source = Arc::new(StaticSource::from_deltas("src", vec![high, low]));
        let mut syncer = Syncer::with_strategy(store.clone(), Arc::new(HighestConfidence));
        syncer.register_source(source);

        let report = syncer.sync_source("src").await.unwrap();
        assert_eq!(report.applied, 1);
        assert_eq!(report.skipped, 1);
        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
        assert_eq!(rec.payload["v"], json!("good"));
    }

    /// The streaming path applies every delta yielded by the subscription.
    #[tokio::test]
    async fn sync_stream_applies_deltas() {
        let store = MirrorStore::in_memory().await.unwrap();
        let source = Arc::new(StaticSource::from_deltas(
            "src",
            vec![upsert("a", json!({"x": 1})), upsert("b", json!({"x": 2}))],
        ));
        let syncer = Syncer::new(store.clone());
        let applied = syncer.sync_stream(source.as_ref(), None).await.unwrap();
        assert_eq!(applied, 2);
        assert_eq!(store.list_records("pets").await.unwrap().len(), 2);
    }

    /// Syncing an id that was never registered yields `UnknownSource`.
    #[tokio::test]
    async fn unknown_source_errors() {
        let store = MirrorStore::in_memory().await.unwrap();
        let syncer = Syncer::new(store);
        let err = syncer.sync_source("missing").await.unwrap_err();
        assert!(matches!(err, MirrorError::UnknownSource(_)));
    }
}
273}