use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use rouchdb_core::adapter::Adapter;
use rouchdb_core::document::*;
use rouchdb_core::error::Result;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::checkpoint::Checkpointer;
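/// How documents are selected for replication.
///
/// A minimal sketch of constructing each variant; the ids, selector, and
/// predicate below are illustrative values, not part of the API:
///
/// ```ignore
/// let by_ids = ReplicationFilter::DocIds(vec!["doc1".into(), "doc2".into()]);
/// let by_selector = ReplicationFilter::Selector(serde_json::json!({"type": "invoice"}));
/// let by_predicate = ReplicationFilter::Custom(Arc::new(|change: &ChangeEvent| {
///     change.id.starts_with("public:")
/// }));
/// ```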
pub enum ReplicationFilter {
    /// Replicate only the documents with these ids.
    DocIds(Vec<String>),

    /// Replicate only documents whose contents match this JSON selector.
    Selector(serde_json::Value),

    /// Replicate only changes for which this predicate returns `true`.
    Custom(Arc<dyn Fn(&ChangeEvent) -> bool + Send + Sync>),
}

impl Clone for ReplicationFilter {
    fn clone(&self) -> Self {
        match self {
            Self::DocIds(ids) => Self::DocIds(ids.clone()),
            Self::Selector(sel) => Self::Selector(sel.clone()),
            Self::Custom(f) => Self::Custom(Arc::clone(f)),
        }
    }
}
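/// Options controlling a replication run.
///
/// A minimal sketch of overriding a few fields; the values shown are
/// illustrative, not recommendations:
///
/// ```ignore
/// let opts = ReplicationOptions {
///     batch_size: 50,
///     live: true,
///     ..Default::default()
/// };
/// ```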
pub struct ReplicationOptions {
    /// Maximum number of changes fetched from the source per batch.
    pub batch_size: u64,
    pub batches_limit: u64,
    /// Optional filter restricting which documents are replicated.
    pub filter: Option<ReplicationFilter>,
    /// Keep polling the source for new changes after catching up.
    pub live: bool,
    /// Retry with a back-off delay when a live replication iteration fails.
    pub retry: bool,
    /// Delay between polls while a live replication is caught up.
    pub poll_interval: Duration,
    /// Custom back-off: maps the retry attempt number to a delay.
    pub back_off_function: Option<Box<dyn Fn(u32) -> Duration + Send + Sync>>,
    /// Start from this sequence instead of the stored checkpoint.
    pub since: Option<Seq>,
    /// Read and write checkpoints so later runs can resume incrementally.
    pub checkpoint: bool,
}
impl Default for ReplicationOptions {
    fn default() -> Self {
        Self {
            batch_size: 100,
            batches_limit: 10,
            filter: None,
            live: false,
            retry: false,
            poll_interval: Duration::from_millis(500),
            back_off_function: None,
            since: None,
            checkpoint: true,
        }
    }
}
/// Summary of a completed replication run.
#[derive(Debug, Clone)]
pub struct ReplicationResult {
    pub ok: bool,
    pub docs_read: u64,
    pub docs_written: u64,
    pub errors: Vec<String>,
    pub last_seq: Seq,
}

/// Progress notifications emitted by the event-driven replication functions.
#[derive(Debug, Clone)]
pub enum ReplicationEvent {
    Change { docs_read: u64 },
    Paused,
    Active,
    Complete(ReplicationResult),
    Error(String),
}
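/// Run a single one-shot replication from `source` to `target`.
///
/// A minimal usage sketch, assuming two adapters are already constructed
/// (here the in-memory adapter used by the tests below):
///
/// ```ignore
/// let source = MemoryAdapter::new("source");
/// let target = MemoryAdapter::new("target");
/// let result = replicate(&source, &target, ReplicationOptions::default()).await?;
/// assert!(result.ok);
/// ```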
pub async fn replicate(
    source: &dyn Adapter,
    target: &dyn Adapter,
    opts: ReplicationOptions,
) -> Result<ReplicationResult> {
    let source_info = source.info().await?;
    let target_info = target.info().await?;

    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);

    // An explicit `since` wins; otherwise resume from the stored checkpoint,
    // or from the beginning when checkpointing is disabled.
    let since = if let Some(ref override_since) = opts.since {
        override_since.clone()
    } else if opts.checkpoint {
        checkpointer.read_checkpoint(source, target).await?
    } else {
        Seq::default()
    };

    // Doc-id filters can be pushed down to the source's changes feed.
    let filter_doc_ids = match &opts.filter {
        Some(ReplicationFilter::DocIds(ids)) => Some(ids.clone()),
        _ => None,
    };

    let mut total_docs_read = 0u64;
    let mut total_docs_written = 0u64;
    let mut errors = Vec::new();
    let mut current_seq = since;
    // Each batch: read changes, diff revisions against the target, fetch the
    // missing revisions from the source, and write them to the target.
    loop {
        let changes = source
            .changes(ChangesOptions {
                since: current_seq.clone(),
                limit: Some(opts.batch_size),
                include_docs: false,
                doc_ids: filter_doc_ids.clone(),
                ..Default::default()
            })
            .await?;

        if changes.results.is_empty() {
            break;
        }

        let batch_last_seq = changes.last_seq;

        // Apply a custom predicate filter, if any; other filters are handled
        // elsewhere (doc ids at the source, selectors after fetching docs).
        let filtered_changes: Vec<&ChangeEvent> = match &opts.filter {
            Some(ReplicationFilter::Custom(predicate)) => {
                changes.results.iter().filter(|c| predicate(c)).collect()
            }
            _ => changes.results.iter().collect(),
        };

        total_docs_read += filtered_changes.len() as u64;

        if filtered_changes.is_empty() {
            current_seq = batch_last_seq;
            if (changes.results.len() as u64) < opts.batch_size {
                break;
            }
            continue;
        }
        // Map each changed doc id to its changed revisions and ask the target
        // which of those revisions it is missing.
        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
        for change in &filtered_changes {
            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
            rev_map.insert(change.id.clone(), revs);
        }

        let diff = target.revs_diff(rev_map).await?;

        if diff.results.is_empty() {
            current_seq = batch_last_seq;
            if (changes.results.len() as u64) < opts.batch_size {
                break;
            }
            continue;
        }

        // Fetch every missing revision from the source.
        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
        for (doc_id, diff_result) in &diff.results {
            for missing_rev in &diff_result.missing {
                bulk_get_items.push(BulkGetItem {
                    id: doc_id.clone(),
                    rev: Some(missing_rev.clone()),
                });
            }
        }

        let bulk_get_response = source.bulk_get(bulk_get_items).await?;
        let mut docs_to_write: Vec<Document> = Vec::new();
        for result in &bulk_get_response.results {
            for doc in &result.docs {
                if let Some(ref json) = doc.ok {
                    match Document::from_json(json.clone()) {
                        Ok(document) => docs_to_write.push(document),
                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
                    }
                }
            }
        }

        // Selector filters are evaluated against the fetched document bodies.
        if let Some(ReplicationFilter::Selector(ref selector)) = opts.filter {
            docs_to_write.retain(|doc| rouchdb_query::matches_selector(&doc.data, selector));
        }

        if !docs_to_write.is_empty() {
            let write_count = docs_to_write.len() as u64;
            let write_results = target
                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
                .await?;

            for wr in &write_results {
                if !wr.ok {
                    errors.push(format!(
                        "write error for {}: {}",
                        wr.id,
                        wr.reason.as_deref().unwrap_or("unknown")
                    ));
                }
            }

            total_docs_written += write_count;
        }

        current_seq = batch_last_seq;
        // Persist progress so an interrupted replication can resume here.
        if opts.checkpoint {
            let _ = checkpointer
                .write_checkpoint(source, target, current_seq.clone())
                .await;
        }

        // A short batch means the changes feed is exhausted.
        if (changes.results.len() as u64) < opts.batch_size {
            break;
        }
    }

    Ok(ReplicationResult {
        ok: errors.is_empty(),
        docs_read: total_docs_read,
        docs_written: total_docs_written,
        errors,
        last_seq: current_seq,
    })
}
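/// Like [`replicate`], but reports progress on `events_tx` while running and
/// sends a final [`ReplicationEvent::Complete`] carrying the result.
///
/// A minimal consumption sketch, assuming `source` and `target` adapters from
/// the surrounding context:
///
/// ```ignore
/// let (tx, mut rx) = mpsc::channel(16);
/// let printer = tokio::spawn(async move {
///     while let Some(event) = rx.recv().await {
///         println!("{event:?}");
///     }
/// });
/// let result = replicate_with_events(&source, &target, ReplicationOptions::default(), tx).await?;
/// printer.await.unwrap();
/// ```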
pub async fn replicate_with_events(
    source: &dyn Adapter,
    target: &dyn Adapter,
    opts: ReplicationOptions,
    events_tx: mpsc::Sender<ReplicationEvent>,
) -> Result<ReplicationResult> {
    let source_info = source.info().await?;
    let target_info = target.info().await?;

    let checkpointer = Checkpointer::new(&source_info.db_name, &target_info.db_name);

    let since = if let Some(ref override_since) = opts.since {
        override_since.clone()
    } else if opts.checkpoint {
        checkpointer.read_checkpoint(source, target).await?
    } else {
        Seq::default()
    };

    let filter_doc_ids = match &opts.filter {
        Some(ReplicationFilter::DocIds(ids)) => Some(ids.clone()),
        _ => None,
    };

    let mut total_docs_read = 0u64;
    let mut total_docs_written = 0u64;
    let mut errors = Vec::new();
    let mut current_seq = since;

    let _ = events_tx.send(ReplicationEvent::Active).await;
    loop {
        let changes = source
            .changes(ChangesOptions {
                since: current_seq.clone(),
                limit: Some(opts.batch_size),
                include_docs: false,
                doc_ids: filter_doc_ids.clone(),
                ..Default::default()
            })
            .await?;

        if changes.results.is_empty() {
            break;
        }

        let batch_last_seq = changes.last_seq;

        let filtered_changes: Vec<&ChangeEvent> = match &opts.filter {
            Some(ReplicationFilter::Custom(predicate)) => {
                changes.results.iter().filter(|c| predicate(c)).collect()
            }
            _ => changes.results.iter().collect(),
        };

        total_docs_read += filtered_changes.len() as u64;

        if filtered_changes.is_empty() {
            current_seq = batch_last_seq;
            if (changes.results.len() as u64) < opts.batch_size {
                break;
            }
            continue;
        }

        let mut rev_map: HashMap<String, Vec<String>> = HashMap::new();
        for change in &filtered_changes {
            let revs: Vec<String> = change.changes.iter().map(|c| c.rev.clone()).collect();
            rev_map.insert(change.id.clone(), revs);
        }

        let diff = target.revs_diff(rev_map).await?;

        if diff.results.is_empty() {
            current_seq = batch_last_seq;
            if (changes.results.len() as u64) < opts.batch_size {
                break;
            }
            continue;
        }

        let mut bulk_get_items: Vec<BulkGetItem> = Vec::new();
        for (doc_id, diff_result) in &diff.results {
            for missing_rev in &diff_result.missing {
                bulk_get_items.push(BulkGetItem {
                    id: doc_id.clone(),
                    rev: Some(missing_rev.clone()),
                });
            }
        }
        let bulk_get_response = source.bulk_get(bulk_get_items).await?;

        let mut docs_to_write: Vec<Document> = Vec::new();
        for result in &bulk_get_response.results {
            for doc in &result.docs {
                if let Some(ref json) = doc.ok {
                    match Document::from_json(json.clone()) {
                        Ok(document) => docs_to_write.push(document),
                        Err(e) => errors.push(format!("parse error for {}: {}", result.id, e)),
                    }
                }
            }
        }

        if let Some(ReplicationFilter::Selector(ref selector)) = opts.filter {
            docs_to_write.retain(|doc| rouchdb_query::matches_selector(&doc.data, selector));
        }

        if !docs_to_write.is_empty() {
            let write_count = docs_to_write.len() as u64;
            let write_results = target
                .bulk_docs(docs_to_write, BulkDocsOptions::replication())
                .await?;

            for wr in &write_results {
                if !wr.ok {
                    errors.push(format!(
                        "write error for {}: {}",
                        wr.id,
                        wr.reason.as_deref().unwrap_or("unknown")
                    ));
                }
            }

            total_docs_written += write_count;
        }
        let _ = events_tx
            .send(ReplicationEvent::Change {
                docs_read: total_docs_read,
            })
            .await;

        current_seq = batch_last_seq;
        if opts.checkpoint {
            let _ = checkpointer
                .write_checkpoint(source, target, current_seq.clone())
                .await;
        }

        if (changes.results.len() as u64) < opts.batch_size {
            break;
        }
    }

    let result = ReplicationResult {
        ok: errors.is_empty(),
        docs_read: total_docs_read,
        docs_written: total_docs_written,
        errors,
        last_seq: current_seq,
    };

    let _ = events_tx
        .send(ReplicationEvent::Complete(result.clone()))
        .await;

    Ok(result)
}
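/// Start a live replication on a background task and return the event
/// receiver plus a handle that stops the loop when dropped or when
/// [`ReplicationHandle::cancel`] is called.
///
/// A minimal sketch, assuming `Arc`-wrapped adapters (the in-memory adapter
/// here is illustrative):
///
/// ```ignore
/// let source: Arc<dyn Adapter> = Arc::new(MemoryAdapter::new("source"));
/// let target: Arc<dyn Adapter> = Arc::new(MemoryAdapter::new("target"));
/// let opts = ReplicationOptions { live: true, retry: true, ..Default::default() };
/// let (mut events, handle) = replicate_live(source, target, opts);
/// while let Some(event) = events.recv().await {
///     if matches!(event, ReplicationEvent::Paused) {
///         handle.cancel(); // stop once the databases are in sync
///     }
/// }
/// ```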
pub fn replicate_live(
    source: Arc<dyn Adapter>,
    target: Arc<dyn Adapter>,
    opts: ReplicationOptions,
) -> (mpsc::Receiver<ReplicationEvent>, ReplicationHandle) {
    let (tx, rx) = mpsc::channel(64);
    let poll_interval = opts.poll_interval;
    let retry = opts.retry;
    let back_off = opts.back_off_function;

    let cancel = CancellationToken::new();
    let cancel_clone = cancel.clone();

    tokio::spawn(async move {
        let mut attempt: u32 = 0;

        loop {
            // Each pass runs as a one-shot replication; checkpointing (when
            // enabled) makes the next pass pick up where this one stopped.
            let one_shot_opts = ReplicationOptions {
                batch_size: opts.batch_size,
                batches_limit: opts.batches_limit,
                filter: opts.filter.clone(),
                live: false,
                retry: false,
                poll_interval,
                back_off_function: None,
                since: None,
                checkpoint: opts.checkpoint,
            };

            let result =
                replicate_with_events(source.as_ref(), target.as_ref(), one_shot_opts, tx.clone())
                    .await;
            match result {
                Ok(r) => {
                    attempt = 0;
                    if r.docs_read == 0 {
                        let _ = tx.send(ReplicationEvent::Paused).await;
                    }
                }
                Err(e) => {
                    let _ = tx.send(ReplicationEvent::Error(e.to_string())).await;
                    if retry {
                        attempt += 1;
                        let delay = if let Some(ref f) = back_off {
                            f(attempt)
                        } else {
                            // Default back-off: exponential, capped at 60 seconds.
                            let secs = (1u64 << attempt.min(6)).min(60);
                            Duration::from_secs(secs)
                        };
                        tokio::select! {
                            _ = tokio::time::sleep(delay) => continue,
                            _ = cancel_clone.cancelled() => break,
                        }
                    } else {
                        break;
                    }
                }
            }

            // Wait for the next poll, bailing out promptly on cancellation.
            tokio::select! {
                _ = tokio::time::sleep(poll_interval) => {},
                _ = cancel_clone.cancelled() => break,
            }
        }
    });

    (rx, ReplicationHandle { cancel })
}
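/// Handle for a background live replication; the loop stops when
/// [`ReplicationHandle::cancel`] is called or when the handle is dropped.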
pub struct ReplicationHandle {
    cancel: CancellationToken,
}

impl ReplicationHandle {
    /// Signal the background replication task to stop.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }
}

impl Drop for ReplicationHandle {
    fn drop(&mut self) {
        self.cancel.cancel();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;

    async fn put_doc(adapter: &dyn Adapter, id: &str, data: serde_json::Value) {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        adapter
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn replicate_empty_databases() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 0);
        assert_eq!(result.docs_written, 0);
    }

    #[tokio::test]
    async fn replicate_source_to_target() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"name": "Alice"})).await;
        put_doc(&source, "doc2", serde_json::json!({"name": "Bob"})).await;
        put_doc(&source, "doc3", serde_json::json!({"name": "Charlie"})).await;

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);

        let doc = target.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["name"], "Alice");
    }
    #[tokio::test]
    async fn replicate_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        let r1 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r1.docs_written, 1);

        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        let r2 = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert_eq!(r2.docs_read, 2);
        assert_eq!(r2.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }

    #[tokio::test]
    async fn replicate_already_synced() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;

        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 0);
    }
    #[tokio::test]
    async fn replicate_batched() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        for i in 0..15 {
            put_doc(
                &source,
                &format!("doc{:03}", i),
                serde_json::json!({"i": i}),
            )
            .await;
        }

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                batch_size: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 15);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 15);
    }

    #[tokio::test]
    async fn replicate_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();

        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        let result = replicate(&source, &target, ReplicationOptions::default())
            .await
            .unwrap();
        assert!(result.ok);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }
    #[tokio::test]
    async fn replicate_filtered_by_doc_ids() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;
        put_doc(&source, "doc4", serde_json::json!({"v": 4})).await;
        put_doc(&source, "doc5", serde_json::json!({"v": 5})).await;

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec![
                    "doc2".into(),
                    "doc4".into(),
                ])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        target.get("doc2", GetOptions::default()).await.unwrap();
        target.get("doc4", GetOptions::default()).await.unwrap();

        assert!(target.get("doc1", GetOptions::default()).await.is_err());
        assert!(target.get("doc3", GetOptions::default()).await.is_err());
        assert!(target.get("doc5", GetOptions::default()).await.is_err());
    }

    #[tokio::test]
    async fn replicate_filtered_by_selector() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(
            &source,
            "inv1",
            serde_json::json!({"type": "invoice", "amount": 100}),
        )
        .await;
        put_doc(
            &source,
            "inv2",
            serde_json::json!({"type": "invoice", "amount": 200}),
        )
        .await;
        put_doc(
            &source,
            "user1",
            serde_json::json!({"type": "user", "name": "Alice"}),
        )
        .await;
        put_doc(
            &source,
            "user2",
            serde_json::json!({"type": "user", "name": "Bob"}),
        )
        .await;

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::Selector(
                    serde_json::json!({"type": "invoice"}),
                )),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        let doc = target.get("inv1", GetOptions::default()).await.unwrap();
        assert_eq!(doc.data["amount"], 100);

        assert!(target.get("user1", GetOptions::default()).await.is_err());
    }
    #[tokio::test]
    async fn replicate_filtered_by_custom_closure() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "public:doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "public:doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "private:doc3", serde_json::json!({"v": 3})).await;
        put_doc(&source, "private:doc4", serde_json::json!({"v": 4})).await;

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::Custom(Arc::new(|change| {
                    change.id.starts_with("public:")
                }))),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_written, 2);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);

        target
            .get("public:doc1", GetOptions::default())
            .await
            .unwrap();
        assert!(
            target
                .get("private:doc3", GetOptions::default())
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn replicate_filtered_incremental() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"type": "a"})).await;
        put_doc(&source, "doc2", serde_json::json!({"type": "b"})).await;

        let r1 = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        assert_eq!(r1.docs_written, 1);

        put_doc(&source, "doc3", serde_json::json!({"type": "a"})).await;
        put_doc(&source, "doc4", serde_json::json!({"type": "b"})).await;

        let r2 = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec![
                    "doc1".into(),
                    "doc3".into(),
                ])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(r2.docs_written, 1);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 2);
    }
    #[tokio::test]
    async fn replicate_filtered_with_deletes() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"type": "keep"})).await;
        put_doc(&source, "doc2", serde_json::json!({"type": "skip"})).await;

        replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let doc = source.get("doc1", GetOptions::default()).await.unwrap();
        let del = Document {
            id: "doc1".into(),
            rev: doc.rev,
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        source
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: Some(ReplicationFilter::DocIds(vec!["doc1".into()])),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 0);
    }

    #[tokio::test]
    async fn replicate_no_filter_unchanged() {
        let source = MemoryAdapter::new("source");
        let target = MemoryAdapter::new("target");

        put_doc(&source, "doc1", serde_json::json!({"v": 1})).await;
        put_doc(&source, "doc2", serde_json::json!({"v": 2})).await;
        put_doc(&source, "doc3", serde_json::json!({"v": 3})).await;

        let result = replicate(
            &source,
            &target,
            ReplicationOptions {
                filter: None,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(result.ok);
        assert_eq!(result.docs_read, 3);
        assert_eq!(result.docs_written, 3);

        let target_info = target.info().await.unwrap();
        assert_eq!(target_info.doc_count, 3);
    }
}