1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use md5::{Digest, Md5};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::document::*;
11use rouchdb_core::error::{Result, RouchError};
12use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
13use rouchdb_core::rev_tree::{
14 NodeOpts, RevPath, RevStatus, RevTree, build_path_from_revs, collect_leaves, find_rev_ancestry,
15 rev_exists,
16};
17
/// Default cap on revision-history depth handed to `merge_tree` when merging
/// a new revision path into a document's revision tree.
const DEFAULT_REV_LIMIT: u64 = 1000;
19
/// Per-document storage record: the merged revision tree plus the body and
/// deletion flag kept for each individual revision.
#[derive(Debug, Clone)]
struct StoredDoc {
    /// Merged revision tree for this document.
    rev_tree: RevTree,
    /// JSON body per revision, keyed by the `"pos-hash"` rev string.
    rev_data: HashMap<String, serde_json::Value>,
    /// Deletion flag per revision, keyed by the same rev string.
    rev_deleted: HashMap<String, bool>,
    /// Sequence number of the most recent update to this document.
    seq: u64,
}
34
/// Mutable database state, guarded by the adapter's `RwLock`.
#[derive(Debug)]
struct Inner {
    /// Database name reported by `info()`.
    name: String,
    /// All regular (non-local) documents, keyed by document id.
    docs: HashMap<String, StoredDoc>,
    /// Monotonically increasing update-sequence counter.
    update_seq: u64,
    /// Change log: seq -> (doc id, deleted flag); BTreeMap keeps seq order.
    changes: BTreeMap<u64, (String, bool)>,
    /// `_local/` documents (no revision tree, never replicated).
    local_docs: HashMap<String, serde_json::Value>,
    /// Attachment bodies keyed by content digest (`"md5-<base64>"`).
    attachments: HashMap<String, Vec<u8>>,
}
49
/// In-memory database adapter. Cloning is cheap: all clones share the same
/// underlying state through the `Arc`.
#[derive(Debug, Clone)]
pub struct MemoryAdapter {
    inner: Arc<RwLock<Inner>>,
}
55
56impl MemoryAdapter {
57 pub fn new(name: &str) -> Self {
58 Self {
59 inner: Arc::new(RwLock::new(Inner {
60 name: name.to_string(),
61 docs: HashMap::new(),
62 update_seq: 0,
63 changes: BTreeMap::new(),
64 local_docs: HashMap::new(),
65 attachments: HashMap::new(),
66 })),
67 }
68 }
69}
70
71fn generate_rev_hash(
77 doc_data: &serde_json::Value,
78 deleted: bool,
79 prev_rev: Option<&str>,
80) -> String {
81 let mut hasher = Md5::new();
82 if let Some(prev) = prev_rev {
84 hasher.update(prev.as_bytes());
85 }
86 hasher.update(if deleted { b"1" } else { b"0" });
87 let serialized = serde_json::to_string(doc_data).unwrap_or_default();
88 hasher.update(serialized.as_bytes());
89 format!("{:x}", hasher.finalize())
90}
91
/// Formats a revision as the canonical `"<pos>-<hash>"` string.
fn rev_string(pos: u64, hash: &str) -> String {
    let mut rev = pos.to_string();
    rev.push('-');
    rev.push_str(hash);
    rev
}
95
96fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
97 let (pos_str, hash) = rev_str
98 .split_once('-')
99 .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
100 let pos: u64 = pos_str
101 .parse()
102 .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
103 Ok((pos, hash.to_string()))
104}
105
106fn compute_attachment_digest(data: &[u8]) -> String {
107 let mut hasher = Md5::new();
108 hasher.update(data);
109 let hash = hasher.finalize();
110 use base64::Engine;
111 let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
112 format!("md5-{}", b64)
113}
114
115#[async_trait]
120impl Adapter for MemoryAdapter {
121 async fn info(&self) -> Result<DbInfo> {
122 let inner = self.inner.read().await;
123 let doc_count = inner
124 .docs
125 .values()
126 .filter(|d| {
127 !is_deleted(&d.rev_tree)
129 })
130 .count() as u64;
131
132 Ok(DbInfo {
133 db_name: inner.name.clone(),
134 doc_count,
135 update_seq: Seq::Num(inner.update_seq),
136 })
137 }
138
    /// Fetches a single document by id.
    ///
    /// Resolves `opts.rev` when given, otherwise the winning revision.
    /// A winner-resolved deleted document yields `NotFound`, but an
    /// explicitly requested deleted revision is returned with
    /// `deleted = true`. With `opts.conflicts`, conflicting leaf revs are
    /// injected into the body under `_conflicts`.
    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
        let inner = self.inner.read().await;
        let stored = inner
            .docs
            .get(id)
            .ok_or_else(|| RouchError::NotFound(id.to_string()))?;

        // Explicit rev wins; otherwise fall back to the tree's winner.
        let target_rev = if let Some(ref rev_str) = opts.rev {
            rev_str.clone()
        } else {
            let winner = winning_rev(&stored.rev_tree)
                .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
            winner.to_string()
        };

        // Missing body (e.g. compacted rev) degrades to an empty object.
        let data = stored
            .rev_data
            .get(&target_rev)
            .cloned()
            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));

        let deleted = stored
            .rev_deleted
            .get(&target_rev)
            .copied()
            .unwrap_or(false);

        // Only the implicit (winner) lookup hides deleted documents;
        // asking for a specific rev returns it even when deleted.
        if deleted && opts.rev.is_none() {
            return Err(RouchError::NotFound(id.to_string()));
        }

        let (pos, hash) = parse_rev(&target_rev)?;
        let rev = Revision::new(pos, hash);

        let mut doc = Document {
            id: id.to_string(),
            rev: Some(rev),
            deleted,
            data,
            attachments: HashMap::new(),
        };

        // Optionally surface non-winning leaf revisions as `_conflicts`.
        if opts.conflicts {
            let conflicts = collect_conflicts(&stored.rev_tree);
            if !conflicts.is_empty() {
                let conflict_list: Vec<serde_json::Value> = conflicts
                    .iter()
                    .map(|c| serde_json::Value::String(c.to_string()))
                    .collect();
                if let serde_json::Value::Object(ref mut map) = doc.data {
                    map.insert(
                        "_conflicts".to_string(),
                        serde_json::Value::Array(conflict_list),
                    );
                }
            }
        }

        Ok(doc)
    }
204
205 async fn bulk_docs(
206 &self,
207 docs: Vec<Document>,
208 opts: BulkDocsOptions,
209 ) -> Result<Vec<DocResult>> {
210 let mut inner = self.inner.write().await;
211 let mut results = Vec::with_capacity(docs.len());
212
213 for doc in docs {
214 let result = if opts.new_edits {
215 process_doc_new_edits(&mut inner, doc)
216 } else {
217 process_doc_replication(&mut inner, doc)
218 };
219 results.push(result);
220 }
221
222 Ok(results)
223 }
224
    /// Lists documents in key order, honoring `keys`/`key`, start/end key
    /// ranges, `descending`, `skip`, `limit`, and `include_docs`.
    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
        let inner = self.inner.read().await;

        // Sorted id list; reversed for descending iteration.
        let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
        doc_ids.sort();

        if opts.descending {
            doc_ids.reverse();
        }

        // Candidate keys: explicit `keys` list, single `key`, or every id.
        let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
            keys.clone()
        } else if let Some(ref key) = opts.key {
            vec![key.clone()]
        } else {
            doc_ids.iter().map(|k| (*k).clone()).collect()
        };

        let mut rows = Vec::new();

        for key in &target_keys {
            // Range filtering (start/end key) only applies in "scan" mode,
            // not when the caller named keys explicitly. Comparisons flip
            // direction when `descending` is set.
            if opts.keys.is_none() && opts.key.is_none() {
                if let Some(ref start) = opts.start_key
                    && ((!opts.descending && key.as_str() < start.as_str())
                        || (opts.descending && key.as_str() > start.as_str()))
                {
                    continue;
                }
                if let Some(ref end) = opts.end_key {
                    if opts.inclusive_end {
                        if (!opts.descending && key.as_str() > end.as_str())
                            || (opts.descending && key.as_str() < end.as_str())
                        {
                            continue;
                        }
                    } else if (!opts.descending && key.as_str() >= end.as_str())
                        || (opts.descending && key.as_str() <= end.as_str())
                    {
                        continue;
                    }
                }
            }

            if let Some(stored) = inner.docs.get(key.as_str()) {
                let winner = match winning_rev(&stored.rev_tree) {
                    Some(w) => w,
                    None => continue,
                };
                let deleted = is_deleted(&stored.rev_tree);

                // Deleted docs are skipped in scan mode but still reported
                // (with value.deleted = true) when keys were named.
                if deleted && opts.keys.is_none() {
                    continue;
                }

                let doc_json = if opts.include_docs && !deleted {
                    let rev_str = winner.to_string();
                    stored.rev_data.get(&rev_str).map(|data| {
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(key.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str));
                        serde_json::Value::Object(obj)
                    })
                } else {
                    None
                };

                rows.push(AllDocsRow {
                    id: key.clone(),
                    key: key.clone(),
                    value: AllDocsRowValue {
                        rev: winner.to_string(),
                        deleted: if deleted { Some(true) } else { None },
                    },
                    doc: doc_json,
                });
            } else if opts.keys.is_some() {
                // NOTE(review): intentionally empty — a named key with no
                // matching doc emits no row. CouchDB would emit an error row
                // here; confirm whether AllDocsRow can represent that.
            }
        }

        // NOTE(review): total_rows counts rows after range filtering but
        // before skip/limit — confirm this matches the intended semantics
        // (CouchDB reports the total docs in the database).
        let total_rows = rows.len() as u64;
        let skip = opts.skip as usize;
        if skip > 0 {
            rows = rows.into_iter().skip(skip).collect();
        }
        if let Some(limit) = opts.limit {
            rows.truncate(limit as usize);
        }

        Ok(AllDocsResponse {
            total_rows,
            offset: opts.skip,
            rows,
        })
    }
330
    /// Returns the change feed strictly after `opts.since`, optionally
    /// filtered by `doc_ids`, reversed when `descending`, capped by `limit`,
    /// and with winning-rev bodies attached when `include_docs` is set.
    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
        let inner = self.inner.read().await;

        let mut results = Vec::new();

        // Everything strictly after `since`.
        let range = (opts.since.as_num() + 1)..;
        // Descending must materialize the range first since BTreeMap range
        // iterators only reverse via a collected Vec here.
        let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
            Box::new(
                inner
                    .changes
                    .range(range)
                    .collect::<Vec<_>>()
                    .into_iter()
                    .rev(),
            )
        } else {
            Box::new(inner.changes.range(range))
        };

        for (seq, (doc_id, deleted)) in iter {
            // Optional doc-id filter.
            if let Some(ref doc_ids) = opts.doc_ids
                && !doc_ids.contains(doc_id)
            {
                continue;
            }

            // Report the *current* winning rev for the doc, which may be
            // empty if the doc record vanished (e.g. after destroy).
            let stored = inner.docs.get(doc_id);
            let rev_str = stored
                .and_then(|s| winning_rev(&s.rev_tree))
                .map(|r| r.to_string())
                .unwrap_or_default();

            let doc = if opts.include_docs {
                stored.and_then(|s| {
                    s.rev_data.get(&rev_str).map(|data| {
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(doc_id.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
                        if *deleted {
                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
                        }
                        serde_json::Value::Object(obj)
                    })
                })
            } else {
                None
            };

            results.push(ChangeEvent {
                seq: Seq::Num(*seq),
                id: doc_id.clone(),
                changes: vec![ChangeRev { rev: rev_str }],
                deleted: *deleted,
                doc,
            });

            // Limit is checked after pushing, so exactly `limit` events are
            // emitted before stopping.
            if let Some(limit) = opts.limit
                && results.len() >= limit as usize
            {
                break;
            }
        }

        // last_seq is the seq of the final emitted event, or `since` when
        // nothing matched.
        let last_seq = results
            .last()
            .map(|r| r.seq.clone())
            .unwrap_or(opts.since.clone());

        Ok(ChangesResponse { results, last_seq })
    }
406
407 async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
408 let inner = self.inner.read().await;
409 let mut results = HashMap::new();
410
411 for (doc_id, rev_list) in revs {
412 let mut missing = Vec::new();
413 let mut possible_ancestors = Vec::new();
414
415 let stored = inner.docs.get(&doc_id);
416
417 for rev_str in &rev_list {
418 let (pos, hash) = parse_rev(rev_str)?;
419
420 let exists = stored
421 .map(|s| rev_exists(&s.rev_tree, pos, &hash))
422 .unwrap_or(false);
423
424 if !exists {
425 missing.push(rev_str.clone());
426
427 if let Some(stored) = stored {
429 let leaves = collect_leaves(&stored.rev_tree);
430 for leaf in &leaves {
431 if leaf.pos < pos {
432 possible_ancestors.push(leaf.rev_string());
433 }
434 }
435 }
436 }
437 }
438
439 if !missing.is_empty() {
440 results.insert(
441 doc_id,
442 RevsDiffResult {
443 missing,
444 possible_ancestors,
445 },
446 );
447 }
448 }
449
450 Ok(RevsDiffResponse { results })
451 }
452
    /// Replication-oriented multi-document fetch: returns, per requested
    /// item, either the document body (with `_revisions` ancestry) or a
    /// `not_found` error entry.
    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
        let inner = self.inner.read().await;
        let mut results = Vec::new();

        for item in docs {
            let mut bulk_docs = Vec::new();

            match inner.docs.get(&item.id) {
                Some(stored) => {
                    // Requested rev, or the winner; a doc with no winner is
                    // treated as not found (result pushed, loop continues).
                    let rev_str = if let Some(ref rev) = item.rev {
                        rev.clone()
                    } else {
                        match winning_rev(&stored.rev_tree) {
                            Some(w) => w.to_string(),
                            None => {
                                bulk_docs.push(BulkGetDoc {
                                    ok: None,
                                    error: Some(BulkGetError {
                                        id: item.id.clone(),
                                        rev: item.rev.unwrap_or_default(),
                                        error: "not_found".into(),
                                        reason: "missing".into(),
                                    }),
                                });
                                results.push(BulkGetResult {
                                    id: item.id,
                                    docs: bulk_docs,
                                });
                                continue;
                            }
                        }
                    };

                    if let Some(data) = stored.rev_data.get(&rev_str) {
                        let deleted = stored.rev_deleted.get(&rev_str).copied().unwrap_or(false);
                        // Build the response body with CouchDB meta fields.
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(item.id.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
                        if deleted {
                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
                        }

                        // Attach `_revisions` ancestry so the receiving side
                        // can graft this rev onto its own tree.
                        if let Ok((pos, ref hash)) = parse_rev(&rev_str)
                            && let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash)
                        {
                            obj.insert(
                                "_revisions".into(),
                                serde_json::json!({
                                    "start": pos,
                                    "ids": ancestry
                                }),
                            );
                        }

                        bulk_docs.push(BulkGetDoc {
                            ok: Some(serde_json::Value::Object(obj)),
                            error: None,
                        });
                    } else {
                        // Rev exists in the tree request but has no stored
                        // body (unknown or compacted rev).
                        bulk_docs.push(BulkGetDoc {
                            ok: None,
                            error: Some(BulkGetError {
                                id: item.id.clone(),
                                rev: rev_str,
                                error: "not_found".into(),
                                reason: "missing".into(),
                            }),
                        });
                    }
                }
                None => {
                    // Document id entirely unknown.
                    bulk_docs.push(BulkGetDoc {
                        ok: None,
                        error: Some(BulkGetError {
                            id: item.id.clone(),
                            rev: item.rev.unwrap_or_default(),
                            error: "not_found".into(),
                            reason: "missing".into(),
                        }),
                    });
                }
            }

            results.push(BulkGetResult {
                id: item.id,
                docs: bulk_docs,
            });
        }

        Ok(BulkGetResponse { results })
    }
548
    /// Stores an attachment body (keyed by content digest) and writes a new
    /// document revision carrying the attachment stub metadata.
    ///
    /// NOTE(review): `process_doc_new_edits` only persists `doc.data` — the
    /// `attachments` map built below is dropped, so the stub metadata never
    /// reaches storage and `get_attachment` cannot recover the digest.
    /// Confirm whether attachment metadata is meant to be persisted.
    async fn put_attachment(
        &self,
        doc_id: &str,
        att_id: &str,
        rev: &str,
        data: Vec<u8>,
        content_type: &str,
    ) -> Result<DocResult> {
        let digest = compute_attachment_digest(&data);
        let length = data.len() as u64;

        let mut inner = self.inner.write().await;

        // Body is stored content-addressed; identical data deduplicates.
        // Note this happens before validation, so a failed put still leaves
        // the blob behind.
        inner.attachments.insert(digest.clone(), data);

        let stored = inner
            .docs
            .get(doc_id)
            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;

        // Only the current winning revision may be updated.
        let winner = winning_rev(&stored.rev_tree)
            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
        if winner.to_string() != rev {
            return Err(RouchError::Conflict);
        }

        let doc_data = stored
            .rev_data
            .get(rev)
            .cloned()
            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));

        let att_meta = AttachmentMeta {
            content_type: content_type.to_string(),
            digest: digest.clone(),
            length,
            stub: true,
            data: None,
        };

        // Re-save the document as a normal edit so it gains a new revision.
        let doc = Document {
            id: doc_id.to_string(),
            rev: Some(winner.clone()),
            deleted: false,
            data: doc_data.clone(),
            attachments: {
                let mut atts = HashMap::new();
                atts.insert(att_id.to_string(), att_meta);
                atts
            },
        };

        let result = process_doc_new_edits(&mut inner, doc);
        Ok(result)
    }
610
    /// Looks up an attachment body.
    ///
    /// NOTE(review): currently a stub — it validates the document and
    /// resolves a revision, but always returns `NotFound` because attachment
    /// metadata is not persisted with revisions (see `put_attachment`), so
    /// the digest needed to index `inner.attachments` is unrecoverable here.
    async fn get_attachment(
        &self,
        doc_id: &str,
        att_id: &str,
        opts: GetAttachmentOptions,
    ) -> Result<Vec<u8>> {
        let inner = self.inner.read().await;

        // Fail early if the document itself does not exist.
        let stored = inner
            .docs
            .get(doc_id)
            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;

        // Resolve the explicit rev, or fall back to the winning revision.
        let rev_str = if let Some(ref rev) = opts.rev {
            rev.clone()
        } else {
            winning_rev(&stored.rev_tree)
                .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
                .to_string()
        };

        // Fetched but unused: rev bodies carry no attachment stubs to follow.
        let _data = stored.rev_data.get(&rev_str);

        Err(RouchError::NotFound(format!(
            "attachment {}/{}",
            doc_id, att_id
        )))
    }
644
645 async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
646 let inner = self.inner.read().await;
647 inner
648 .local_docs
649 .get(id)
650 .cloned()
651 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
652 }
653
654 async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
655 let mut inner = self.inner.write().await;
656 inner.local_docs.insert(id.to_string(), doc);
657 Ok(())
658 }
659
660 async fn remove_local(&self, id: &str) -> Result<()> {
661 let mut inner = self.inner.write().await;
662 inner
663 .local_docs
664 .remove(id)
665 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
666 Ok(())
667 }
668
669 async fn compact(&self) -> Result<()> {
670 let mut inner = self.inner.write().await;
671
672 for stored in inner.docs.values_mut() {
673 let leaves = collect_leaves(&stored.rev_tree);
674 let leaf_revs: std::collections::HashSet<String> =
675 leaves.iter().map(|l| l.rev_string()).collect();
676
677 stored.rev_data.retain(|k, _| leaf_revs.contains(k));
679 stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
680 }
681
682 Ok(())
683 }
684
685 async fn destroy(&self) -> Result<()> {
686 let mut inner = self.inner.write().await;
687 inner.docs.clear();
688 inner.changes.clear();
689 inner.local_docs.clear();
690 inner.attachments.clear();
691 inner.update_seq = 0;
692 Ok(())
693 }
694}
695
/// Interactive (`new_edits = true`) write path: validates the caller's base
/// revision against the current winner, generates a new deterministic
/// revision, merges it into the tree, and records the change.
///
/// Returns a per-document `DocResult`; failures are reported in the result
/// (`conflict` / `not_found`) rather than as an `Err`.
///
/// NOTE(review): `doc.attachments` is never read here, so attachment
/// metadata on the incoming document is silently dropped — confirm intended.
fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
    // Missing id: auto-generate a UUID, matching CouchDB POST semantics.
    let doc_id = if doc.id.is_empty() {
        Uuid::new_v4().to_string()
    } else {
        doc.id.clone()
    };

    let existing = inner.docs.get(&doc_id);

    if let Some(stored) = existing {
        let winner = winning_rev(&stored.rev_tree);

        // Conflict matrix:
        //  - rev supplied but not the current winner  -> conflict
        //  - no rev supplied over a live document     -> conflict
        //  - no rev over a deleted doc, or empty tree -> allowed (recreate)
        match (&doc.rev, &winner) {
            (Some(provided_rev), Some(current_winner)) => {
                if provided_rev.to_string() != current_winner.to_string() {
                    return DocResult {
                        ok: false,
                        id: doc_id,
                        rev: None,
                        error: Some("conflict".into()),
                        reason: Some("Document update conflict".into()),
                    };
                }
            }
            (None, Some(_)) => {
                if !is_deleted(&stored.rev_tree) {
                    return DocResult {
                        ok: false,
                        id: doc_id,
                        rev: None,
                        error: Some("conflict".into()),
                        reason: Some("Document update conflict".into()),
                    };
                }
            }
            _ => {}
        }
    } else if doc.rev.is_some() {
        // A base rev was supplied for a document that does not exist.
        return DocResult {
            ok: false,
            id: doc_id,
            rev: None,
            error: Some("not_found".into()),
            reason: Some("missing".into()),
        };
    }

    // New revision: position is parent pos + 1 (or 1 for a fresh doc); the
    // hash is derived deterministically from body, deleted flag and parent.
    let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
    let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
    let new_hash = generate_rev_hash(&doc.data, doc.deleted, prev_rev_str.as_deref());
    let new_rev_str = rev_string(new_pos, &new_hash);

    // Path hashes are ordered newest-first: [new, parent].
    let mut rev_hashes = vec![new_hash.clone()];
    if let Some(ref prev) = doc.rev {
        rev_hashes.push(prev.hash.clone());
    }

    let new_path = build_path_from_revs(
        new_pos,
        &rev_hashes,
        NodeOpts {
            deleted: doc.deleted,
        },
        RevStatus::Available,
    );

    let existing_tree = existing.map(|s| s.rev_tree.clone()).unwrap_or_default();

    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);

    inner.update_seq += 1;
    let seq = inner.update_seq;

    // Each doc appears at most once in the change log: remove its old entry
    // before inserting the new one.
    if let Some(existing) = inner.docs.get(&doc_id) {
        inner.changes.remove(&existing.seq);
    }

    let stored = inner
        .docs
        .entry(doc_id.clone())
        .or_insert_with(|| StoredDoc {
            rev_tree: Vec::new(),
            rev_data: HashMap::new(),
            rev_deleted: HashMap::new(),
            seq: 0,
        });

    stored.rev_tree = merged_tree;
    stored.rev_data.insert(new_rev_str.clone(), doc.data);
    stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
    stored.seq = seq;

    inner.changes.insert(seq, (doc_id.clone(), doc.deleted));

    DocResult {
        ok: true,
        id: doc_id,
        rev: Some(new_rev_str),
        error: None,
        reason: None,
    }
}
813
/// Replication (`new_edits = false`) write path: stores the document under
/// its client-supplied revision, grafting the provided `_revisions` ancestry
/// (or a single-node path) into the existing tree. No conflict checking —
/// divergent branches are kept and resolved by the merge algorithm.
fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
    let doc_id = doc.id.clone();
    // Replicated writes must carry an explicit revision.
    let rev = match &doc.rev {
        Some(r) => r.clone(),
        None => {
            return DocResult {
                ok: false,
                id: doc_id,
                rev: None,
                error: Some("bad_request".into()),
                reason: Some("missing _rev".into()),
            };
        }
    };

    let rev_str = rev.to_string();

    // Build the revision path to merge: from the `_revisions` ancestry when
    // present, otherwise a single-node path for just this rev.
    let new_path = if let Some(revisions) = doc.data.get("_revisions") {
        let start = revisions["start"].as_u64().unwrap_or(rev.pos);
        let ids: Vec<String> = revisions["ids"]
            .as_array()
            .map(|arr| {
                arr.iter()
                    .filter_map(|v| v.as_str().map(String::from))
                    .collect()
            })
            .unwrap_or_else(|| vec![rev.hash.clone()]);

        build_path_from_revs(
            start,
            &ids,
            NodeOpts {
                deleted: doc.deleted,
            },
            RevStatus::Available,
        )
    } else {
        RevPath {
            pos: rev.pos,
            tree: rouchdb_core::rev_tree::RevNode {
                hash: rev.hash.clone(),
                status: RevStatus::Available,
                opts: NodeOpts {
                    deleted: doc.deleted,
                },
                children: vec![],
            },
        }
    };

    // `_revisions` is transport metadata; strip it before storing the body.
    if let serde_json::Value::Object(ref mut map) = doc.data {
        map.remove("_revisions");
    }

    let existing_tree = inner
        .docs
        .get(&doc_id)
        .map(|s| s.rev_tree.clone())
        .unwrap_or_default();

    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);

    inner.update_seq += 1;
    let seq = inner.update_seq;

    // Keep at most one change-log entry per document.
    if let Some(existing) = inner.docs.get(&doc_id) {
        inner.changes.remove(&existing.seq);
    }

    // The change entry reflects the *merged* winner's deleted state, not the
    // incoming doc's flag (the incoming rev may lose the merge).
    let is_doc_deleted = is_deleted(&merged_tree);

    let stored = inner
        .docs
        .entry(doc_id.clone())
        .or_insert_with(|| StoredDoc {
            rev_tree: Vec::new(),
            rev_data: HashMap::new(),
            rev_deleted: HashMap::new(),
            seq: 0,
        });

    stored.rev_tree = merged_tree;
    stored.rev_data.insert(rev_str.clone(), doc.data);
    stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
    stored.seq = seq;

    inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));

    DocResult {
        ok: true,
        id: doc_id,
        rev: Some(rev_str),
        error: None,
        reason: None,
    }
}
920
/// Unit tests exercising the in-memory adapter through the public `Adapter`
/// trait: CRUD, conflicts, deletion, listing, changes, revs_diff, local
/// docs, replication-mode writes, and destroy.
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};

    // Fresh, empty database for each test.
    async fn new_db() -> MemoryAdapter {
        MemoryAdapter::new("test")
    }

    #[tokio::test]
    async fn info_empty_db() {
        let db = new_db().await;
        let info = db.info().await.unwrap();
        assert_eq!(info.db_name, "test");
        assert_eq!(info.doc_count, 0);
        assert_eq!(info.update_seq, Seq::Num(0));
    }

    #[tokio::test]
    async fn put_and_get_document() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "Alice"}),
            attachments: HashMap::new(),
        };

        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        assert!(results[0].ok);
        assert_eq!(results[0].id, "doc1");
        assert!(results[0].rev.is_some());

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.id, "doc1");
        assert_eq!(fetched.data["name"], "Alice");
        assert!(fetched.rev.is_some());
    }

    #[tokio::test]
    async fn update_document() {
        let db = new_db().await;

        // First write establishes rev 1.
        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "Alice"}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        let rev1 = results[0].rev.clone().unwrap();

        // Second write must cite rev 1 to succeed.
        let rev_parsed: Revision = rev1.parse().unwrap();
        let doc2 = Document {
            id: "doc1".into(),
            rev: Some(rev_parsed),
            deleted: false,
            data: serde_json::json!({"name": "Bob"}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc2], BulkDocsOptions::new())
            .await
            .unwrap();
        assert!(results[0].ok);

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.data["name"], "Bob");
    }

    #[tokio::test]
    async fn conflict_on_wrong_rev() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"v": 1}),
            attachments: HashMap::new(),
        };
        db.bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();

        // Citing a rev that isn't the winner must be rejected.
        let doc2 = Document {
            id: "doc1".into(),
            rev: Some(Revision::new(1, "wronghash".into())),
            deleted: false,
            data: serde_json::json!({"v": 2}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc2], BulkDocsOptions::new())
            .await
            .unwrap();
        assert!(!results[0].ok);
        assert_eq!(results[0].error.as_deref(), Some("conflict"));
    }

    #[tokio::test]
    async fn delete_document() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "Alice"}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        let rev1: Revision = results[0].rev.clone().unwrap().parse().unwrap();

        // Tombstone write against rev 1.
        let del = Document {
            id: "doc1".into(),
            rev: Some(rev1),
            deleted: true,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![del], BulkDocsOptions::new())
            .await
            .unwrap();
        assert!(results[0].ok);

        // Winner-resolved get of a deleted doc is NotFound.
        let err = db.get("doc1", GetOptions::default()).await;
        assert!(err.is_err());

        // Deleted docs do not count toward doc_count.
        let info = db.info().await.unwrap();
        assert_eq!(info.doc_count, 0);
    }

    #[tokio::test]
    async fn all_docs() {
        let db = new_db().await;

        for name in ["charlie", "alice", "bob"] {
            let doc = Document {
                id: name.into(),
                rev: None,
                deleted: false,
                data: serde_json::json!({"name": name}),
                attachments: HashMap::new(),
            };
            db.bulk_docs(vec![doc], BulkDocsOptions::new())
                .await
                .unwrap();
        }

        // Rows come back sorted by id regardless of insertion order.
        let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
        assert_eq!(result.total_rows, 3);
        assert_eq!(result.rows[0].id, "alice");
        assert_eq!(result.rows[1].id, "bob");
        assert_eq!(result.rows[2].id, "charlie");
    }

    #[tokio::test]
    async fn all_docs_with_include_docs() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "Alice"}),
            attachments: HashMap::new(),
        };
        db.bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();

        let mut opts = AllDocsOptions::new();
        opts.include_docs = true;
        let result = db.all_docs(opts).await.unwrap();
        assert!(result.rows[0].doc.is_some());
        let doc = result.rows[0].doc.as_ref().unwrap();
        assert_eq!(doc["name"], "Alice");
        assert_eq!(doc["_id"], "doc1");
    }

    #[tokio::test]
    async fn changes_feed() {
        let db = new_db().await;

        for i in 0..3 {
            let doc = Document {
                id: format!("doc{}", i),
                rev: None,
                deleted: false,
                data: serde_json::json!({"i": i}),
                attachments: HashMap::new(),
            };
            db.bulk_docs(vec![doc], BulkDocsOptions::new())
                .await
                .unwrap();
        }

        let changes = db.changes(ChangesOptions::default()).await.unwrap();
        assert_eq!(changes.results.len(), 3);
        assert_eq!(changes.last_seq, Seq::Num(3));

        // since=2 returns only changes strictly after seq 2.
        let changes = db
            .changes(ChangesOptions {
                since: Seq::Num(2),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(changes.results.len(), 1);
        assert_eq!(changes.results[0].id, "doc2");
    }

    #[tokio::test]
    async fn revs_diff() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"v": 1}),
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        let existing_rev = results[0].rev.clone().unwrap();

        let mut revs = HashMap::new();
        revs.insert(
            "doc1".into(),
            vec![existing_rev.clone(), "2-doesnotexist".into()],
        );
        revs.insert("doc2".into(), vec!["1-abc".into()]);

        let diff = db.revs_diff(revs).await.unwrap();

        // Known rev is not reported missing; unknown one is.
        let doc1_diff = diff.results.get("doc1").unwrap();
        assert!(!doc1_diff.missing.contains(&existing_rev));
        assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));

        // Entirely unknown doc: all its revs are missing.
        let doc2_diff = diff.results.get("doc2").unwrap();
        assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
    }

    #[tokio::test]
    async fn local_docs() {
        let db = new_db().await;

        let doc = serde_json::json!({"checkpoint": 42});
        db.put_local("repl-123", doc.clone()).await.unwrap();

        let fetched = db.get_local("repl-123").await.unwrap();
        assert_eq!(fetched["checkpoint"], 42);

        db.remove_local("repl-123").await.unwrap();
        assert!(db.get_local("repl-123").await.is_err());
    }

    #[tokio::test]
    async fn replication_mode_bulk_docs() {
        let db = new_db().await;

        // new_edits=false stores the client-supplied rev verbatim.
        let doc = Document {
            id: "doc1".into(),
            rev: Some(Revision::new(1, "abc123".into())),
            deleted: false,
            data: serde_json::json!({"name": "replicated"}),
            attachments: HashMap::new(),
        };

        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::replication())
            .await
            .unwrap();
        assert!(results[0].ok);

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.data["name"], "replicated");
        assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
    }

    #[tokio::test]
    async fn auto_generate_id() {
        let db = new_db().await;

        let doc = Document {
            id: String::new(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "no-id"}),
            attachments: HashMap::new(),
        };

        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        assert!(results[0].ok);
        assert!(!results[0].id.is_empty());
    }

    #[tokio::test]
    async fn destroy_clears_everything() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({}),
            attachments: HashMap::new(),
        };
        db.bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        db.put_local("x", serde_json::json!({})).await.unwrap();

        db.destroy().await.unwrap();

        let info = db.info().await.unwrap();
        assert_eq!(info.doc_count, 0);
        assert_eq!(info.update_seq, Seq::Num(0));
    }

    #[tokio::test]
    async fn bulk_get_documents() {
        let db = new_db().await;

        let doc = Document {
            id: "doc1".into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": "test"}),
            attachments: HashMap::new(),
        };
        db.bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();

        let result = db
            .bulk_get(vec![
                BulkGetItem {
                    id: "doc1".into(),
                    rev: None,
                },
                BulkGetItem {
                    id: "missing".into(),
                    rev: None,
                },
            ])
            .await
            .unwrap();

        assert_eq!(result.results.len(), 2);
        assert!(result.results[0].docs[0].ok.is_some());
        assert!(result.results[1].docs[0].error.is_some());
    }
}