rouchdb_replication/protocol.rs

use std::collections::HashMap;

use rouchdb_core::adapter::Adapter;
use rouchdb_core::document::*;
use rouchdb_core::error::Result;

use crate::checkpoint::Checkpointer;

/// Replication configuration.
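///
/// A minimal sketch of overriding one field while keeping the rest of the
/// defaults, via struct update syntax (marked `ignore`, not run as a doc-test):
///
/// ```ignore
/// let opts = ReplicationOptions {
///     batch_size: 500,
///     ..Default::default()
/// };
/// ```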
#[derive(Debug, Clone)]
pub struct ReplicationOptions {
    /// Number of documents to process per batch.
    pub batch_size: u64,
    /// Maximum number of batches to buffer.
    pub batches_limit: u64,
}

impl Default for ReplicationOptions {
    fn default() -> Self {
        Self {
            batch_size: 100,
            batches_limit: 10,
        }
    }
}

/// Result of a completed replication.
#[derive(Debug, Clone)]
pub struct ReplicationResult {
    pub ok: bool,
    pub docs_read: u64,
    pub docs_written: u64,
    pub errors: Vec<String>,
    pub last_seq: Seq,
}

/// Events emitted during replication for progress tracking.
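///
/// Not yet consumed by [`replicate`] itself; a sketch of how a caller might
/// drain events from a channel (the channel is an assumption here, not part
/// of this module's API):
///
/// ```ignore
/// while let Some(event) = events.recv().await {
///     match event {
///         ReplicationEvent::Change { docs_read } => println!("read {docs_read} docs"),
///         ReplicationEvent::Complete(result) => println!("done: ok = {}", result.ok),
///         ReplicationEvent::Error(err) => eprintln!("replication failed: {err}"),
///         ReplicationEvent::Paused | ReplicationEvent::Active => {}
///     }
/// }
/// ```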
#[derive(Debug, Clone)]
pub enum ReplicationEvent {
    Change { docs_read: u64 },
    Paused,
    Active,
    Complete(ReplicationResult),
    Error(String),
}

/// Run a one-shot replication from source to target.
///
/// Implements the CouchDB replication protocol:
/// 1. Read checkpoint
/// 2. Fetch changes from source
/// 3. Compute revs_diff against target
/// 4. Fetch missing docs from source
/// 5. Write to target
/// 6. Save checkpoint
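///
/// A sketch of a one-shot run between the in-memory adapters used in the
/// tests below (marked `ignore`; error handling elided):
///
/// ```ignore
/// let source = MemoryAdapter::new("source");
/// let target = MemoryAdapter::new("target");
/// let result = replicate(&source, &target, ReplicationOptions::default()).await?;
/// assert!(result.ok);
/// println!("wrote {} docs up to seq {:?}", result.docs_written, result.last_seq);
/// ```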
pub async fn replicate(
    source: &dyn Adapter,
    target: &dyn Adapter,
    opts: ReplicationOptions,
) -> Result<ReplicationResult> {
    let source_info = source.info().await?;
    let target_info = target.info().await?;

    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);

    // Step 1: Read checkpoint
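    // The Checkpointer's identity comes from the two database names above,
    // so each source/target pair resumes from its own saved sequence and
    // other replication pairs are unaffected.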
    let since = checkpointer.read_checkpoint(source, target).await?;

    let mut total_docs_read = 0u64;
    let mut total_docs_written = 0u64;
    let mut errors = Vec::new();
    let mut current_seq = since;

    loop {
        // Step 2: Fetch changes from source
        let changes = source
            .changes(ChangesOptions {
                since: current_seq.clone(),
                limit: Some(opts.batch_size),
                include_docs: false,
                ..Default::default()
            })
            .await?;

        if changes.results.is_empty() {
            break; // No more changes
        }

        let batch_last_seq = changes.last_seq;
        total_docs_read += changes.results.len() as u64;

        // Step 3: Compute revision diff
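        // Mirrors CouchDB's _revs_diff exchange: we send every doc id with
        // all revs the changes feed reported, and the target answers with
        // only the revisions it is missing (e.g. {"doc1": {"missing": ["2-b"]}}),
        // letting us skip anything it already has.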
        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
        for change in &changes.results {
            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
            rev_map.insert(change.id.clone(), revs);
        }

        let diff = target.revs_diff(rev_map).await?;

        if diff.results.is_empty() {
            // Target already has everything in this batch
            current_seq = batch_last_seq;
            continue;
        }

        // Step 4: Fetch missing documents from source
        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
        for (doc_id, diff_result) in &diff.results {
            for missing_rev in &diff_result.missing {
                bulk_get_items.push(BulkGetItem {
                    id: doc_id.clone(),
                    rev: Some(missing_rev.clone()),
                });
            }
        }

        let bulk_get_response = source.bulk_get(bulk_get_items).await?;

        // Step 5: Write to target with new_edits=false
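        // Writing with new_edits=false stores the fetched revisions verbatim,
        // preserving the source's revision IDs and tree rather than minting
        // new revisions; this is what makes re-replication idempotent.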
        let mut docs_to_write: Vec<Document> = Vec::new();
        for result in &bulk_get_response.results {
            for doc in &result.docs {
                if let Some(ref json) = doc.ok {
                    match Document::from_json(json.clone()) {
                        Ok(document) => docs_to_write.push(document),
                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
                    }
                }
            }
        }

        if !docs_to_write.is_empty() {
            let write_count = docs_to_write.len() as u64;
            let write_results = target
                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
                .await?;

            for wr in &write_results {
                if !wr.ok {
                    errors.push(format!(
                        "write error for {}: {}",
                        wr.id,
                        wr.reason.as_deref().unwrap_or("unknown")
                    ));
                }
            }

            total_docs_written += write_count;
        }

        // Step 6: Save checkpoint
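        // A failed checkpoint write is deliberately non-fatal (`let _ =`):
        // the worst case is that the next run re-reads some changes, and
        // revs_diff will filter out anything already written.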
        current_seq = batch_last_seq;
        let _ = checkpointer
            .write_checkpoint(source, target, current_seq.clone())
            .await;

        // Check if we got fewer results than batch_size (last batch)
        if (changes.results.len() as u64) < opts.batch_size {
            break;
        }
    }

    Ok(ReplicationResult {
        ok: errors.is_empty(),
        docs_read: total_docs_read,
        docs_written: total_docs_written,
        errors,
        last_seq: current_seq,
    })
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;

    async fn put_doc(adapter: &dyn Adapter, id: &str, data: serde_json::Value) {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        adapter
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn replicate_empty_databases() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 0);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_source_to_target() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"name": "Alice"})).await;
        put_doc(&source, "doc2", serde_json::json!({"name": "Bob"})).await;
        put_doc(&source, "doc3", serde_json::json!({"name": "Charlie"})).await;

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        // Verify target has the documents
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);

        let doc = target.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["name"], "Alice");
    }

    #[tokio::test]
    async fn replicate_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // First replication
        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        let r1 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r1.docs_written, 1);

        // Add more docs
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        // Second replication should only sync new docs
        let r2 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r2.docs_read, 2);
        assert_eq!(r2.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }

    #[tokio::test]
    async fn replicate_already_synced() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;

        // First replication
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        // Second replication with no new changes
        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_batched() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // Create more docs than batch size
        for i in 0..15 {
            put_doc(
                &source,
                &format!("doc{:03}", i),
                serde_json::json!({"i": i}),
            )
            .await;
        }

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                batch_size: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 15);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 15);
    }

    #[tokio::test]
    async fn replicate_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // Create and sync
        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        // Delete on source
        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        // Replicate delete
        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert!(result.ok);

        // Target should see deletion
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }
}
351}