use std::collections::HashMap;

use rouchdb_core::adapter::Adapter;
use rouchdb_core::document::*;
use rouchdb_core::error::Result;

use crate::checkpoint::Checkpointer;

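/// Tuning options for a single replication run.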
#[derive(Debug, Clone)]
pub struct ReplicationOptions {
    /// Maximum number of change rows pulled from the source per batch.
    pub batch_size: u64,
    /// PouchDB-style cap on in-flight batches; the sequential loop in
    /// `replicate` does not consult it yet.
    pub batches_limit: u64,
}

impl Default for ReplicationOptions {
    fn default() -> Self {
        Self {
            batch_size: 100,
            batches_limit: 10,
        }
    }
}

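/// Summary of a completed replication run.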
#[derive(Debug, Clone)]
pub struct ReplicationResult {
    /// True when the run finished without any per-document errors.
    pub ok: bool,
    /// Number of change rows read from the source feed.
    pub docs_read: u64,
    /// Number of missing documents pushed to the target via `bulk_docs`.
    pub docs_written: u64,
    /// Per-document parse and write errors collected along the way.
    pub errors: Vec<String>,
    /// Source sequence the replication stopped at.
    pub last_seq: Seq,
}

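/// Progress events in the style of PouchDB's replication event stream.
/// Nothing in this module emits these yet; `replicate` returns a single
/// `ReplicationResult` instead.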
#[derive(Debug, Clone)]
pub enum ReplicationEvent {
    Change { docs_read: u64 },
    Paused,
    Active,
    Complete(ReplicationResult),
    Error(String),
}

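/// Replicate all changes from `source` into `target`, resuming from the last
/// recorded checkpoint.
///
/// Each iteration follows the CouchDB-style replication protocol: read one
/// batch from the source changes feed, ask the target which revisions it is
/// missing (`revs_diff`), fetch those revisions from the source (`bulk_get`),
/// write them to the target with `BulkDocsOptions::replication()`, then
/// checkpoint and repeat until the feed is drained.
///
/// A minimal usage sketch, assuming the in-memory adapter and that this
/// module is reachable as `rouchdb_replicate` (the paths here are
/// illustrative; the tests below are runnable equivalents):
///
/// ```ignore
/// use rouchdb_adapter_memory::MemoryAdapter;
/// use rouchdb_replicate::{replicate, ReplicationOptions};
///
/// async fn demo() -> rouchdb_core::error::Result<()> {
///     let source = MemoryAdapter::new("source");
///     let target = MemoryAdapter::new("target");
///     let result = replicate(&source, &target, ReplicationOptions::default()).await?;
///     assert!(result.ok);
///     Ok(())
/// }
/// ```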
pub async fn replicate(
    source: &dyn Adapter,
    target: &dyn Adapter,
    opts: ReplicationOptions,
) -> Result<ReplicationResult> {
    let source_info = source.info().await?;
    let target_info = target.info().await?;

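    // Checkpoint identity is derived from the source/target database names.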
    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);

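    // Resume where the previous run left off; already-replicated changes are skipped.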
    let since = checkpointer.read_checkpoint(source, target).await?;

    let mut total_docs_read = 0u64;
    let mut total_docs_written = 0u64;
    let mut errors = Vec::new();
    let mut current_seq = since;

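    // Pull the changes feed in batches of `batch_size` until it is drained.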
    loop {
        let changes = source
            .changes(ChangesOptions {
                since: current_seq.clone(),
                limit: Some(opts.batch_size),
                include_docs: false,
                ..Default::default()
            })
            .await?;

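        // An empty batch means we have caught up with the source.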
        if changes.results.is_empty() {
            break;
        }

        let batch_last_seq = changes.last_seq;
        total_docs_read += changes.results.len() as u64;

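        // Collect every changed revision in this batch, keyed by document id.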
        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
        for change in &changes.results {
            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
            rev_map.insert(change.id.clone(), revs);
        }

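        // Ask the target which of those revisions it does not have yet.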
        let diff = target.revs_diff(rev_map).await?;

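        // The target already has everything in this batch: skip the fetch and
        // write phases and just advance the cursor.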
        if diff.results.is_empty() {
            current_seq = batch_last_seq;
            continue;
        }

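        // Fetch every missing revision from the source in one bulk_get call.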
        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
        for (doc_id, diff_result) in &diff.results {
            for missing_rev in &diff_result.missing {
                bulk_get_items.push(BulkGetItem {
                    id: doc_id.clone(),
                    rev: Some(missing_rev.clone()),
                });
            }
        }

        let bulk_get_response = source.bulk_get(bulk_get_items).await?;

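        // Parse the fetched documents; a parse failure is recorded as an error
        // but does not abort the run.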
        let mut docs_to_write: Vec<Document> = Vec::new();
        for result in &bulk_get_response.results {
            for doc in &result.docs {
                if let Some(ref json) = doc.ok {
                    match Document::from_json(json.clone()) {
                        Ok(document) => docs_to_write.push(document),
                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
                    }
                }
            }
        }

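        // Write the missing revisions with `BulkDocsOptions::replication()`
        // (the new_edits=false analogue), keeping the source's revision ids.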
        if !docs_to_write.is_empty() {
            let write_count = docs_to_write.len() as u64;
            let write_results = target
                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
                .await?;

            for wr in &write_results {
                if !wr.ok {
                    errors.push(format!(
                        "write error for {}: {}",
                        wr.id,
                        wr.reason.as_deref().unwrap_or("unknown")
                    ));
                }
            }

            total_docs_written += write_count;
        }

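        // Persist progress. A failed checkpoint write is deliberately ignored:
        // the worst case is that the next run re-reads some changes.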
        current_seq = batch_last_seq;
        let _ = checkpointer
            .write_checkpoint(source, target, current_seq.clone())
            .await;

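        // A batch shorter than batch_size means the feed is exhausted.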
        if (changes.results.len() as u64) < opts.batch_size {
            break;
        }
    }

    Ok(ReplicationResult {
        ok: errors.is_empty(),
        docs_read: total_docs_read,
        docs_written: total_docs_written,
        errors,
        last_seq: current_seq,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;

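    /// Insert a brand-new document (no revision yet) through the adapter.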
    async fn put_doc(adapter: &dyn Adapter, id: &str, data: serde_json::Value) {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        adapter
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn replicate_empty_databases() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 0);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_source_to_target() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"name": "Alice"})).await;
        put_doc(&source, "doc2", serde_json::json!({"name": "Bob"})).await;
        put_doc(&source, "doc3", serde_json::json!({"name": "Charlie"})).await;

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);

        let doc = target.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["name"], "Alice");
    }

    #[tokio::test]
    async fn replicate_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        let r1 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r1.docs_written, 1);

        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        let r2 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r2.docs_read, 2);
        assert_eq!(r2.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }

    #[tokio::test]
    async fn replicate_already_synced() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;

        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_batched() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        for i in 0..15 {
            put_doc(&source, &format!("doc{:03}", i), serde_json::json!({"i": i})).await;
        }

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                batch_size: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 15);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 15);
    }

    #[tokio::test]
    async fn replicate_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert!(result.ok);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }
}