1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use md5::{Digest, Md5};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::document::*;
11use rouchdb_core::error::{Result, RouchError};
12use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
13use rouchdb_core::rev_tree::{
14 build_path_from_revs, collect_leaves, find_rev_ancestry, rev_exists, NodeOpts, RevPath,
15 RevStatus, RevTree,
16};
17
18const DEFAULT_REV_LIMIT: u64 = 1000;
19
20#[derive(Debug, Clone)]
25struct StoredDoc {
26 rev_tree: RevTree,
27 rev_data: HashMap<String, serde_json::Value>,
29 rev_deleted: HashMap<String, bool>,
31 seq: u64,
33}
34
35#[derive(Debug)]
36struct Inner {
37 name: String,
38 docs: HashMap<String, StoredDoc>,
40 update_seq: u64,
42 changes: BTreeMap<u64, (String, bool)>,
44 local_docs: HashMap<String, serde_json::Value>,
46 attachments: HashMap<String, Vec<u8>>,
48}
49
50#[derive(Debug, Clone)]
52pub struct MemoryAdapter {
53 inner: Arc<RwLock<Inner>>,
54}
55
56impl MemoryAdapter {
57 pub fn new(name: &str) -> Self {
58 Self {
59 inner: Arc::new(RwLock::new(Inner {
60 name: name.to_string(),
61 docs: HashMap::new(),
62 update_seq: 0,
63 changes: BTreeMap::new(),
64 local_docs: HashMap::new(),
65 attachments: HashMap::new(),
66 })),
67 }
68 }
69}
70
71fn generate_rev_hash(doc_data: &serde_json::Value, deleted: bool, prev_rev: Option<&str>) -> String {
77 let mut hasher = Md5::new();
78 if let Some(prev) = prev_rev {
80 hasher.update(prev.as_bytes());
81 }
82 hasher.update(if deleted { b"1" } else { b"0" });
83 let serialized = serde_json::to_string(doc_data).unwrap_or_default();
84 hasher.update(serialized.as_bytes());
85 format!("{:x}", hasher.finalize())
86}
87
88fn rev_string(pos: u64, hash: &str) -> String {
89 format!("{}-{}", pos, hash)
90}
91
92fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
93 let (pos_str, hash) = rev_str
94 .split_once('-')
95 .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
96 let pos: u64 = pos_str
97 .parse()
98 .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
99 Ok((pos, hash.to_string()))
100}
101
102fn compute_attachment_digest(data: &[u8]) -> String {
103 let mut hasher = Md5::new();
104 hasher.update(data);
105 let hash = hasher.finalize();
106 use base64::Engine;
107 let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
108 format!("md5-{}", b64)
109}
110
111#[async_trait]
116impl Adapter for MemoryAdapter {
117 async fn info(&self) -> Result<DbInfo> {
118 let inner = self.inner.read().await;
119 let doc_count = inner
120 .docs
121 .values()
122 .filter(|d| {
123 !is_deleted(&d.rev_tree)
125 })
126 .count() as u64;
127
128 Ok(DbInfo {
129 db_name: inner.name.clone(),
130 doc_count,
131 update_seq: Seq::Num(inner.update_seq),
132 })
133 }
134
135 async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
136 let inner = self.inner.read().await;
137 let stored = inner
138 .docs
139 .get(id)
140 .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
141
142 let target_rev = if let Some(ref rev_str) = opts.rev {
144 rev_str.clone()
145 } else {
146 let winner = winning_rev(&stored.rev_tree)
148 .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
149 winner.to_string()
150 };
151
152 let data = stored
154 .rev_data
155 .get(&target_rev)
156 .cloned()
157 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
158
159 let deleted = stored.rev_deleted.get(&target_rev).copied().unwrap_or(false);
160
161 if deleted && opts.rev.is_none() {
163 return Err(RouchError::NotFound(id.to_string()));
164 }
165
166 let (pos, hash) = parse_rev(&target_rev)?;
167 let rev = Revision::new(pos, hash);
168
169 let mut doc = Document {
170 id: id.to_string(),
171 rev: Some(rev),
172 deleted,
173 data,
174 attachments: HashMap::new(),
175 };
176
177 if opts.conflicts {
179 let conflicts = collect_conflicts(&stored.rev_tree);
180 if !conflicts.is_empty() {
181 let conflict_list: Vec<serde_json::Value> = conflicts
182 .iter()
183 .map(|c| serde_json::Value::String(c.to_string()))
184 .collect();
185 if let serde_json::Value::Object(ref mut map) = doc.data {
186 map.insert(
187 "_conflicts".to_string(),
188 serde_json::Value::Array(conflict_list),
189 );
190 }
191 }
192 }
193
194 Ok(doc)
195 }
196
197 async fn bulk_docs(
198 &self,
199 docs: Vec<Document>,
200 opts: BulkDocsOptions,
201 ) -> Result<Vec<DocResult>> {
202 let mut inner = self.inner.write().await;
203 let mut results = Vec::with_capacity(docs.len());
204
205 for doc in docs {
206 let result = if opts.new_edits {
207 process_doc_new_edits(&mut inner, doc)
208 } else {
209 process_doc_replication(&mut inner, doc)
210 };
211 results.push(result);
212 }
213
214 Ok(results)
215 }
216
217 async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
218 let inner = self.inner.read().await;
219
220 let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
222 doc_ids.sort();
223
224 if opts.descending {
225 doc_ids.reverse();
226 }
227
228 let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
230 keys.clone()
231 } else if let Some(ref key) = opts.key {
232 vec![key.clone()]
233 } else {
234 doc_ids.iter().map(|k| (*k).clone()).collect()
235 };
236
237 let mut rows = Vec::new();
238
239 for key in &target_keys {
240 if opts.keys.is_none() && opts.key.is_none() {
242 if let Some(ref start) = opts.start_key {
243 if (!opts.descending && key.as_str() < start.as_str())
244 || (opts.descending && key.as_str() > start.as_str())
245 {
246 continue;
247 }
248 }
249 if let Some(ref end) = opts.end_key {
250 if opts.inclusive_end {
251 if (!opts.descending && key.as_str() > end.as_str())
252 || (opts.descending && key.as_str() < end.as_str())
253 {
254 continue;
255 }
256 } else if (!opts.descending && key.as_str() >= end.as_str())
257 || (opts.descending && key.as_str() <= end.as_str())
258 {
259 continue;
260 }
261 }
262 }
263
264 if let Some(stored) = inner.docs.get(key.as_str()) {
265 let winner = match winning_rev(&stored.rev_tree) {
266 Some(w) => w,
267 None => continue,
268 };
269 let deleted = is_deleted(&stored.rev_tree);
270
271 if deleted && opts.keys.is_none() {
273 continue;
274 }
275
276 let doc_json = if opts.include_docs && !deleted {
277 let rev_str = winner.to_string();
278 stored.rev_data.get(&rev_str).map(|data| {
279 let mut obj = match data {
280 serde_json::Value::Object(m) => m.clone(),
281 _ => serde_json::Map::new(),
282 };
283 obj.insert(
284 "_id".into(),
285 serde_json::Value::String(key.clone()),
286 );
287 obj.insert(
288 "_rev".into(),
289 serde_json::Value::String(rev_str),
290 );
291 serde_json::Value::Object(obj)
292 })
293 } else {
294 None
295 };
296
297 rows.push(AllDocsRow {
298 id: key.clone(),
299 key: key.clone(),
300 value: AllDocsRowValue {
301 rev: winner.to_string(),
302 deleted: if deleted { Some(true) } else { None },
303 },
304 doc: doc_json,
305 });
306 } else if opts.keys.is_some() {
307 }
311 }
312
313 let total_rows = rows.len() as u64;
315 let skip = opts.skip as usize;
316 if skip > 0 {
317 rows = rows.into_iter().skip(skip).collect();
318 }
319 if let Some(limit) = opts.limit {
320 rows.truncate(limit as usize);
321 }
322
323 Ok(AllDocsResponse {
324 total_rows,
325 offset: opts.skip,
326 rows,
327 })
328 }
329
330 async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
331 let inner = self.inner.read().await;
332
333 let mut results = Vec::new();
334
335 let range = (opts.since.as_num() + 1)..;
337 let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
338 Box::new(
339 inner
340 .changes
341 .range(range)
342 .collect::<Vec<_>>()
343 .into_iter()
344 .rev(),
345 )
346 } else {
347 Box::new(inner.changes.range(range))
348 };
349
350 for (seq, (doc_id, deleted)) in iter {
351 if let Some(ref doc_ids) = opts.doc_ids {
353 if !doc_ids.contains(doc_id) {
354 continue;
355 }
356 }
357
358 let stored = inner.docs.get(doc_id);
359 let rev_str = stored
360 .and_then(|s| winning_rev(&s.rev_tree))
361 .map(|r| r.to_string())
362 .unwrap_or_default();
363
364 let doc = if opts.include_docs {
365 stored.and_then(|s| {
366 s.rev_data.get(&rev_str).map(|data| {
367 let mut obj = match data {
368 serde_json::Value::Object(m) => m.clone(),
369 _ => serde_json::Map::new(),
370 };
371 obj.insert(
372 "_id".into(),
373 serde_json::Value::String(doc_id.clone()),
374 );
375 obj.insert(
376 "_rev".into(),
377 serde_json::Value::String(rev_str.clone()),
378 );
379 if *deleted {
380 obj.insert("_deleted".into(), serde_json::Value::Bool(true));
381 }
382 serde_json::Value::Object(obj)
383 })
384 })
385 } else {
386 None
387 };
388
389 results.push(ChangeEvent {
390 seq: Seq::Num(*seq),
391 id: doc_id.clone(),
392 changes: vec![ChangeRev {
393 rev: rev_str,
394 }],
395 deleted: *deleted,
396 doc,
397 });
398
399 if let Some(limit) = opts.limit {
400 if results.len() >= limit as usize {
401 break;
402 }
403 }
404 }
405
406 let last_seq = results.last().map(|r| r.seq.clone()).unwrap_or(opts.since.clone());
407
408 Ok(ChangesResponse {
409 results,
410 last_seq,
411 })
412 }
413
414 async fn revs_diff(
415 &self,
416 revs: HashMap<String, Vec<String>>,
417 ) -> Result<RevsDiffResponse> {
418 let inner = self.inner.read().await;
419 let mut results = HashMap::new();
420
421 for (doc_id, rev_list) in revs {
422 let mut missing = Vec::new();
423 let mut possible_ancestors = Vec::new();
424
425 let stored = inner.docs.get(&doc_id);
426
427 for rev_str in &rev_list {
428 let (pos, hash) = parse_rev(rev_str)?;
429
430 let exists = stored
431 .map(|s| rev_exists(&s.rev_tree, pos, &hash))
432 .unwrap_or(false);
433
434 if !exists {
435 missing.push(rev_str.clone());
436
437 if let Some(stored) = stored {
439 let leaves = collect_leaves(&stored.rev_tree);
440 for leaf in &leaves {
441 if leaf.pos < pos {
442 possible_ancestors.push(leaf.rev_string());
443 }
444 }
445 }
446 }
447 }
448
449 if !missing.is_empty() {
450 results.insert(
451 doc_id,
452 RevsDiffResult {
453 missing,
454 possible_ancestors,
455 },
456 );
457 }
458 }
459
460 Ok(RevsDiffResponse { results })
461 }
462
463 async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
464 let inner = self.inner.read().await;
465 let mut results = Vec::new();
466
467 for item in docs {
468 let mut bulk_docs = Vec::new();
469
470 match inner.docs.get(&item.id) {
471 Some(stored) => {
472 let rev_str = if let Some(ref rev) = item.rev {
473 rev.clone()
474 } else {
475 match winning_rev(&stored.rev_tree) {
476 Some(w) => w.to_string(),
477 None => {
478 bulk_docs.push(BulkGetDoc {
479 ok: None,
480 error: Some(BulkGetError {
481 id: item.id.clone(),
482 rev: item.rev.unwrap_or_default(),
483 error: "not_found".into(),
484 reason: "missing".into(),
485 }),
486 });
487 results.push(BulkGetResult {
488 id: item.id,
489 docs: bulk_docs,
490 });
491 continue;
492 }
493 }
494 };
495
496 if let Some(data) = stored.rev_data.get(&rev_str) {
497 let deleted = stored
498 .rev_deleted
499 .get(&rev_str)
500 .copied()
501 .unwrap_or(false);
502 let mut obj = match data {
503 serde_json::Value::Object(m) => m.clone(),
504 _ => serde_json::Map::new(),
505 };
506 obj.insert(
507 "_id".into(),
508 serde_json::Value::String(item.id.clone()),
509 );
510 obj.insert(
511 "_rev".into(),
512 serde_json::Value::String(rev_str.clone()),
513 );
514 if deleted {
515 obj.insert("_deleted".into(), serde_json::Value::Bool(true));
516 }
517
518 if let Ok((pos, ref hash)) = parse_rev(&rev_str) {
520 if let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash) {
521 obj.insert(
522 "_revisions".into(),
523 serde_json::json!({
524 "start": pos,
525 "ids": ancestry
526 }),
527 );
528 }
529 }
530
531 bulk_docs.push(BulkGetDoc {
532 ok: Some(serde_json::Value::Object(obj)),
533 error: None,
534 });
535 } else {
536 bulk_docs.push(BulkGetDoc {
537 ok: None,
538 error: Some(BulkGetError {
539 id: item.id.clone(),
540 rev: rev_str,
541 error: "not_found".into(),
542 reason: "missing".into(),
543 }),
544 });
545 }
546 }
547 None => {
548 bulk_docs.push(BulkGetDoc {
549 ok: None,
550 error: Some(BulkGetError {
551 id: item.id.clone(),
552 rev: item.rev.unwrap_or_default(),
553 error: "not_found".into(),
554 reason: "missing".into(),
555 }),
556 });
557 }
558 }
559
560 results.push(BulkGetResult {
561 id: item.id,
562 docs: bulk_docs,
563 });
564 }
565
566 Ok(BulkGetResponse { results })
567 }
568
569 async fn put_attachment(
570 &self,
571 doc_id: &str,
572 att_id: &str,
573 rev: &str,
574 data: Vec<u8>,
575 content_type: &str,
576 ) -> Result<DocResult> {
577 let digest = compute_attachment_digest(&data);
578 let length = data.len() as u64;
579
580 let mut inner = self.inner.write().await;
581
582 inner.attachments.insert(digest.clone(), data);
584
585 let stored = inner
587 .docs
588 .get(doc_id)
589 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
590
591 let winner = winning_rev(&stored.rev_tree)
593 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
594 if winner.to_string() != rev {
595 return Err(RouchError::Conflict);
596 }
597
598 let doc_data = stored
600 .rev_data
601 .get(rev)
602 .cloned()
603 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
604
605 let att_meta = AttachmentMeta {
607 content_type: content_type.to_string(),
608 digest: digest.clone(),
609 length,
610 stub: true,
611 data: None,
612 };
613
614 let doc = Document {
615 id: doc_id.to_string(),
616 rev: Some(winner.clone()),
617 deleted: false,
618 data: doc_data.clone(),
619 attachments: {
620 let mut atts = HashMap::new();
621 atts.insert(att_id.to_string(), att_meta);
622 atts
623 },
624 };
625
626 let result = process_doc_new_edits(&mut inner, doc);
628 Ok(result)
629 }
630
631 async fn get_attachment(
632 &self,
633 doc_id: &str,
634 att_id: &str,
635 opts: GetAttachmentOptions,
636 ) -> Result<Vec<u8>> {
637 let inner = self.inner.read().await;
638
639 let stored = inner
640 .docs
641 .get(doc_id)
642 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
643
644 let rev_str = if let Some(ref rev) = opts.rev {
645 rev.clone()
646 } else {
647 winning_rev(&stored.rev_tree)
648 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
649 .to_string()
650 };
651
652 let _data = stored.rev_data.get(&rev_str);
657
658 Err(RouchError::NotFound(format!(
660 "attachment {}/{}",
661 doc_id, att_id
662 )))
663 }
664
665 async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
666 let inner = self.inner.read().await;
667 inner
668 .local_docs
669 .get(id)
670 .cloned()
671 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
672 }
673
674 async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
675 let mut inner = self.inner.write().await;
676 inner.local_docs.insert(id.to_string(), doc);
677 Ok(())
678 }
679
680 async fn remove_local(&self, id: &str) -> Result<()> {
681 let mut inner = self.inner.write().await;
682 inner
683 .local_docs
684 .remove(id)
685 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
686 Ok(())
687 }
688
689 async fn compact(&self) -> Result<()> {
690 let mut inner = self.inner.write().await;
691
692 for stored in inner.docs.values_mut() {
693 let leaves = collect_leaves(&stored.rev_tree);
694 let leaf_revs: std::collections::HashSet<String> =
695 leaves.iter().map(|l| l.rev_string()).collect();
696
697 stored.rev_data.retain(|k, _| leaf_revs.contains(k));
699 stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
700 }
701
702 Ok(())
703 }
704
705 async fn destroy(&self) -> Result<()> {
706 let mut inner = self.inner.write().await;
707 inner.docs.clear();
708 inner.changes.clear();
709 inner.local_docs.clear();
710 inner.attachments.clear();
711 inner.update_seq = 0;
712 Ok(())
713 }
714}
715
716fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
721 let doc_id = if doc.id.is_empty() {
722 Uuid::new_v4().to_string()
723 } else {
724 doc.id.clone()
725 };
726
727 let existing = inner.docs.get(&doc_id);
728
729 if let Some(stored) = existing {
731 let winner = winning_rev(&stored.rev_tree);
732
733 match (&doc.rev, &winner) {
734 (Some(provided_rev), Some(current_winner)) => {
735 if provided_rev.to_string() != current_winner.to_string() {
736 return DocResult {
737 ok: false,
738 id: doc_id,
739 rev: None,
740 error: Some("conflict".into()),
741 reason: Some("Document update conflict".into()),
742 };
743 }
744 }
745 (None, Some(_)) => {
746 if !is_deleted(&stored.rev_tree) {
748 return DocResult {
749 ok: false,
750 id: doc_id,
751 rev: None,
752 error: Some("conflict".into()),
753 reason: Some("Document update conflict".into()),
754 };
755 }
756 }
758 _ => {}
759 }
760 } else if doc.rev.is_some() {
761 return DocResult {
763 ok: false,
764 id: doc_id,
765 rev: None,
766 error: Some("not_found".into()),
767 reason: Some("missing".into()),
768 };
769 }
770
771 let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
773 let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
774 let new_hash = generate_rev_hash(
775 &doc.data,
776 doc.deleted,
777 prev_rev_str.as_deref(),
778 );
779 let new_rev_str = rev_string(new_pos, &new_hash);
780
781 let mut rev_hashes = vec![new_hash.clone()];
783 if let Some(ref prev) = doc.rev {
784 rev_hashes.push(prev.hash.clone());
785 }
786
787 let new_path = build_path_from_revs(
788 new_pos,
789 &rev_hashes,
790 NodeOpts {
791 deleted: doc.deleted,
792 },
793 RevStatus::Available,
794 );
795
796 let existing_tree = existing
798 .map(|s| s.rev_tree.clone())
799 .unwrap_or_default();
800
801 let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
802
803 inner.update_seq += 1;
805 let seq = inner.update_seq;
806
807 if let Some(existing) = inner.docs.get(&doc_id) {
809 inner.changes.remove(&existing.seq);
810 }
811
812 let stored = inner.docs.entry(doc_id.clone()).or_insert_with(|| StoredDoc {
814 rev_tree: Vec::new(),
815 rev_data: HashMap::new(),
816 rev_deleted: HashMap::new(),
817 seq: 0,
818 });
819
820 stored.rev_tree = merged_tree;
821 stored.rev_data.insert(new_rev_str.clone(), doc.data);
822 stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
823 stored.seq = seq;
824
825 inner.changes.insert(seq, (doc_id.clone(), doc.deleted));
827
828 DocResult {
829 ok: true,
830 id: doc_id,
831 rev: Some(new_rev_str),
832 error: None,
833 reason: None,
834 }
835}
836
837fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
842 let doc_id = doc.id.clone();
843 let rev = match &doc.rev {
844 Some(r) => r.clone(),
845 None => {
846 return DocResult {
847 ok: false,
848 id: doc_id,
849 rev: None,
850 error: Some("bad_request".into()),
851 reason: Some("missing _rev".into()),
852 };
853 }
854 };
855
856 let rev_str = rev.to_string();
857
858 let new_path = if let Some(revisions) = doc.data.get("_revisions") {
860 let start = revisions["start"].as_u64().unwrap_or(rev.pos);
861 let ids: Vec<String> = revisions["ids"]
862 .as_array()
863 .map(|arr| {
864 arr.iter()
865 .filter_map(|v| v.as_str().map(String::from))
866 .collect()
867 })
868 .unwrap_or_else(|| vec![rev.hash.clone()]);
869
870 build_path_from_revs(
871 start,
872 &ids,
873 NodeOpts {
874 deleted: doc.deleted,
875 },
876 RevStatus::Available,
877 )
878 } else {
879 RevPath {
881 pos: rev.pos,
882 tree: rouchdb_core::rev_tree::RevNode {
883 hash: rev.hash.clone(),
884 status: RevStatus::Available,
885 opts: NodeOpts {
886 deleted: doc.deleted,
887 },
888 children: vec![],
889 },
890 }
891 };
892
893 if let serde_json::Value::Object(ref mut map) = doc.data {
895 map.remove("_revisions");
896 }
897
898 let existing_tree = inner
900 .docs
901 .get(&doc_id)
902 .map(|s| s.rev_tree.clone())
903 .unwrap_or_default();
904
905 let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
906
907 inner.update_seq += 1;
909 let seq = inner.update_seq;
910
911 if let Some(existing) = inner.docs.get(&doc_id) {
913 inner.changes.remove(&existing.seq);
914 }
915
916 let is_doc_deleted = is_deleted(&merged_tree);
917
918 let stored = inner.docs.entry(doc_id.clone()).or_insert_with(|| StoredDoc {
919 rev_tree: Vec::new(),
920 rev_data: HashMap::new(),
921 rev_deleted: HashMap::new(),
922 seq: 0,
923 });
924
925 stored.rev_tree = merged_tree;
926 stored.rev_data.insert(rev_str.clone(), doc.data);
927 stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
928 stored.seq = seq;
929
930 inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));
931
932 DocResult {
933 ok: true,
934 id: doc_id,
935 rev: Some(rev_str),
936 error: None,
937 reason: None,
938 }
939}
940
941#[cfg(test)]
946mod tests {
947 use super::*;
948 use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};
949
950 async fn new_db() -> MemoryAdapter {
951 MemoryAdapter::new("test")
952 }
953
954 #[tokio::test]
955 async fn info_empty_db() {
956 let db = new_db().await;
957 let info = db.info().await.unwrap();
958 assert_eq!(info.db_name, "test");
959 assert_eq!(info.doc_count, 0);
960 assert_eq!(info.update_seq, Seq::Num(0));
961 }
962
963 #[tokio::test]
964 async fn put_and_get_document() {
965 let db = new_db().await;
966
967 let doc = Document {
968 id: "doc1".into(),
969 rev: None,
970 deleted: false,
971 data: serde_json::json!({"name": "Alice"}),
972 attachments: HashMap::new(),
973 };
974
975 let results = db
976 .bulk_docs(vec![doc], BulkDocsOptions::new())
977 .await
978 .unwrap();
979 assert!(results[0].ok);
980 assert_eq!(results[0].id, "doc1");
981 assert!(results[0].rev.is_some());
982
983 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
984 assert_eq!(fetched.id, "doc1");
985 assert_eq!(fetched.data["name"], "Alice");
986 assert!(fetched.rev.is_some());
987 }
988
989 #[tokio::test]
990 async fn update_document() {
991 let db = new_db().await;
992
993 let doc = Document {
995 id: "doc1".into(),
996 rev: None,
997 deleted: false,
998 data: serde_json::json!({"name": "Alice"}),
999 attachments: HashMap::new(),
1000 };
1001 let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1002 let rev1 = results[0].rev.clone().unwrap();
1003
1004 let rev_parsed: Revision = rev1.parse().unwrap();
1006 let doc2 = Document {
1007 id: "doc1".into(),
1008 rev: Some(rev_parsed),
1009 deleted: false,
1010 data: serde_json::json!({"name": "Bob"}),
1011 attachments: HashMap::new(),
1012 };
1013 let results = db.bulk_docs(vec![doc2], BulkDocsOptions::new()).await.unwrap();
1014 assert!(results[0].ok);
1015
1016 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1017 assert_eq!(fetched.data["name"], "Bob");
1018 }
1019
1020 #[tokio::test]
1021 async fn conflict_on_wrong_rev() {
1022 let db = new_db().await;
1023
1024 let doc = Document {
1025 id: "doc1".into(),
1026 rev: None,
1027 deleted: false,
1028 data: serde_json::json!({"v": 1}),
1029 attachments: HashMap::new(),
1030 };
1031 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1032
1033 let doc2 = Document {
1035 id: "doc1".into(),
1036 rev: Some(Revision::new(1, "wronghash".into())),
1037 deleted: false,
1038 data: serde_json::json!({"v": 2}),
1039 attachments: HashMap::new(),
1040 };
1041 let results = db.bulk_docs(vec![doc2], BulkDocsOptions::new()).await.unwrap();
1042 assert!(!results[0].ok);
1043 assert_eq!(results[0].error.as_deref(), Some("conflict"));
1044 }
1045
1046 #[tokio::test]
1047 async fn delete_document() {
1048 let db = new_db().await;
1049
1050 let doc = Document {
1051 id: "doc1".into(),
1052 rev: None,
1053 deleted: false,
1054 data: serde_json::json!({"name": "Alice"}),
1055 attachments: HashMap::new(),
1056 };
1057 let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1058 let rev1: Revision = results[0].rev.clone().unwrap().parse().unwrap();
1059
1060 let del = Document {
1062 id: "doc1".into(),
1063 rev: Some(rev1),
1064 deleted: true,
1065 data: serde_json::json!({}),
1066 attachments: HashMap::new(),
1067 };
1068 let results = db.bulk_docs(vec![del], BulkDocsOptions::new()).await.unwrap();
1069 assert!(results[0].ok);
1070
1071 let err = db.get("doc1", GetOptions::default()).await;
1073 assert!(err.is_err());
1074
1075 let info = db.info().await.unwrap();
1077 assert_eq!(info.doc_count, 0);
1078 }
1079
1080 #[tokio::test]
1081 async fn all_docs() {
1082 let db = new_db().await;
1083
1084 for name in ["charlie", "alice", "bob"] {
1085 let doc = Document {
1086 id: name.into(),
1087 rev: None,
1088 deleted: false,
1089 data: serde_json::json!({"name": name}),
1090 attachments: HashMap::new(),
1091 };
1092 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1093 }
1094
1095 let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
1096 assert_eq!(result.total_rows, 3);
1097 assert_eq!(result.rows[0].id, "alice");
1099 assert_eq!(result.rows[1].id, "bob");
1100 assert_eq!(result.rows[2].id, "charlie");
1101 }
1102
1103 #[tokio::test]
1104 async fn all_docs_with_include_docs() {
1105 let db = new_db().await;
1106
1107 let doc = Document {
1108 id: "doc1".into(),
1109 rev: None,
1110 deleted: false,
1111 data: serde_json::json!({"name": "Alice"}),
1112 attachments: HashMap::new(),
1113 };
1114 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1115
1116 let mut opts = AllDocsOptions::new();
1117 opts.include_docs = true;
1118 let result = db.all_docs(opts).await.unwrap();
1119 assert!(result.rows[0].doc.is_some());
1120 let doc = result.rows[0].doc.as_ref().unwrap();
1121 assert_eq!(doc["name"], "Alice");
1122 assert_eq!(doc["_id"], "doc1");
1123 }
1124
1125 #[tokio::test]
1126 async fn changes_feed() {
1127 let db = new_db().await;
1128
1129 for i in 0..3 {
1130 let doc = Document {
1131 id: format!("doc{}", i),
1132 rev: None,
1133 deleted: false,
1134 data: serde_json::json!({"i": i}),
1135 attachments: HashMap::new(),
1136 };
1137 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1138 }
1139
1140 let changes = db
1141 .changes(ChangesOptions::default())
1142 .await
1143 .unwrap();
1144 assert_eq!(changes.results.len(), 3);
1145 assert_eq!(changes.last_seq, Seq::Num(3));
1146
1147 let changes = db
1149 .changes(ChangesOptions {
1150 since: Seq::Num(2),
1151 ..Default::default()
1152 })
1153 .await
1154 .unwrap();
1155 assert_eq!(changes.results.len(), 1);
1156 assert_eq!(changes.results[0].id, "doc2");
1157 }
1158
1159 #[tokio::test]
1160 async fn revs_diff() {
1161 let db = new_db().await;
1162
1163 let doc = Document {
1164 id: "doc1".into(),
1165 rev: None,
1166 deleted: false,
1167 data: serde_json::json!({"v": 1}),
1168 attachments: HashMap::new(),
1169 };
1170 let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1171 let existing_rev = results[0].rev.clone().unwrap();
1172
1173 let mut revs = HashMap::new();
1174 revs.insert(
1175 "doc1".into(),
1176 vec![existing_rev.clone(), "2-doesnotexist".into()],
1177 );
1178 revs.insert("doc2".into(), vec!["1-abc".into()]);
1179
1180 let diff = db.revs_diff(revs).await.unwrap();
1181
1182 let doc1_diff = diff.results.get("doc1").unwrap();
1184 assert!(!doc1_diff.missing.contains(&existing_rev));
1185 assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));
1186
1187 let doc2_diff = diff.results.get("doc2").unwrap();
1189 assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
1190 }
1191
1192 #[tokio::test]
1193 async fn local_docs() {
1194 let db = new_db().await;
1195
1196 let doc = serde_json::json!({"checkpoint": 42});
1197 db.put_local("repl-123", doc.clone()).await.unwrap();
1198
1199 let fetched = db.get_local("repl-123").await.unwrap();
1200 assert_eq!(fetched["checkpoint"], 42);
1201
1202 db.remove_local("repl-123").await.unwrap();
1203 assert!(db.get_local("repl-123").await.is_err());
1204 }
1205
1206 #[tokio::test]
1207 async fn replication_mode_bulk_docs() {
1208 let db = new_db().await;
1209
1210 let doc = Document {
1212 id: "doc1".into(),
1213 rev: Some(Revision::new(1, "abc123".into())),
1214 deleted: false,
1215 data: serde_json::json!({"name": "replicated"}),
1216 attachments: HashMap::new(),
1217 };
1218
1219 let results = db
1220 .bulk_docs(vec![doc], BulkDocsOptions::replication())
1221 .await
1222 .unwrap();
1223 assert!(results[0].ok);
1224
1225 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1226 assert_eq!(fetched.data["name"], "replicated");
1227 assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
1228 }
1229
1230 #[tokio::test]
1231 async fn auto_generate_id() {
1232 let db = new_db().await;
1233
1234 let doc = Document {
1235 id: String::new(),
1236 rev: None,
1237 deleted: false,
1238 data: serde_json::json!({"name": "no-id"}),
1239 attachments: HashMap::new(),
1240 };
1241
1242 let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1243 assert!(results[0].ok);
1244 assert!(!results[0].id.is_empty());
1245 }
1246
1247 #[tokio::test]
1248 async fn destroy_clears_everything() {
1249 let db = new_db().await;
1250
1251 let doc = Document {
1252 id: "doc1".into(),
1253 rev: None,
1254 deleted: false,
1255 data: serde_json::json!({}),
1256 attachments: HashMap::new(),
1257 };
1258 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1259 db.put_local("x", serde_json::json!({})).await.unwrap();
1260
1261 db.destroy().await.unwrap();
1262
1263 let info = db.info().await.unwrap();
1264 assert_eq!(info.doc_count, 0);
1265 assert_eq!(info.update_seq, Seq::Num(0));
1266 }
1267
1268 #[tokio::test]
1269 async fn bulk_get_documents() {
1270 let db = new_db().await;
1271
1272 let doc = Document {
1273 id: "doc1".into(),
1274 rev: None,
1275 deleted: false,
1276 data: serde_json::json!({"name": "test"}),
1277 attachments: HashMap::new(),
1278 };
1279 db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1280
1281 let result = db
1282 .bulk_get(vec![
1283 BulkGetItem { id: "doc1".into(), rev: None },
1284 BulkGetItem { id: "missing".into(), rev: None },
1285 ])
1286 .await
1287 .unwrap();
1288
1289 assert_eq!(result.results.len(), 2);
1290 assert!(result.results[0].docs[0].ok.is_some());
1291 assert!(result.results[1].docs[0].error.is_some());
1292 }
1293}