1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use md5::{Digest, Md5};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::document::*;
11use rouchdb_core::error::{Result, RouchError};
12use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
13use rouchdb_core::rev_tree::{
14 NodeOpts, RevPath, RevStatus, RevTree, build_path_from_revs, collect_leaves, find_rev_ancestry,
15 rev_exists,
16};
17
18const DEFAULT_REV_LIMIT: u64 = 1000;
19
20#[derive(Debug, Clone)]
25struct StoredDoc {
26 rev_tree: RevTree,
27 rev_data: HashMap<String, serde_json::Value>,
29 rev_deleted: HashMap<String, bool>,
31 seq: u64,
33}
34
35#[derive(Debug)]
36struct Inner {
37 name: String,
38 docs: HashMap<String, StoredDoc>,
40 update_seq: u64,
42 changes: BTreeMap<u64, (String, bool)>,
44 local_docs: HashMap<String, serde_json::Value>,
46 attachments: HashMap<String, Vec<u8>>,
48}
49
50#[derive(Debug, Clone)]
52pub struct MemoryAdapter {
53 inner: Arc<RwLock<Inner>>,
54}
55
56impl MemoryAdapter {
57 pub fn new(name: &str) -> Self {
58 Self {
59 inner: Arc::new(RwLock::new(Inner {
60 name: name.to_string(),
61 docs: HashMap::new(),
62 update_seq: 0,
63 changes: BTreeMap::new(),
64 local_docs: HashMap::new(),
65 attachments: HashMap::new(),
66 })),
67 }
68 }
69}
70
71fn generate_rev_hash(
77 doc_data: &serde_json::Value,
78 deleted: bool,
79 prev_rev: Option<&str>,
80) -> String {
81 let mut hasher = Md5::new();
82 if let Some(prev) = prev_rev {
84 hasher.update(prev.as_bytes());
85 }
86 hasher.update(if deleted { b"1" } else { b"0" });
87 let serialized = serde_json::to_string(doc_data).unwrap_or_default();
88 hasher.update(serialized.as_bytes());
89 format!("{:x}", hasher.finalize())
90}
91
92fn rev_string(pos: u64, hash: &str) -> String {
93 format!("{}-{}", pos, hash)
94}
95
96fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
97 let (pos_str, hash) = rev_str
98 .split_once('-')
99 .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
100 let pos: u64 = pos_str
101 .parse()
102 .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
103 Ok((pos, hash.to_string()))
104}
105
106fn compute_attachment_digest(data: &[u8]) -> String {
107 let mut hasher = Md5::new();
108 hasher.update(data);
109 let hash = hasher.finalize();
110 use base64::Engine;
111 let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
112 format!("md5-{}", b64)
113}
114
115#[async_trait]
120impl Adapter for MemoryAdapter {
121 async fn info(&self) -> Result<DbInfo> {
122 let inner = self.inner.read().await;
123 let doc_count = inner
124 .docs
125 .values()
126 .filter(|d| {
127 !is_deleted(&d.rev_tree)
129 })
130 .count() as u64;
131
132 Ok(DbInfo {
133 db_name: inner.name.clone(),
134 doc_count,
135 update_seq: Seq::Num(inner.update_seq),
136 })
137 }
138
139 async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
140 let inner = self.inner.read().await;
141 let stored = inner
142 .docs
143 .get(id)
144 .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
145
146 let mut target_rev = if let Some(ref rev_str) = opts.rev {
148 rev_str.clone()
149 } else {
150 let winner = winning_rev(&stored.rev_tree)
152 .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
153 winner.to_string()
154 };
155
156 if opts.latest && opts.rev.is_some() {
158 let leaves = collect_leaves(&stored.rev_tree);
159 let is_leaf = leaves.iter().any(|l| l.rev_string() == target_rev);
160 if !is_leaf && let Some(leaf) = leaves.first() {
161 target_rev = leaf.rev_string();
162 }
163 }
164
165 let data = stored
167 .rev_data
168 .get(&target_rev)
169 .cloned()
170 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
171
172 let deleted = stored
173 .rev_deleted
174 .get(&target_rev)
175 .copied()
176 .unwrap_or(false);
177
178 if deleted && opts.rev.is_none() {
180 return Err(RouchError::NotFound(id.to_string()));
181 }
182
183 let (pos, hash) = parse_rev(&target_rev)?;
184 let rev = Revision::new(pos, hash);
185
186 let mut doc = Document {
187 id: id.to_string(),
188 rev: Some(rev),
189 deleted,
190 data,
191 attachments: HashMap::new(),
192 };
193
194 if opts.conflicts {
196 let conflicts = collect_conflicts(&stored.rev_tree);
197 if !conflicts.is_empty() {
198 let conflict_list: Vec<serde_json::Value> = conflicts
199 .iter()
200 .map(|c| serde_json::Value::String(c.to_string()))
201 .collect();
202 if let serde_json::Value::Object(ref mut map) = doc.data {
203 map.insert(
204 "_conflicts".to_string(),
205 serde_json::Value::Array(conflict_list),
206 );
207 }
208 }
209 }
210
211 if opts.revs_info {
213 use rouchdb_core::rev_tree::traverse_rev_tree;
214 let mut revs_info = Vec::new();
215 traverse_rev_tree(&stored.rev_tree, |node_pos, node, _root_pos| {
216 let rev_str = format!("{}-{}", node_pos, node.hash);
217 let status = if node.opts.deleted {
218 "deleted"
219 } else {
220 match node.status {
221 rouchdb_core::rev_tree::RevStatus::Available => "available",
222 rouchdb_core::rev_tree::RevStatus::Missing => "missing",
223 }
224 };
225 revs_info.push(RevInfo {
226 rev: rev_str,
227 status: status.to_string(),
228 });
229 });
230 revs_info.sort_by(|a, b| {
232 let a_pos: u64 = a.rev.split('-').next().unwrap_or("0").parse().unwrap_or(0);
233 let b_pos: u64 = b.rev.split('-').next().unwrap_or("0").parse().unwrap_or(0);
234 b_pos.cmp(&a_pos)
235 });
236 if let serde_json::Value::Object(ref mut map) = doc.data {
237 map.insert(
238 "_revs_info".to_string(),
239 serde_json::to_value(&revs_info).unwrap(),
240 );
241 }
242 }
243
244 Ok(doc)
245 }
246
247 async fn bulk_docs(
248 &self,
249 docs: Vec<Document>,
250 opts: BulkDocsOptions,
251 ) -> Result<Vec<DocResult>> {
252 let mut inner = self.inner.write().await;
253 let mut results = Vec::with_capacity(docs.len());
254
255 for doc in docs {
256 let result = if opts.new_edits {
257 process_doc_new_edits(&mut inner, doc)
258 } else {
259 process_doc_replication(&mut inner, doc)
260 };
261 results.push(result);
262 }
263
264 Ok(results)
265 }
266
267 async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
268 let inner = self.inner.read().await;
269
270 let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
272 doc_ids.sort();
273
274 if opts.descending {
275 doc_ids.reverse();
276 }
277
278 let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
280 keys.clone()
281 } else if let Some(ref key) = opts.key {
282 vec![key.clone()]
283 } else {
284 doc_ids.iter().map(|k| (*k).clone()).collect()
285 };
286
287 let mut rows = Vec::new();
288
289 for key in &target_keys {
290 if opts.keys.is_none() && opts.key.is_none() {
292 if let Some(ref start) = opts.start_key
293 && ((!opts.descending && key.as_str() < start.as_str())
294 || (opts.descending && key.as_str() > start.as_str()))
295 {
296 continue;
297 }
298 if let Some(ref end) = opts.end_key {
299 if opts.inclusive_end {
300 if (!opts.descending && key.as_str() > end.as_str())
301 || (opts.descending && key.as_str() < end.as_str())
302 {
303 continue;
304 }
305 } else if (!opts.descending && key.as_str() >= end.as_str())
306 || (opts.descending && key.as_str() <= end.as_str())
307 {
308 continue;
309 }
310 }
311 }
312
313 if let Some(stored) = inner.docs.get(key.as_str()) {
314 let winner = match winning_rev(&stored.rev_tree) {
315 Some(w) => w,
316 None => continue,
317 };
318 let deleted = is_deleted(&stored.rev_tree);
319
320 if deleted && opts.keys.is_none() {
322 continue;
323 }
324
325 let doc_json = if opts.include_docs && !deleted {
326 let rev_str = winner.to_string();
327 stored.rev_data.get(&rev_str).map(|data| {
328 let mut obj = match data {
329 serde_json::Value::Object(m) => m.clone(),
330 _ => serde_json::Map::new(),
331 };
332 obj.insert("_id".into(), serde_json::Value::String(key.clone()));
333 obj.insert("_rev".into(), serde_json::Value::String(rev_str));
334 if opts.conflicts {
336 let conflicts = collect_conflicts(&stored.rev_tree);
337 if !conflicts.is_empty() {
338 let conflict_list: Vec<serde_json::Value> = conflicts
339 .iter()
340 .map(|c| serde_json::Value::String(c.to_string()))
341 .collect();
342 obj.insert(
343 "_conflicts".to_string(),
344 serde_json::Value::Array(conflict_list),
345 );
346 }
347 }
348 serde_json::Value::Object(obj)
349 })
350 } else {
351 None
352 };
353
354 rows.push(AllDocsRow {
355 id: key.clone(),
356 key: key.clone(),
357 value: AllDocsRowValue {
358 rev: winner.to_string(),
359 deleted: if deleted { Some(true) } else { None },
360 },
361 doc: doc_json,
362 });
363 } else if opts.keys.is_some() {
364 }
368 }
369
370 let total_rows = rows.len() as u64;
372 let skip = opts.skip as usize;
373 if skip > 0 {
374 rows = rows.into_iter().skip(skip).collect();
375 }
376 if let Some(limit) = opts.limit {
377 rows.truncate(limit as usize);
378 }
379
380 let update_seq = if opts.update_seq {
381 Some(Seq::Num(inner.update_seq))
382 } else {
383 None
384 };
385
386 Ok(AllDocsResponse {
387 total_rows,
388 offset: opts.skip,
389 rows,
390 update_seq,
391 })
392 }
393
394 async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
395 let inner = self.inner.read().await;
396
397 let mut results = Vec::new();
398
399 let range = (opts.since.as_num() + 1)..;
401 let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
402 Box::new(
403 inner
404 .changes
405 .range(range)
406 .collect::<Vec<_>>()
407 .into_iter()
408 .rev(),
409 )
410 } else {
411 Box::new(inner.changes.range(range))
412 };
413
414 for (seq, (doc_id, deleted)) in iter {
415 if let Some(ref doc_ids) = opts.doc_ids
417 && !doc_ids.contains(doc_id)
418 {
419 continue;
420 }
421
422 let stored = inner.docs.get(doc_id);
423 let rev_str = stored
424 .and_then(|s| winning_rev(&s.rev_tree))
425 .map(|r| r.to_string())
426 .unwrap_or_default();
427
428 let doc = if opts.include_docs {
429 stored.and_then(|s| {
430 s.rev_data.get(&rev_str).map(|data| {
431 let mut obj = match data {
432 serde_json::Value::Object(m) => m.clone(),
433 _ => serde_json::Map::new(),
434 };
435 obj.insert("_id".into(), serde_json::Value::String(doc_id.clone()));
436 obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
437 if *deleted {
438 obj.insert("_deleted".into(), serde_json::Value::Bool(true));
439 }
440 serde_json::Value::Object(obj)
441 })
442 })
443 } else {
444 None
445 };
446
447 let changes_list = if opts.style == ChangesStyle::AllDocs {
449 if let Some(s) = stored {
450 collect_leaves(&s.rev_tree)
451 .iter()
452 .filter(|l| !s.rev_deleted.get(&l.rev_string()).copied().unwrap_or(false))
453 .map(|l| ChangeRev {
454 rev: l.rev_string(),
455 })
456 .collect()
457 } else {
458 vec![ChangeRev { rev: rev_str }]
459 }
460 } else {
461 vec![ChangeRev { rev: rev_str }]
462 };
463
464 let conflicts = if opts.conflicts {
466 stored
467 .map(|s| {
468 let c = collect_conflicts(&s.rev_tree);
469 if c.is_empty() {
470 None
471 } else {
472 Some(c.iter().map(|r| r.to_string()).collect())
473 }
474 })
475 .unwrap_or(None)
476 } else {
477 None
478 };
479
480 results.push(ChangeEvent {
481 seq: Seq::Num(*seq),
482 id: doc_id.clone(),
483 changes: changes_list,
484 deleted: *deleted,
485 doc,
486 conflicts,
487 });
488
489 if let Some(limit) = opts.limit
490 && results.len() >= limit as usize
491 {
492 break;
493 }
494 }
495
496 let last_seq = results
497 .last()
498 .map(|r| r.seq.clone())
499 .unwrap_or(opts.since.clone());
500
501 Ok(ChangesResponse { results, last_seq })
502 }
503
504 async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
505 let inner = self.inner.read().await;
506 let mut results = HashMap::new();
507
508 for (doc_id, rev_list) in revs {
509 let mut missing = Vec::new();
510 let mut possible_ancestors = Vec::new();
511
512 let stored = inner.docs.get(&doc_id);
513
514 for rev_str in &rev_list {
515 let (pos, hash) = parse_rev(rev_str)?;
516
517 let exists = stored
518 .map(|s| rev_exists(&s.rev_tree, pos, &hash))
519 .unwrap_or(false);
520
521 if !exists {
522 missing.push(rev_str.clone());
523
524 if let Some(stored) = stored {
526 let leaves = collect_leaves(&stored.rev_tree);
527 for leaf in &leaves {
528 if leaf.pos < pos {
529 possible_ancestors.push(leaf.rev_string());
530 }
531 }
532 }
533 }
534 }
535
536 if !missing.is_empty() {
537 results.insert(
538 doc_id,
539 RevsDiffResult {
540 missing,
541 possible_ancestors,
542 },
543 );
544 }
545 }
546
547 Ok(RevsDiffResponse { results })
548 }
549
550 async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
551 let inner = self.inner.read().await;
552 let mut results = Vec::new();
553
554 for item in docs {
555 let mut bulk_docs = Vec::new();
556
557 match inner.docs.get(&item.id) {
558 Some(stored) => {
559 let rev_str = if let Some(ref rev) = item.rev {
560 rev.clone()
561 } else {
562 match winning_rev(&stored.rev_tree) {
563 Some(w) => w.to_string(),
564 None => {
565 bulk_docs.push(BulkGetDoc {
566 ok: None,
567 error: Some(BulkGetError {
568 id: item.id.clone(),
569 rev: item.rev.unwrap_or_default(),
570 error: "not_found".into(),
571 reason: "missing".into(),
572 }),
573 });
574 results.push(BulkGetResult {
575 id: item.id,
576 docs: bulk_docs,
577 });
578 continue;
579 }
580 }
581 };
582
583 if let Some(data) = stored.rev_data.get(&rev_str) {
584 let deleted = stored.rev_deleted.get(&rev_str).copied().unwrap_or(false);
585 let mut obj = match data {
586 serde_json::Value::Object(m) => m.clone(),
587 _ => serde_json::Map::new(),
588 };
589 obj.insert("_id".into(), serde_json::Value::String(item.id.clone()));
590 obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
591 if deleted {
592 obj.insert("_deleted".into(), serde_json::Value::Bool(true));
593 }
594
595 if let Ok((pos, ref hash)) = parse_rev(&rev_str)
597 && let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash)
598 {
599 obj.insert(
600 "_revisions".into(),
601 serde_json::json!({
602 "start": pos,
603 "ids": ancestry
604 }),
605 );
606 }
607
608 bulk_docs.push(BulkGetDoc {
609 ok: Some(serde_json::Value::Object(obj)),
610 error: None,
611 });
612 } else {
613 bulk_docs.push(BulkGetDoc {
614 ok: None,
615 error: Some(BulkGetError {
616 id: item.id.clone(),
617 rev: rev_str,
618 error: "not_found".into(),
619 reason: "missing".into(),
620 }),
621 });
622 }
623 }
624 None => {
625 bulk_docs.push(BulkGetDoc {
626 ok: None,
627 error: Some(BulkGetError {
628 id: item.id.clone(),
629 rev: item.rev.unwrap_or_default(),
630 error: "not_found".into(),
631 reason: "missing".into(),
632 }),
633 });
634 }
635 }
636
637 results.push(BulkGetResult {
638 id: item.id,
639 docs: bulk_docs,
640 });
641 }
642
643 Ok(BulkGetResponse { results })
644 }
645
646 async fn put_attachment(
647 &self,
648 doc_id: &str,
649 att_id: &str,
650 rev: &str,
651 data: Vec<u8>,
652 content_type: &str,
653 ) -> Result<DocResult> {
654 let digest = compute_attachment_digest(&data);
655 let length = data.len() as u64;
656
657 let mut inner = self.inner.write().await;
658
659 inner.attachments.insert(digest.clone(), data);
661
662 let stored = inner
664 .docs
665 .get(doc_id)
666 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
667
668 let winner = winning_rev(&stored.rev_tree)
670 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
671 if winner.to_string() != rev {
672 return Err(RouchError::Conflict);
673 }
674
675 let doc_data = stored
677 .rev_data
678 .get(rev)
679 .cloned()
680 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
681
682 let att_meta = AttachmentMeta {
684 content_type: content_type.to_string(),
685 digest: digest.clone(),
686 length,
687 stub: true,
688 data: None,
689 };
690
691 let doc = Document {
692 id: doc_id.to_string(),
693 rev: Some(winner.clone()),
694 deleted: false,
695 data: doc_data.clone(),
696 attachments: {
697 let mut atts = HashMap::new();
698 atts.insert(att_id.to_string(), att_meta);
699 atts
700 },
701 };
702
703 let result = process_doc_new_edits(&mut inner, doc);
705 Ok(result)
706 }
707
708 async fn get_attachment(
709 &self,
710 doc_id: &str,
711 att_id: &str,
712 opts: GetAttachmentOptions,
713 ) -> Result<Vec<u8>> {
714 let inner = self.inner.read().await;
715
716 let stored = inner
717 .docs
718 .get(doc_id)
719 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
720
721 let rev_str = if let Some(ref rev) = opts.rev {
722 rev.clone()
723 } else {
724 winning_rev(&stored.rev_tree)
725 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
726 .to_string()
727 };
728
729 let _data = stored.rev_data.get(&rev_str);
734
735 Err(RouchError::NotFound(format!(
737 "attachment {}/{}",
738 doc_id, att_id
739 )))
740 }
741
742 async fn remove_attachment(&self, doc_id: &str, att_id: &str, rev: &str) -> Result<DocResult> {
743 let _ = att_id; let mut inner = self.inner.write().await;
745
746 let stored = inner
747 .docs
748 .get(doc_id)
749 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
750
751 let winner = winning_rev(&stored.rev_tree)
752 .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
753 if winner.to_string() != rev {
754 return Err(RouchError::Conflict);
755 }
756
757 let doc_data = stored
758 .rev_data
759 .get(rev)
760 .cloned()
761 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
762
763 let doc = Document {
765 id: doc_id.to_string(),
766 rev: Some(winner.clone()),
767 deleted: false,
768 data: doc_data,
769 attachments: HashMap::new(),
770 };
771
772 let result = process_doc_new_edits(&mut inner, doc);
773 Ok(result)
774 }
775
776 async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
777 let inner = self.inner.read().await;
778 inner
779 .local_docs
780 .get(id)
781 .cloned()
782 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
783 }
784
785 async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
786 let mut inner = self.inner.write().await;
787 inner.local_docs.insert(id.to_string(), doc);
788 Ok(())
789 }
790
791 async fn remove_local(&self, id: &str) -> Result<()> {
792 let mut inner = self.inner.write().await;
793 inner
794 .local_docs
795 .remove(id)
796 .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
797 Ok(())
798 }
799
800 async fn compact(&self) -> Result<()> {
801 let mut inner = self.inner.write().await;
802
803 for stored in inner.docs.values_mut() {
804 let leaves = collect_leaves(&stored.rev_tree);
805 let leaf_revs: std::collections::HashSet<String> =
806 leaves.iter().map(|l| l.rev_string()).collect();
807
808 stored.rev_data.retain(|k, _| leaf_revs.contains(k));
810 stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
811 }
812
813 Ok(())
814 }
815
816 async fn destroy(&self) -> Result<()> {
817 let mut inner = self.inner.write().await;
818 inner.docs.clear();
819 inner.changes.clear();
820 inner.local_docs.clear();
821 inner.attachments.clear();
822 inner.update_seq = 0;
823 Ok(())
824 }
825
826 async fn purge(&self, req: HashMap<String, Vec<String>>) -> Result<PurgeResponse> {
827 let mut inner = self.inner.write().await;
828 let mut purged = HashMap::new();
829 let mut docs_to_remove = Vec::new();
830
831 for (doc_id, revs) in req {
832 let mut purged_revs = Vec::new();
833 if let Some(stored) = inner.docs.get_mut(&doc_id) {
834 for rev_str in &revs {
835 if stored.rev_data.remove(rev_str).is_some() {
836 stored.rev_deleted.remove(rev_str);
837 purged_revs.push(rev_str.clone());
838
839 if let Some((pos, hash)) = rev_str.split_once('-')
843 && let Ok(pos) = pos.parse::<u64>()
844 {
845 prune_leaf_from_tree(&mut stored.rev_tree, pos, hash);
846 }
847 }
848 }
849 stored.rev_tree.retain(|p| !is_tree_empty(&p.tree));
851
852 if stored.rev_data.is_empty() {
853 docs_to_remove.push((doc_id.clone(), stored.seq));
854 }
855 }
856 if !purged_revs.is_empty() {
857 purged.insert(doc_id, purged_revs);
858 }
859 }
860
861 for (doc_id, seq) in docs_to_remove {
862 inner.changes.remove(&seq);
863 inner.docs.remove(&doc_id);
864 }
865
866 Ok(PurgeResponse {
867 purge_seq: Some(inner.update_seq),
868 purged,
869 })
870 }
871
872 async fn get_security(&self) -> Result<SecurityDocument> {
873 let inner = self.inner.read().await;
874 match inner.local_docs.get("_security") {
875 Some(val) => serde_json::from_value(val.clone())
876 .map_err(|e| RouchError::DatabaseError(e.to_string())),
877 None => Ok(SecurityDocument::default()),
878 }
879 }
880
881 async fn put_security(&self, doc: SecurityDocument) -> Result<()> {
882 let mut inner = self.inner.write().await;
883 let val = serde_json::to_value(&doc)?;
884 inner.local_docs.insert("_security".to_string(), val);
885 Ok(())
886 }
887}
888
889fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
894 let doc_id = if doc.id.is_empty() {
895 Uuid::new_v4().to_string()
896 } else {
897 doc.id.clone()
898 };
899
900 let existing = inner.docs.get(&doc_id);
901
902 if let Some(stored) = existing {
904 let winner = winning_rev(&stored.rev_tree);
905
906 match (&doc.rev, &winner) {
907 (Some(provided_rev), Some(current_winner)) => {
908 if provided_rev.to_string() != current_winner.to_string() {
909 return DocResult {
910 ok: false,
911 id: doc_id,
912 rev: None,
913 error: Some("conflict".into()),
914 reason: Some("Document update conflict".into()),
915 };
916 }
917 }
918 (None, Some(_)) => {
919 if !is_deleted(&stored.rev_tree) {
921 return DocResult {
922 ok: false,
923 id: doc_id,
924 rev: None,
925 error: Some("conflict".into()),
926 reason: Some("Document update conflict".into()),
927 };
928 }
929 }
931 _ => {}
932 }
933 } else if doc.rev.is_some() {
934 return DocResult {
936 ok: false,
937 id: doc_id,
938 rev: None,
939 error: Some("not_found".into()),
940 reason: Some("missing".into()),
941 };
942 }
943
944 let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
946 let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
947 let new_hash = generate_rev_hash(&doc.data, doc.deleted, prev_rev_str.as_deref());
948 let new_rev_str = rev_string(new_pos, &new_hash);
949
950 let mut rev_hashes = vec![new_hash.clone()];
952 if let Some(ref prev) = doc.rev {
953 rev_hashes.push(prev.hash.clone());
954 }
955
956 let new_path = build_path_from_revs(
957 new_pos,
958 &rev_hashes,
959 NodeOpts {
960 deleted: doc.deleted,
961 },
962 RevStatus::Available,
963 );
964
965 let existing_tree = existing.map(|s| s.rev_tree.clone()).unwrap_or_default();
967
968 let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
969
970 inner.update_seq += 1;
972 let seq = inner.update_seq;
973
974 if let Some(existing) = inner.docs.get(&doc_id) {
976 inner.changes.remove(&existing.seq);
977 }
978
979 let stored = inner
981 .docs
982 .entry(doc_id.clone())
983 .or_insert_with(|| StoredDoc {
984 rev_tree: Vec::new(),
985 rev_data: HashMap::new(),
986 rev_deleted: HashMap::new(),
987 seq: 0,
988 });
989
990 stored.rev_tree = merged_tree;
991 stored.rev_data.insert(new_rev_str.clone(), doc.data);
992 stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
993 stored.seq = seq;
994
995 inner.changes.insert(seq, (doc_id.clone(), doc.deleted));
997
998 DocResult {
999 ok: true,
1000 id: doc_id,
1001 rev: Some(new_rev_str),
1002 error: None,
1003 reason: None,
1004 }
1005}
1006
1007fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
1012 let doc_id = doc.id.clone();
1013 let rev = match &doc.rev {
1014 Some(r) => r.clone(),
1015 None => {
1016 return DocResult {
1017 ok: false,
1018 id: doc_id,
1019 rev: None,
1020 error: Some("bad_request".into()),
1021 reason: Some("missing _rev".into()),
1022 };
1023 }
1024 };
1025
1026 let rev_str = rev.to_string();
1027
1028 let new_path = if let Some(revisions) = doc.data.get("_revisions") {
1030 let start = revisions["start"].as_u64().unwrap_or(rev.pos);
1031 let ids: Vec<String> = revisions["ids"]
1032 .as_array()
1033 .map(|arr| {
1034 arr.iter()
1035 .filter_map(|v| v.as_str().map(String::from))
1036 .collect()
1037 })
1038 .unwrap_or_else(|| vec![rev.hash.clone()]);
1039
1040 build_path_from_revs(
1041 start,
1042 &ids,
1043 NodeOpts {
1044 deleted: doc.deleted,
1045 },
1046 RevStatus::Available,
1047 )
1048 } else {
1049 RevPath {
1051 pos: rev.pos,
1052 tree: rouchdb_core::rev_tree::RevNode {
1053 hash: rev.hash.clone(),
1054 status: RevStatus::Available,
1055 opts: NodeOpts {
1056 deleted: doc.deleted,
1057 },
1058 children: vec![],
1059 },
1060 }
1061 };
1062
1063 if let serde_json::Value::Object(ref mut map) = doc.data {
1065 map.remove("_revisions");
1066 }
1067
1068 let existing_tree = inner
1070 .docs
1071 .get(&doc_id)
1072 .map(|s| s.rev_tree.clone())
1073 .unwrap_or_default();
1074
1075 let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
1076
1077 inner.update_seq += 1;
1079 let seq = inner.update_seq;
1080
1081 if let Some(existing) = inner.docs.get(&doc_id) {
1083 inner.changes.remove(&existing.seq);
1084 }
1085
1086 let is_doc_deleted = is_deleted(&merged_tree);
1087
1088 let stored = inner
1089 .docs
1090 .entry(doc_id.clone())
1091 .or_insert_with(|| StoredDoc {
1092 rev_tree: Vec::new(),
1093 rev_data: HashMap::new(),
1094 rev_deleted: HashMap::new(),
1095 seq: 0,
1096 });
1097
1098 stored.rev_tree = merged_tree;
1099 stored.rev_data.insert(rev_str.clone(), doc.data);
1100 stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
1101 stored.seq = seq;
1102
1103 inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));
1104
1105 DocResult {
1106 ok: true,
1107 id: doc_id,
1108 rev: Some(rev_str),
1109 error: None,
1110 reason: None,
1111 }
1112}
1113
1114use rouchdb_core::rev_tree::RevNode;
1119
1120fn prune_leaf_from_tree(tree: &mut RevTree, target_pos: u64, target_hash: &str) {
1123 for path in tree.iter_mut() {
1124 prune_leaf_from_node(&mut path.tree, path.pos, target_pos, target_hash);
1125 }
1126}
1127
1128fn prune_leaf_from_node(node: &mut RevNode, current_pos: u64, target_pos: u64, target_hash: &str) {
1131 node.children.retain(|child| {
1133 let child_pos = current_pos + 1;
1134 !(child_pos == target_pos && child.hash == target_hash && child.children.is_empty())
1135 });
1136
1137 for child in node.children.iter_mut() {
1139 prune_leaf_from_node(child, current_pos + 1, target_pos, target_hash);
1140 }
1141}
1142
1143fn is_tree_empty(node: &RevNode) -> bool {
1146 node.children.is_empty() && node.hash.is_empty()
1147}
1148
1149#[cfg(test)]
1154mod tests {
1155 use super::*;
1156 use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};
1157
1158 async fn new_db() -> MemoryAdapter {
1159 MemoryAdapter::new("test")
1160 }
1161
1162 #[tokio::test]
1163 async fn info_empty_db() {
1164 let db = new_db().await;
1165 let info = db.info().await.unwrap();
1166 assert_eq!(info.db_name, "test");
1167 assert_eq!(info.doc_count, 0);
1168 assert_eq!(info.update_seq, Seq::Num(0));
1169 }
1170
1171 #[tokio::test]
1172 async fn put_and_get_document() {
1173 let db = new_db().await;
1174
1175 let doc = Document {
1176 id: "doc1".into(),
1177 rev: None,
1178 deleted: false,
1179 data: serde_json::json!({"name": "Alice"}),
1180 attachments: HashMap::new(),
1181 };
1182
1183 let results = db
1184 .bulk_docs(vec![doc], BulkDocsOptions::new())
1185 .await
1186 .unwrap();
1187 assert!(results[0].ok);
1188 assert_eq!(results[0].id, "doc1");
1189 assert!(results[0].rev.is_some());
1190
1191 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1192 assert_eq!(fetched.id, "doc1");
1193 assert_eq!(fetched.data["name"], "Alice");
1194 assert!(fetched.rev.is_some());
1195 }
1196
1197 #[tokio::test]
1198 async fn update_document() {
1199 let db = new_db().await;
1200
1201 let doc = Document {
1203 id: "doc1".into(),
1204 rev: None,
1205 deleted: false,
1206 data: serde_json::json!({"name": "Alice"}),
1207 attachments: HashMap::new(),
1208 };
1209 let results = db
1210 .bulk_docs(vec![doc], BulkDocsOptions::new())
1211 .await
1212 .unwrap();
1213 let rev1 = results[0].rev.clone().unwrap();
1214
1215 let rev_parsed: Revision = rev1.parse().unwrap();
1217 let doc2 = Document {
1218 id: "doc1".into(),
1219 rev: Some(rev_parsed),
1220 deleted: false,
1221 data: serde_json::json!({"name": "Bob"}),
1222 attachments: HashMap::new(),
1223 };
1224 let results = db
1225 .bulk_docs(vec![doc2], BulkDocsOptions::new())
1226 .await
1227 .unwrap();
1228 assert!(results[0].ok);
1229
1230 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1231 assert_eq!(fetched.data["name"], "Bob");
1232 }
1233
1234 #[tokio::test]
1235 async fn conflict_on_wrong_rev() {
1236 let db = new_db().await;
1237
1238 let doc = Document {
1239 id: "doc1".into(),
1240 rev: None,
1241 deleted: false,
1242 data: serde_json::json!({"v": 1}),
1243 attachments: HashMap::new(),
1244 };
1245 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1246 .await
1247 .unwrap();
1248
1249 let doc2 = Document {
1251 id: "doc1".into(),
1252 rev: Some(Revision::new(1, "wronghash".into())),
1253 deleted: false,
1254 data: serde_json::json!({"v": 2}),
1255 attachments: HashMap::new(),
1256 };
1257 let results = db
1258 .bulk_docs(vec![doc2], BulkDocsOptions::new())
1259 .await
1260 .unwrap();
1261 assert!(!results[0].ok);
1262 assert_eq!(results[0].error.as_deref(), Some("conflict"));
1263 }
1264
1265 #[tokio::test]
1266 async fn delete_document() {
1267 let db = new_db().await;
1268
1269 let doc = Document {
1270 id: "doc1".into(),
1271 rev: None,
1272 deleted: false,
1273 data: serde_json::json!({"name": "Alice"}),
1274 attachments: HashMap::new(),
1275 };
1276 let results = db
1277 .bulk_docs(vec![doc], BulkDocsOptions::new())
1278 .await
1279 .unwrap();
1280 let rev1: Revision = results[0].rev.clone().unwrap().parse().unwrap();
1281
1282 let del = Document {
1284 id: "doc1".into(),
1285 rev: Some(rev1),
1286 deleted: true,
1287 data: serde_json::json!({}),
1288 attachments: HashMap::new(),
1289 };
1290 let results = db
1291 .bulk_docs(vec![del], BulkDocsOptions::new())
1292 .await
1293 .unwrap();
1294 assert!(results[0].ok);
1295
1296 let err = db.get("doc1", GetOptions::default()).await;
1298 assert!(err.is_err());
1299
1300 let info = db.info().await.unwrap();
1302 assert_eq!(info.doc_count, 0);
1303 }
1304
1305 #[tokio::test]
1306 async fn all_docs() {
1307 let db = new_db().await;
1308
1309 for name in ["charlie", "alice", "bob"] {
1310 let doc = Document {
1311 id: name.into(),
1312 rev: None,
1313 deleted: false,
1314 data: serde_json::json!({"name": name}),
1315 attachments: HashMap::new(),
1316 };
1317 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1318 .await
1319 .unwrap();
1320 }
1321
1322 let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
1323 assert_eq!(result.total_rows, 3);
1324 assert_eq!(result.rows[0].id, "alice");
1326 assert_eq!(result.rows[1].id, "bob");
1327 assert_eq!(result.rows[2].id, "charlie");
1328 }
1329
1330 #[tokio::test]
1331 async fn all_docs_with_include_docs() {
1332 let db = new_db().await;
1333
1334 let doc = Document {
1335 id: "doc1".into(),
1336 rev: None,
1337 deleted: false,
1338 data: serde_json::json!({"name": "Alice"}),
1339 attachments: HashMap::new(),
1340 };
1341 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1342 .await
1343 .unwrap();
1344
1345 let mut opts = AllDocsOptions::new();
1346 opts.include_docs = true;
1347 let result = db.all_docs(opts).await.unwrap();
1348 assert!(result.rows[0].doc.is_some());
1349 let doc = result.rows[0].doc.as_ref().unwrap();
1350 assert_eq!(doc["name"], "Alice");
1351 assert_eq!(doc["_id"], "doc1");
1352 }
1353
1354 #[tokio::test]
1355 async fn changes_feed() {
1356 let db = new_db().await;
1357
1358 for i in 0..3 {
1359 let doc = Document {
1360 id: format!("doc{}", i),
1361 rev: None,
1362 deleted: false,
1363 data: serde_json::json!({"i": i}),
1364 attachments: HashMap::new(),
1365 };
1366 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1367 .await
1368 .unwrap();
1369 }
1370
1371 let changes = db.changes(ChangesOptions::default()).await.unwrap();
1372 assert_eq!(changes.results.len(), 3);
1373 assert_eq!(changes.last_seq, Seq::Num(3));
1374
1375 let changes = db
1377 .changes(ChangesOptions {
1378 since: Seq::Num(2),
1379 ..Default::default()
1380 })
1381 .await
1382 .unwrap();
1383 assert_eq!(changes.results.len(), 1);
1384 assert_eq!(changes.results[0].id, "doc2");
1385 }
1386
1387 #[tokio::test]
1388 async fn revs_diff() {
1389 let db = new_db().await;
1390
1391 let doc = Document {
1392 id: "doc1".into(),
1393 rev: None,
1394 deleted: false,
1395 data: serde_json::json!({"v": 1}),
1396 attachments: HashMap::new(),
1397 };
1398 let results = db
1399 .bulk_docs(vec![doc], BulkDocsOptions::new())
1400 .await
1401 .unwrap();
1402 let existing_rev = results[0].rev.clone().unwrap();
1403
1404 let mut revs = HashMap::new();
1405 revs.insert(
1406 "doc1".into(),
1407 vec![existing_rev.clone(), "2-doesnotexist".into()],
1408 );
1409 revs.insert("doc2".into(), vec!["1-abc".into()]);
1410
1411 let diff = db.revs_diff(revs).await.unwrap();
1412
1413 let doc1_diff = diff.results.get("doc1").unwrap();
1415 assert!(!doc1_diff.missing.contains(&existing_rev));
1416 assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));
1417
1418 let doc2_diff = diff.results.get("doc2").unwrap();
1420 assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
1421 }
1422
1423 #[tokio::test]
1424 async fn local_docs() {
1425 let db = new_db().await;
1426
1427 let doc = serde_json::json!({"checkpoint": 42});
1428 db.put_local("repl-123", doc.clone()).await.unwrap();
1429
1430 let fetched = db.get_local("repl-123").await.unwrap();
1431 assert_eq!(fetched["checkpoint"], 42);
1432
1433 db.remove_local("repl-123").await.unwrap();
1434 assert!(db.get_local("repl-123").await.is_err());
1435 }
1436
1437 #[tokio::test]
1438 async fn replication_mode_bulk_docs() {
1439 let db = new_db().await;
1440
1441 let doc = Document {
1443 id: "doc1".into(),
1444 rev: Some(Revision::new(1, "abc123".into())),
1445 deleted: false,
1446 data: serde_json::json!({"name": "replicated"}),
1447 attachments: HashMap::new(),
1448 };
1449
1450 let results = db
1451 .bulk_docs(vec![doc], BulkDocsOptions::replication())
1452 .await
1453 .unwrap();
1454 assert!(results[0].ok);
1455
1456 let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1457 assert_eq!(fetched.data["name"], "replicated");
1458 assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
1459 }
1460
1461 #[tokio::test]
1462 async fn auto_generate_id() {
1463 let db = new_db().await;
1464
1465 let doc = Document {
1466 id: String::new(),
1467 rev: None,
1468 deleted: false,
1469 data: serde_json::json!({"name": "no-id"}),
1470 attachments: HashMap::new(),
1471 };
1472
1473 let results = db
1474 .bulk_docs(vec![doc], BulkDocsOptions::new())
1475 .await
1476 .unwrap();
1477 assert!(results[0].ok);
1478 assert!(!results[0].id.is_empty());
1479 }
1480
1481 #[tokio::test]
1482 async fn destroy_clears_everything() {
1483 let db = new_db().await;
1484
1485 let doc = Document {
1486 id: "doc1".into(),
1487 rev: None,
1488 deleted: false,
1489 data: serde_json::json!({}),
1490 attachments: HashMap::new(),
1491 };
1492 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1493 .await
1494 .unwrap();
1495 db.put_local("x", serde_json::json!({})).await.unwrap();
1496
1497 db.destroy().await.unwrap();
1498
1499 let info = db.info().await.unwrap();
1500 assert_eq!(info.doc_count, 0);
1501 assert_eq!(info.update_seq, Seq::Num(0));
1502 }
1503
1504 #[tokio::test]
1505 async fn bulk_get_documents() {
1506 let db = new_db().await;
1507
1508 let doc = Document {
1509 id: "doc1".into(),
1510 rev: None,
1511 deleted: false,
1512 data: serde_json::json!({"name": "test"}),
1513 attachments: HashMap::new(),
1514 };
1515 db.bulk_docs(vec![doc], BulkDocsOptions::new())
1516 .await
1517 .unwrap();
1518
1519 let result = db
1520 .bulk_get(vec![
1521 BulkGetItem {
1522 id: "doc1".into(),
1523 rev: None,
1524 },
1525 BulkGetItem {
1526 id: "missing".into(),
1527 rev: None,
1528 },
1529 ])
1530 .await
1531 .unwrap();
1532
1533 assert_eq!(result.results.len(), 2);
1534 assert!(result.results[0].docs[0].ok.is_some());
1535 assert!(result.results[1].docs[0].error.is_some());
1536 }
1537}