//! rouchdb_replication/protocol.rs
//!
//! CouchDB-style replication protocol: one-shot, event-streaming, and
//! continuous (live) replication between two adapters.

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use rouchdb_core::adapter::Adapter;
6use rouchdb_core::document::*;
7use rouchdb_core::error::Result;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use crate::checkpoint::Checkpointer;
12
/// Filter for selective replication.
///
/// `DocIds` is pushed down into the source changes feed; `Custom` is applied
/// client-side to raw change events; `Selector` is applied to the fetched
/// document bodies (see `replicate`).
pub enum ReplicationFilter {
    /// Replicate only these document IDs.
    DocIds(Vec<String>),

    /// Replicate documents matching a Mango selector.
    Selector(serde_json::Value),

    /// Replicate documents passing a custom predicate.
    /// Receives the ChangeEvent (id, deleted, seq).
    Custom(Arc<dyn Fn(&ChangeEvent) -> bool + Send + Sync>),
}
25
26impl Clone for ReplicationFilter {
27    fn clone(&self) -> Self {
28        match self {
29            Self::DocIds(ids) => Self::DocIds(ids.clone()),
30            Self::Selector(sel) => Self::Selector(sel.clone()),
31            Self::Custom(f) => Self::Custom(Arc::clone(f)),
32        }
33    }
34}
35
/// Replication configuration.
pub struct ReplicationOptions {
    /// Number of documents to process per batch.
    pub batch_size: u64,
    /// Maximum number of batches to buffer.
    /// NOTE(review): not consulted by the replication loops in this file —
    /// confirm whether another module reads it or it is dead config.
    pub batches_limit: u64,
    /// Optional filter for selective replication.
    pub filter: Option<ReplicationFilter>,
    /// Enable continuous/live replication.
    pub live: bool,
    /// Automatically retry on transient errors.
    pub retry: bool,
    /// Polling interval for live replication (default: 500ms).
    pub poll_interval: Duration,
    /// Backoff function for retry: takes attempt number, returns delay.
    /// When `None`, live replication falls back to capped exponential backoff.
    pub back_off_function: Option<Box<dyn Fn(u32) -> Duration + Send + Sync>>,
    /// Override the starting sequence (skip checkpoint lookup).
    pub since: Option<Seq>,
    /// Whether to save/read checkpoints (default: true).
    /// Set to false to always replicate from scratch.
    pub checkpoint: bool,
}
58
59impl Default for ReplicationOptions {
60    fn default() -> Self {
61        Self {
62            batch_size: 100,
63            batches_limit: 10,
64            filter: None,
65            live: false,
66            retry: false,
67            poll_interval: Duration::from_millis(500),
68            back_off_function: None,
69            since: None,
70            checkpoint: true,
71        }
72    }
73}
74
/// Result of a completed replication.
#[derive(Debug, Clone)]
pub struct ReplicationResult {
    /// True when no per-document errors were recorded.
    pub ok: bool,
    /// Number of change events read from the source (after Custom filtering).
    pub docs_read: u64,
    /// Number of documents submitted to the target's bulk_docs.
    pub docs_written: u64,
    /// Per-document parse/write error messages (non-fatal).
    pub errors: Vec<String>,
    /// The last source sequence processed.
    pub last_seq: Seq,
}
84
/// Events emitted during replication for progress tracking.
#[derive(Debug, Clone)]
pub enum ReplicationEvent {
    /// A batch was processed; `docs_read` is the cumulative count so far.
    Change { docs_read: u64 },
    /// Live replication found no changes and is waiting.
    Paused,
    /// Replication (re)started processing.
    Active,
    /// A one-shot pass finished with this result.
    Complete(ReplicationResult),
    /// A pass failed; carries the stringified error.
    Error(String),
}
94
95/// Run a one-shot replication from source to target.
96///
97/// Implements the CouchDB replication protocol:
98/// 1. Read checkpoint
99/// 2. Fetch changes from source
100/// 3. Compute revs_diff against target
101/// 4. Fetch missing docs from source
102/// 5. Write to target
103/// 6. Save checkpoint
104pub async fn replicate(
105    source: &dyn Adapter,
106    target: &dyn Adapter,
107    opts: ReplicationOptions,
108) -> Result<ReplicationResult> {
109    let source_info = source.info().await?;
110    let target_info = target.info().await?;
111
112    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);
113
114    // Step 1: Read checkpoint (or use override)
115    let since = if let Some(ref override_since) = opts.since {
116        override_since.clone()
117    } else if opts.checkpoint {
118        checkpointer.read_checkpoint(source, target).await?
119    } else {
120        Seq::default()
121    };
122
123    // Extract doc_ids from filter (for ChangesOptions)
124    let filter_doc_ids = match &opts.filter {
125        Some(ReplicationFilter::DocIds(ids)) => Some(ids.clone()),
126        _ => None,
127    };
128
129    let mut total_docs_read = 0u64;
130    let mut total_docs_written = 0u64;
131    let mut errors = Vec::new();
132    let mut current_seq = since;
133
134    loop {
135        // Step 2: Fetch changes from source
136        let changes = source
137            .changes(ChangesOptions {
138                since: current_seq.clone(),
139                limit: Some(opts.batch_size),
140                include_docs: false,
141                doc_ids: filter_doc_ids.clone(),
142                ..Default::default()
143            })
144            .await?;
145
146        if changes.results.is_empty() {
147            break; // No more changes
148        }
149
150        let batch_last_seq = changes.last_seq;
151
152        // Step 2.5: Apply Custom filter to changes
153        let filtered_changes: Vec<&ChangeEvent> = match &opts.filter {
154            Some(ReplicationFilter::Custom(predicate)) => {
155                changes.results.iter().filter(|c| predicate(c)).collect()
156            }
157            _ => changes.results.iter().collect(),
158        };
159
160        total_docs_read += filtered_changes.len() as u64;
161
162        if filtered_changes.is_empty() {
163            current_seq = batch_last_seq;
164            if (changes.results.len() as u64) < opts.batch_size {
165                break;
166            }
167            continue;
168        }
169
170        // Step 3: Compute revision diff
171        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
172        for change in &filtered_changes {
173            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
174            rev_map.insert(change.id.clone(), revs);
175        }
176
177        let diff = target.revs_diff(rev_map).await?;
178
179        if diff.results.is_empty() {
180            // Target already has everything in this batch
181            current_seq = batch_last_seq;
182            if (changes.results.len() as u64) < opts.batch_size {
183                break;
184            }
185            continue;
186        }
187
188        // Step 4: Fetch missing documents from source
189        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
190        for (doc_id, diff_result) in &diff.results {
191            for missing_rev in &diff_result.missing {
192                bulk_get_items.push(BulkGetItem {
193                    id: doc_id.clone(),
194                    rev: Some(missing_rev.clone()),
195                });
196            }
197        }
198
199        let bulk_get_response = source.bulk_get(bulk_get_items).await?;
200
201        // Step 5: Write to target with new_edits=false
202        let mut docs_to_write: Vec<Document> = Vec::new();
203        for result in &bulk_get_response.results {
204            for doc in &result.docs {
205                if let Some(ref json) = doc.ok {
206                    match Document::from_json(json.clone()) {
207                        Ok(document) => docs_to_write.push(document),
208                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
209                    }
210                }
211            }
212        }
213
214        // Step 4.5: Apply Selector filter to fetched documents
215        if let Some(ReplicationFilter::Selector(ref selector)) = opts.filter {
216            docs_to_write.retain(|doc| rouchdb_query::matches_selector(&doc.data, selector));
217        }
218
219        if !docs_to_write.is_empty() {
220            let write_count = docs_to_write.len() as u64;
221            let write_results = target
222                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
223                .await?;
224
225            for wr in &write_results {
226                if !wr.ok {
227                    errors.push(format!(
228                        "write error for {}: {}",
229                        wr.id,
230                        wr.reason.as_deref().unwrap_or("unknown")
231                    ));
232                }
233            }
234
235            total_docs_written += write_count;
236        }
237
238        // Step 6: Save checkpoint (if enabled)
239        current_seq = batch_last_seq;
240        if opts.checkpoint {
241            let _ = checkpointer
242                .write_checkpoint(source, target, current_seq.clone())
243                .await;
244        }
245
246        // Check if we got fewer results than batch_size (last batch)
247        if (changes.results.len() as u64) < opts.batch_size {
248            break;
249        }
250    }
251
252    Ok(ReplicationResult {
253        ok: errors.is_empty(),
254        docs_read: total_docs_read,
255        docs_written: total_docs_written,
256        errors,
257        last_seq: current_seq,
258    })
259}
260
261/// Run a one-shot replication with event streaming.
262///
263/// Same as `replicate()` but emits `ReplicationEvent` through the provided
264/// channel as replication progresses.
265pub async fn replicate_with_events(
266    source: &dyn Adapter,
267    target: &dyn Adapter,
268    opts: ReplicationOptions,
269    events_tx: mpsc::Sender<ReplicationEvent>,
270) -> Result<ReplicationResult> {
271    let source_info = source.info().await?;
272    let target_info = target.info().await?;
273
274    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);
275
276    let since = if let Some(ref override_since) = opts.since {
277        override_since.clone()
278    } else if opts.checkpoint {
279        checkpointer.read_checkpoint(source, target).await?
280    } else {
281        Seq::default()
282    };
283
284    let filter_doc_ids = match &opts.filter {
285        Some(ReplicationFilter::DocIds(ids)) => Some(ids.clone()),
286        _ => None,
287    };
288
289    let mut total_docs_read = 0u64;
290    let mut total_docs_written = 0u64;
291    let mut errors = Vec::new();
292    let mut current_seq = since;
293
294    let _ = events_tx.send(ReplicationEvent::Active).await;
295
296    loop {
297        let changes = source
298            .changes(ChangesOptions {
299                since: current_seq.clone(),
300                limit: Some(opts.batch_size),
301                include_docs: false,
302                doc_ids: filter_doc_ids.clone(),
303                ..Default::default()
304            })
305            .await?;
306
307        if changes.results.is_empty() {
308            break;
309        }
310
311        let batch_last_seq = changes.last_seq;
312
313        let filtered_changes: Vec<&ChangeEvent> = match &opts.filter {
314            Some(ReplicationFilter::Custom(predicate)) => {
315                changes.results.iter().filter(|c| predicate(c)).collect()
316            }
317            _ => changes.results.iter().collect(),
318        };
319
320        total_docs_read += filtered_changes.len() as u64;
321
322        if filtered_changes.is_empty() {
323            current_seq = batch_last_seq;
324            if (changes.results.len() as u64) < opts.batch_size {
325                break;
326            }
327            continue;
328        }
329
330        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
331        for change in &filtered_changes {
332            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
333            rev_map.insert(change.id.clone(), revs);
334        }
335
336        let diff = target.revs_diff(rev_map).await?;
337
338        if diff.results.is_empty() {
339            current_seq = batch_last_seq;
340            if (changes.results.len() as u64) < opts.batch_size {
341                break;
342            }
343            continue;
344        }
345
346        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
347        for (doc_id, diff_result) in &diff.results {
348            for missing_rev in &diff_result.missing {
349                bulk_get_items.push(BulkGetItem {
350                    id: doc_id.clone(),
351                    rev: Some(missing_rev.clone()),
352                });
353            }
354        }
355
356        let bulk_get_response = source.bulk_get(bulk_get_items).await?;
357
358        let mut docs_to_write: Vec<Document> = Vec::new();
359        for result in &bulk_get_response.results {
360            for doc in &result.docs {
361                if let Some(ref json) = doc.ok {
362                    match Document::from_json(json.clone()) {
363                        Ok(document) => docs_to_write.push(document),
364                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
365                    }
366                }
367            }
368        }
369
370        if let Some(ReplicationFilter::Selector(ref selector)) = opts.filter {
371            docs_to_write.retain(|doc| rouchdb_query::matches_selector(&doc.data, selector));
372        }
373
374        if !docs_to_write.is_empty() {
375            let write_count = docs_to_write.len() as u64;
376            let write_results = target
377                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
378                .await?;
379
380            for wr in &write_results {
381                if !wr.ok {
382                    errors.push(format!(
383                        "write error for {}: {}",
384                        wr.id,
385                        wr.reason.as_deref().unwrap_or("unknown")
386                    ));
387                }
388            }
389
390            total_docs_written += write_count;
391        }
392
393        // Emit change event
394        let _ = events_tx
395            .send(ReplicationEvent::Change {
396                docs_read: total_docs_read,
397            })
398            .await;
399
400        current_seq = batch_last_seq;
401        if opts.checkpoint {
402            let _ = checkpointer
403                .write_checkpoint(source, target, current_seq.clone())
404                .await;
405        }
406
407        if (changes.results.len() as u64) < opts.batch_size {
408            break;
409        }
410    }
411
412    let result = ReplicationResult {
413        ok: errors.is_empty(),
414        docs_read: total_docs_read,
415        docs_written: total_docs_written,
416        errors,
417        last_seq: current_seq,
418    };
419
420    let _ = events_tx
421        .send(ReplicationEvent::Complete(result.clone()))
422        .await;
423
424    Ok(result)
425}
426
427/// Run continuous (live) replication from source to target.
428///
429/// Performs an initial one-shot replication, then polls for new changes
430/// at the configured `poll_interval`. Runs until the returned
431/// `ReplicationHandle` is cancelled/dropped.
432///
433/// Events are emitted through the returned channel receiver.
434pub fn replicate_live(
435    source: Arc<dyn Adapter>,
436    target: Arc<dyn Adapter>,
437    opts: ReplicationOptions,
438) -> (mpsc::Receiver<ReplicationEvent>, ReplicationHandle) {
439    let (tx, rx) = mpsc::channel(64);
440    let poll_interval = opts.poll_interval;
441    let retry = opts.retry;
442    let back_off = opts.back_off_function;
443
444    let cancel = CancellationToken::new();
445    let cancel_clone = cancel.clone();
446
447    tokio::spawn(async move {
448        let mut attempt: u32 = 0;
449
450        loop {
451            // Clone the filter for each iteration so Selector and Custom
452            // filters remain active across the entire live replication.
453            let one_shot_opts = ReplicationOptions {
454                batch_size: opts.batch_size,
455                batches_limit: opts.batches_limit,
456                filter: opts.filter.clone(),
457                live: false,
458                retry: false,
459                poll_interval,
460                back_off_function: None,
461                since: None,
462                checkpoint: opts.checkpoint,
463            };
464
465            let result =
466                replicate_with_events(source.as_ref(), target.as_ref(), one_shot_opts, tx.clone())
467                    .await;
468
469            match result {
470                Ok(r) => {
471                    attempt = 0; // Reset retry counter on success
472                    if r.docs_read == 0 {
473                        // No changes — emit Paused and wait
474                        let _ = tx.send(ReplicationEvent::Paused).await;
475                    }
476                }
477                Err(e) => {
478                    let _ = tx.send(ReplicationEvent::Error(e.to_string())).await;
479                    if retry {
480                        attempt += 1;
481                        let delay = if let Some(ref f) = back_off {
482                            f(attempt)
483                        } else {
484                            // Default exponential backoff: min(1s * 2^attempt, 60s)
485                            let secs = (1u64 << attempt.min(6)).min(60);
486                            Duration::from_secs(secs)
487                        };
488                        tokio::select! {
489                            _ = tokio::time::sleep(delay) => continue,
490                            _ = cancel_clone.cancelled() => break,
491                        }
492                    } else {
493                        break;
494                    }
495                }
496            }
497
498            // Wait for poll_interval or cancellation
499            tokio::select! {
500                _ = tokio::time::sleep(poll_interval) => {},
501                _ = cancel_clone.cancelled() => break,
502            }
503        }
504    });
505
506    (rx, ReplicationHandle { cancel })
507}
508
/// Handle for a live replication task. Dropping this cancels the replication.
pub struct ReplicationHandle {
    // Shared with the spawned task, which selects on `cancelled()`.
    cancel: CancellationToken,
}
513
impl ReplicationHandle {
    /// Cancel the live replication.
    ///
    /// Idempotent; the background task exits at its next select point.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }
}
520
impl Drop for ReplicationHandle {
    /// Cancelling on drop ties the task's lifetime to the handle's.
    fn drop(&mut self) {
        self.cancel.cancel();
    }
}
526
527// ---------------------------------------------------------------------------
528// Tests
529// ---------------------------------------------------------------------------
530
// Unit tests exercising one-shot, batched, incremental, deleted-doc, and
// filtered replication against the in-memory adapter.
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;

    /// Insert a fresh (rev-less) document into `adapter` via bulk_docs.
    async fn put_doc(adapter: &dyn Adapter, id: &str, data: serde_json::Value) {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        adapter
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn replicate_empty_databases() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 0);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_source_to_target() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"name": "Alice"})).await;
        put_doc(&source, "doc2", serde_json::json!({"name": "Bob"})).await;
        put_doc(&source, "doc3", serde_json::json!({"name": "Charlie"})).await;

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        // Verify target has the documents
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);

        let doc = target.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["name"], "Alice");
    }

    #[tokio::test]
    async fn replicate_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // First replication
        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        let r1 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r1.docs_written, 1);

        // Add more docs
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        // Second replication should only sync new docs
        let r2 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r2.docs_read, 2);
        assert_eq!(r2.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }

    #[tokio::test]
    async fn replicate_already_synced() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;

        // First replication
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        // Second replication with no new changes
        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_batched() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // Create more docs than batch size
        for i in 0..15 {
            put_doc(
                &source,
                &format!("doc{:03}", i),
                serde_json::json!({"i": i}),
            )
            .await;
        }

        // batch_size 5 forces three loop iterations.
        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                batch_size: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 15);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 15);
    }

    #[tokio::test]
    async fn replicate_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // Create and sync
        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        // Delete on source
        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        // Replicate delete
        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert!(result.ok);

        // Target should see deletion
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }

    // -----------------------------------------------------------------------
    // Filtered replication tests
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn replicate_filtered_by_doc_ids() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;
        put_doc(&source, "doc4", serde_json::json!({"v": 4})).await;
        put_doc(&source, "doc5", serde_json::json!({"v": 5})).await;

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec![
                    "doc2".into(),
                    "doc4".into(),
                ])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        target.get("doc2", GetOptions::default()).await.unwrap();
        target.get("doc4", GetOptions::default()).await.unwrap();

        // doc1, doc3, doc5 should not exist
        assert!(target.get("doc1", GetOptions::default()).await.is_err());
        assert!(target.get("doc3", GetOptions::default()).await.is_err());
        assert!(target.get("doc5", GetOptions::default()).await.is_err());
    }

    #[tokio::test]
    async fn replicate_filtered_by_selector() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(
            &source,
            "inv1",
            serde_json::json!({"type": "invoice", "amount": 100}),
        )
        .await;
        put_doc(
            &source,
            "inv2",
            serde_json::json!({"type": "invoice", "amount": 200}),
        )
        .await;
        put_doc(
            &source,
            "user1",
            serde_json::json!({"type": "user", "name": "Alice"}),
        )
        .await;
        put_doc(
            &source,
            "user2",
            serde_json::json!({"type": "user", "name": "Bob"}),
        )
        .await;

        // Only documents matching the Mango selector should be written.
        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::Selector(
                    serde_json::json!({"type": "invoice"}),
                )),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        let doc = target.get("inv1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["amount"], 100);

        assert!(target.get("user1", GetOptions::default()).await.is_err());
    }

    #[tokio::test]
    async fn replicate_filtered_by_custom_closure() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "public:doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "public:doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "private:doc3", serde_json::json!({"v": 3})).await;
        put_doc(&source, "private:doc4", serde_json::json!({"v": 4})).await;

        // Custom predicate sees only the change event (id/deleted/seq).
        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::Custom(Arc::new(|change| {
                    change.id.starts_with("public:")
                }))),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        target
            .get("public:doc1", GetOptions::default())
            .await
            .unwrap();
        assert!(
            target
                .get("private:doc3", GetOptions::default())
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn replicate_filtered_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        // First batch
        put_doc(&source, "doc1", serde_json::json!({"type": "a"})).await;
        put_doc(&source, "doc2", serde_json::json!({"type": "b"})).await;

        let r1 = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        assert_eq!(r1.docs_written, 1);

        // Add more docs
        put_doc(&source, "doc3", serde_json::json!({"type": "a"})).await;
        put_doc(&source, "doc4", serde_json::json!({"type": "b"})).await;

        // Second replication — checkpoint should have advanced past doc1/doc2
        let r2 = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec![
                    "doc1".into(),
                    "doc3".into(),
                ])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Only doc3 is new (doc1 was already replicated)
        assert_eq!(r2.docs_written, 1);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2); // doc1 + doc3
    }

    #[tokio::test]
    async fn replicate_filtered_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"type": "keep"})).await;
        put_doc(&source, "doc2", serde_json::json!({"type": "skip"})).await;

        // Replicate only doc1
        replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Delete doc1 on source
        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        // Replicate again with same filter — deletion should propagate
        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }

    #[tokio::test]
    async fn replicate_no_filter_unchanged() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        // No filter — should replicate everything (same as before)
        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: None,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }
}