//! rouchdb_adapter_memory/lib.rs — in-memory storage adapter for RouchDB.
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use async_trait::async_trait;
use md5::{Digest, Md5};
use tokio::sync::RwLock;
use uuid::Uuid;

use rouchdb_core::adapter::Adapter;
use rouchdb_core::document::*;
use rouchdb_core::error::{Result, RouchError};
use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
use rouchdb_core::rev_tree::{
    build_path_from_revs, collect_leaves, find_rev_ancestry, rev_exists, NodeOpts, RevPath,
    RevStatus, RevTree,
};
/// Maximum revision-tree depth passed to `merge_tree` when merging edits
/// (mirrors CouchDB's default `_revs_limit` of 1000).
const DEFAULT_REV_LIMIT: u64 = 1000;
19
// ---------------------------------------------------------------------------
// Internal storage types
// ---------------------------------------------------------------------------
23
/// Per-document storage record: the full revision tree plus the JSON body
/// and deleted flag kept for each revision we still have data for.
#[derive(Debug, Clone)]
struct StoredDoc {
    // Revision tree for this document, merged across edits/replication.
    rev_tree: RevTree,
    /// Map from "pos-hash" to the document data at that revision.
    rev_data: HashMap<String, serde_json::Value>,
    /// Map from "pos-hash" to the deleted flag at that revision.
    rev_deleted: HashMap<String, bool>,
    /// Current sequence number for this document — its most recent update;
    /// used to evict the doc's previous entry from the changes log.
    seq: u64,
}
34
/// All mutable database state, kept behind a single async `RwLock`.
#[derive(Debug)]
struct Inner {
    // Database name reported by `info()`.
    name: String,
    /// Documents keyed by ID.
    docs: HashMap<String, StoredDoc>,
    /// Sequence counter (monotonically increasing; never reused).
    update_seq: u64,
    /// Changes log: seq -> (doc_id, was_deleted). Each doc appears at most
    /// once — its old entry is removed when it is updated again.
    changes: BTreeMap<u64, (String, bool)>,
    /// Local (non-replicated) documents, stored as raw JSON.
    local_docs: HashMap<String, serde_json::Value>,
    /// Attachment data keyed by digest (content-addressed, shared across revs).
    attachments: HashMap<String, Vec<u8>>,
}
49
/// In-memory adapter for RouchDB. All data is held in RAM.
///
/// Cloning is cheap: clones share the same underlying state via `Arc`.
#[derive(Debug, Clone)]
pub struct MemoryAdapter {
    // Shared state; RwLock lets reads proceed concurrently.
    inner: Arc<RwLock<Inner>>,
}
55
56impl MemoryAdapter {
57    pub fn new(name: &str) -> Self {
58        Self {
59            inner: Arc::new(RwLock::new(Inner {
60                name: name.to_string(),
61                docs: HashMap::new(),
62                update_seq: 0,
63                changes: BTreeMap::new(),
64                local_docs: HashMap::new(),
65                attachments: HashMap::new(),
66            })),
67        }
68    }
69}
70
// ---------------------------------------------------------------------------
// Helper functions
// ---------------------------------------------------------------------------
74
75/// Generate a revision hash from the document content.
76fn generate_rev_hash(doc_data: &serde_json::Value, deleted: bool, prev_rev: Option<&str>) -> String {
77    let mut hasher = Md5::new();
78    // Include the previous revision in the hash for determinism
79    if let Some(prev) = prev_rev {
80        hasher.update(prev.as_bytes());
81    }
82    hasher.update(if deleted { b"1" } else { b"0" });
83    let serialized = serde_json::to_string(doc_data).unwrap_or_default();
84    hasher.update(serialized.as_bytes());
85    format!("{:x}", hasher.finalize())
86}
87
/// Render a revision as the canonical "pos-hash" string form.
fn rev_string(pos: u64, hash: &str) -> String {
    let mut out = pos.to_string();
    out.push('-');
    out.push_str(hash);
    out
}
91
92fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
93    let (pos_str, hash) = rev_str
94        .split_once('-')
95        .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
96    let pos: u64 = pos_str
97        .parse()
98        .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
99    Ok((pos, hash.to_string()))
100}
101
102fn compute_attachment_digest(data: &[u8]) -> String {
103    let mut hasher = Md5::new();
104    hasher.update(data);
105    let hash = hasher.finalize();
106    use base64::Engine;
107    let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
108    format!("md5-{}", b64)
109}
110
// ---------------------------------------------------------------------------
// Adapter implementation
// ---------------------------------------------------------------------------
114
115#[async_trait]
116impl Adapter for MemoryAdapter {
117    async fn info(&self) -> Result<DbInfo> {
118        let inner = self.inner.read().await;
119        let doc_count = inner
120            .docs
121            .values()
122            .filter(|d| {
123                // Count only non-deleted documents
124                !is_deleted(&d.rev_tree)
125            })
126            .count() as u64;
127
128        Ok(DbInfo {
129            db_name: inner.name.clone(),
130            doc_count,
131            update_seq: Seq::Num(inner.update_seq),
132        })
133    }
134
135    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
136        let inner = self.inner.read().await;
137        let stored = inner
138            .docs
139            .get(id)
140            .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
141
142        // Determine which revision to return
143        let target_rev = if let Some(ref rev_str) = opts.rev {
144            rev_str.clone()
145        } else {
146            // Use the winning revision
147            let winner = winning_rev(&stored.rev_tree)
148                .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
149            winner.to_string()
150        };
151
152        // Get the data for this revision
153        let data = stored
154            .rev_data
155            .get(&target_rev)
156            .cloned()
157            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
158
159        let deleted = stored.rev_deleted.get(&target_rev).copied().unwrap_or(false);
160
161        // If the winning rev is deleted and no specific rev was requested, it's "not found"
162        if deleted && opts.rev.is_none() {
163            return Err(RouchError::NotFound(id.to_string()));
164        }
165
166        let (pos, hash) = parse_rev(&target_rev)?;
167        let rev = Revision::new(pos, hash);
168
169        let mut doc = Document {
170            id: id.to_string(),
171            rev: Some(rev),
172            deleted,
173            data,
174            attachments: HashMap::new(),
175        };
176
177        // Add conflicts if requested
178        if opts.conflicts {
179            let conflicts = collect_conflicts(&stored.rev_tree);
180            if !conflicts.is_empty() {
181                let conflict_list: Vec<serde_json::Value> = conflicts
182                    .iter()
183                    .map(|c| serde_json::Value::String(c.to_string()))
184                    .collect();
185                if let serde_json::Value::Object(ref mut map) = doc.data {
186                    map.insert(
187                        "_conflicts".to_string(),
188                        serde_json::Value::Array(conflict_list),
189                    );
190                }
191            }
192        }
193
194        Ok(doc)
195    }
196
197    async fn bulk_docs(
198        &self,
199        docs: Vec<Document>,
200        opts: BulkDocsOptions,
201    ) -> Result<Vec<DocResult>> {
202        let mut inner = self.inner.write().await;
203        let mut results = Vec::with_capacity(docs.len());
204
205        for doc in docs {
206            let result = if opts.new_edits {
207                process_doc_new_edits(&mut inner, doc)
208            } else {
209                process_doc_replication(&mut inner, doc)
210            };
211            results.push(result);
212        }
213
214        Ok(results)
215    }
216
    /// List documents in key order, honoring CouchDB-style key selection
    /// (`key`/`keys`/`start_key`/`end_key`), direction, paging (`skip`,
    /// `limit`), and `include_docs`.
    ///
    /// NOTE(review): `total_rows` here is the filtered row count before
    /// skip/limit, not the database's total doc count — confirm against the
    /// intended CouchDB-compatibility level.
    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
        let inner = self.inner.read().await;

        // Collect all doc IDs sorted
        let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
        doc_ids.sort();

        if opts.descending {
            doc_ids.reverse();
        }

        // If specific keys are requested, use those instead
        let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
            keys.clone()
        } else if let Some(ref key) = opts.key {
            vec![key.clone()]
        } else {
            doc_ids.iter().map(|k| (*k).clone()).collect()
        };

        let mut rows = Vec::new();

        for key in &target_keys {
            // Apply key range filters if no specific keys were given.
            // Range comparisons flip when iterating in descending order.
            if opts.keys.is_none() && opts.key.is_none() {
                if let Some(ref start) = opts.start_key {
                    if (!opts.descending && key.as_str() < start.as_str())
                        || (opts.descending && key.as_str() > start.as_str())
                    {
                        continue;
                    }
                }
                if let Some(ref end) = opts.end_key {
                    if opts.inclusive_end {
                        if (!opts.descending && key.as_str() > end.as_str())
                            || (opts.descending && key.as_str() < end.as_str())
                        {
                            continue;
                        }
                    } else if (!opts.descending && key.as_str() >= end.as_str())
                        || (opts.descending && key.as_str() <= end.as_str())
                    {
                        continue;
                    }
                }
            }

            if let Some(stored) = inner.docs.get(key.as_str()) {
                // A doc with no winning rev (empty tree) produces no row.
                let winner = match winning_rev(&stored.rev_tree) {
                    Some(w) => w,
                    None => continue,
                };
                let deleted = is_deleted(&stored.rev_tree);

                // Skip deleted docs unless specific keys were requested
                if deleted && opts.keys.is_none() {
                    continue;
                }

                // Inline the winning revision's body (with _id/_rev injected)
                // when include_docs is set and the doc is live.
                let doc_json = if opts.include_docs && !deleted {
                    let rev_str = winner.to_string();
                    stored.rev_data.get(&rev_str).map(|data| {
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert(
                            "_id".into(),
                            serde_json::Value::String(key.clone()),
                        );
                        obj.insert(
                            "_rev".into(),
                            serde_json::Value::String(rev_str),
                        );
                        serde_json::Value::Object(obj)
                    })
                } else {
                    None
                };

                rows.push(AllDocsRow {
                    id: key.clone(),
                    key: key.clone(),
                    value: AllDocsRowValue {
                        rev: winner.to_string(),
                        deleted: if deleted { Some(true) } else { None },
                    },
                    doc: doc_json,
                });
            } else if opts.keys.is_some() {
                // For specific key lookups, include missing keys as errors
                // (CouchDB returns {"key":"x","error":"not_found"})
                // We skip these for now — they don't fit our row struct cleanly
            }
        }

        // Apply skip and limit after filtering, recording the pre-page count.
        let total_rows = rows.len() as u64;
        let skip = opts.skip as usize;
        if skip > 0 {
            rows = rows.into_iter().skip(skip).collect();
        }
        if let Some(limit) = opts.limit {
            rows.truncate(limit as usize);
        }

        Ok(AllDocsResponse {
            total_rows,
            offset: opts.skip,
            rows,
        })
    }
329
330    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
331        let inner = self.inner.read().await;
332
333        let mut results = Vec::new();
334
335        // Iterate changes after `since`
336        let range = (opts.since.as_num() + 1)..;
337        let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
338            Box::new(
339                inner
340                    .changes
341                    .range(range)
342                    .collect::<Vec<_>>()
343                    .into_iter()
344                    .rev(),
345            )
346        } else {
347            Box::new(inner.changes.range(range))
348        };
349
350        for (seq, (doc_id, deleted)) in iter {
351            // Filter by doc_ids if specified
352            if let Some(ref doc_ids) = opts.doc_ids {
353                if !doc_ids.contains(doc_id) {
354                    continue;
355                }
356            }
357
358            let stored = inner.docs.get(doc_id);
359            let rev_str = stored
360                .and_then(|s| winning_rev(&s.rev_tree))
361                .map(|r| r.to_string())
362                .unwrap_or_default();
363
364            let doc = if opts.include_docs {
365                stored.and_then(|s| {
366                    s.rev_data.get(&rev_str).map(|data| {
367                        let mut obj = match data {
368                            serde_json::Value::Object(m) => m.clone(),
369                            _ => serde_json::Map::new(),
370                        };
371                        obj.insert(
372                            "_id".into(),
373                            serde_json::Value::String(doc_id.clone()),
374                        );
375                        obj.insert(
376                            "_rev".into(),
377                            serde_json::Value::String(rev_str.clone()),
378                        );
379                        if *deleted {
380                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
381                        }
382                        serde_json::Value::Object(obj)
383                    })
384                })
385            } else {
386                None
387            };
388
389            results.push(ChangeEvent {
390                seq: Seq::Num(*seq),
391                id: doc_id.clone(),
392                changes: vec![ChangeRev {
393                    rev: rev_str,
394                }],
395                deleted: *deleted,
396                doc,
397            });
398
399            if let Some(limit) = opts.limit {
400                if results.len() >= limit as usize {
401                    break;
402                }
403            }
404        }
405
406        let last_seq = results.last().map(|r| r.seq.clone()).unwrap_or(opts.since.clone());
407
408        Ok(ChangesResponse {
409            results,
410            last_seq,
411        })
412    }
413
414    async fn revs_diff(
415        &self,
416        revs: HashMap<String, Vec<String>>,
417    ) -> Result<RevsDiffResponse> {
418        let inner = self.inner.read().await;
419        let mut results = HashMap::new();
420
421        for (doc_id, rev_list) in revs {
422            let mut missing = Vec::new();
423            let mut possible_ancestors = Vec::new();
424
425            let stored = inner.docs.get(&doc_id);
426
427            for rev_str in &rev_list {
428                let (pos, hash) = parse_rev(rev_str)?;
429
430                let exists = stored
431                    .map(|s| rev_exists(&s.rev_tree, pos, &hash))
432                    .unwrap_or(false);
433
434                if !exists {
435                    missing.push(rev_str.clone());
436
437                    // Find possible ancestors (existing revisions with lower pos)
438                    if let Some(stored) = stored {
439                        let leaves = collect_leaves(&stored.rev_tree);
440                        for leaf in &leaves {
441                            if leaf.pos < pos {
442                                possible_ancestors.push(leaf.rev_string());
443                            }
444                        }
445                    }
446                }
447            }
448
449            if !missing.is_empty() {
450                results.insert(
451                    doc_id,
452                    RevsDiffResult {
453                        missing,
454                        possible_ancestors,
455                    },
456                );
457            }
458        }
459
460        Ok(RevsDiffResponse { results })
461    }
462
    /// Fetch multiple documents (optionally at specific revisions) in one
    /// call, as used by the replicator. Each requested item yields one
    /// `BulkGetResult` whose `docs` holds either the body (with `_id`,
    /// `_rev`, optional `_deleted`, and `_revisions` ancestry injected) or a
    /// `not_found` error entry.
    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
        let inner = self.inner.read().await;
        let mut results = Vec::new();

        for item in docs {
            let mut bulk_docs = Vec::new();

            match inner.docs.get(&item.id) {
                Some(stored) => {
                    // Resolve the requested revision, defaulting to the winner.
                    let rev_str = if let Some(ref rev) = item.rev {
                        rev.clone()
                    } else {
                        match winning_rev(&stored.rev_tree) {
                            Some(w) => w.to_string(),
                            None => {
                                // Doc exists but has no winning rev: report
                                // not_found for this item and move on.
                                bulk_docs.push(BulkGetDoc {
                                    ok: None,
                                    error: Some(BulkGetError {
                                        id: item.id.clone(),
                                        rev: item.rev.unwrap_or_default(),
                                        error: "not_found".into(),
                                        reason: "missing".into(),
                                    }),
                                });
                                results.push(BulkGetResult {
                                    id: item.id,
                                    docs: bulk_docs,
                                });
                                continue;
                            }
                        }
                    };

                    if let Some(data) = stored.rev_data.get(&rev_str) {
                        let deleted = stored
                            .rev_deleted
                            .get(&rev_str)
                            .copied()
                            .unwrap_or(false);
                        // Inject the reserved fields the replicator expects.
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert(
                            "_id".into(),
                            serde_json::Value::String(item.id.clone()),
                        );
                        obj.insert(
                            "_rev".into(),
                            serde_json::Value::String(rev_str.clone()),
                        );
                        if deleted {
                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
                        }

                        // Include _revisions for replication
                        if let Ok((pos, ref hash)) = parse_rev(&rev_str) {
                            if let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash) {
                                obj.insert(
                                    "_revisions".into(),
                                    serde_json::json!({
                                        "start": pos,
                                        "ids": ancestry
                                    }),
                                );
                            }
                        }

                        bulk_docs.push(BulkGetDoc {
                            ok: Some(serde_json::Value::Object(obj)),
                            error: None,
                        });
                    } else {
                        // Revision known to be requested but no body stored.
                        bulk_docs.push(BulkGetDoc {
                            ok: None,
                            error: Some(BulkGetError {
                                id: item.id.clone(),
                                rev: rev_str,
                                error: "not_found".into(),
                                reason: "missing".into(),
                            }),
                        });
                    }
                }
                None => {
                    // Unknown document ID.
                    bulk_docs.push(BulkGetDoc {
                        ok: None,
                        error: Some(BulkGetError {
                            id: item.id.clone(),
                            rev: item.rev.unwrap_or_default(),
                            error: "not_found".into(),
                            reason: "missing".into(),
                        }),
                    });
                }
            }

            results.push(BulkGetResult {
                id: item.id,
                docs: bulk_docs,
            });
        }

        Ok(BulkGetResponse { results })
    }
568
569    async fn put_attachment(
570        &self,
571        doc_id: &str,
572        att_id: &str,
573        rev: &str,
574        data: Vec<u8>,
575        content_type: &str,
576    ) -> Result<DocResult> {
577        let digest = compute_attachment_digest(&data);
578        let length = data.len() as u64;
579
580        let mut inner = self.inner.write().await;
581
582        // Store the attachment data
583        inner.attachments.insert(digest.clone(), data);
584
585        // Get or create the document
586        let stored = inner
587            .docs
588            .get(doc_id)
589            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
590
591        // Verify the rev matches
592        let winner = winning_rev(&stored.rev_tree)
593            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
594        if winner.to_string() != rev {
595            return Err(RouchError::Conflict);
596        }
597
598        // Get current doc data and add attachment
599        let doc_data = stored
600            .rev_data
601            .get(rev)
602            .cloned()
603            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
604
605        // Build updated document with attachment metadata
606        let att_meta = AttachmentMeta {
607            content_type: content_type.to_string(),
608            digest: digest.clone(),
609            length,
610            stub: true,
611            data: None,
612        };
613
614        let doc = Document {
615            id: doc_id.to_string(),
616            rev: Some(winner.clone()),
617            deleted: false,
618            data: doc_data.clone(),
619            attachments: {
620                let mut atts = HashMap::new();
621                atts.insert(att_id.to_string(), att_meta);
622                atts
623            },
624        };
625
626        // Process as a normal edit
627        let result = process_doc_new_edits(&mut inner, doc);
628        Ok(result)
629    }
630
631    async fn get_attachment(
632        &self,
633        doc_id: &str,
634        att_id: &str,
635        opts: GetAttachmentOptions,
636    ) -> Result<Vec<u8>> {
637        let inner = self.inner.read().await;
638
639        let stored = inner
640            .docs
641            .get(doc_id)
642            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
643
644        let rev_str = if let Some(ref rev) = opts.rev {
645            rev.clone()
646        } else {
647            winning_rev(&stored.rev_tree)
648                .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
649                .to_string()
650        };
651
652        // Look for attachment metadata in the doc data
653        // For now, look up by digest in our attachment store
654        // We'd need to track which attachments belong to which doc/rev
655        // For simplicity, search through our attachment map
656        let _data = stored.rev_data.get(&rev_str);
657
658        // TODO: proper attachment tracking per revision
659        Err(RouchError::NotFound(format!(
660            "attachment {}/{}",
661            doc_id, att_id
662        )))
663    }
664
665    async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
666        let inner = self.inner.read().await;
667        inner
668            .local_docs
669            .get(id)
670            .cloned()
671            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
672    }
673
674    async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
675        let mut inner = self.inner.write().await;
676        inner.local_docs.insert(id.to_string(), doc);
677        Ok(())
678    }
679
680    async fn remove_local(&self, id: &str) -> Result<()> {
681        let mut inner = self.inner.write().await;
682        inner
683            .local_docs
684            .remove(id)
685            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
686        Ok(())
687    }
688
689    async fn compact(&self) -> Result<()> {
690        let mut inner = self.inner.write().await;
691
692        for stored in inner.docs.values_mut() {
693            let leaves = collect_leaves(&stored.rev_tree);
694            let leaf_revs: std::collections::HashSet<String> =
695                leaves.iter().map(|l| l.rev_string()).collect();
696
697            // Remove data for non-leaf revisions
698            stored.rev_data.retain(|k, _| leaf_revs.contains(k));
699            stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
700        }
701
702        Ok(())
703    }
704
705    async fn destroy(&self) -> Result<()> {
706        let mut inner = self.inner.write().await;
707        inner.docs.clear();
708        inner.changes.clear();
709        inner.local_docs.clear();
710        inner.attachments.clear();
711        inner.update_seq = 0;
712        Ok(())
713    }
714}
715
// ---------------------------------------------------------------------------
// Document processing (new_edits = true)
// ---------------------------------------------------------------------------
719
/// Apply a single document write in normal (`new_edits = true`) mode:
/// check the provided `_rev` against the current winner, generate a new
/// deterministic revision, merge it into the revision tree, and record the
/// change. Returns a `DocResult` describing success or the rejection
/// (`conflict` / `not_found`); never returns `Err`.
fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
    // Missing ID means "create with a server-generated UUID".
    let doc_id = if doc.id.is_empty() {
        Uuid::new_v4().to_string()
    } else {
        doc.id.clone()
    };

    let existing = inner.docs.get(&doc_id);

    // Check for conflicts: if the doc has a _rev, it must match the winning rev
    if let Some(stored) = existing {
        let winner = winning_rev(&stored.rev_tree);

        match (&doc.rev, &winner) {
            (Some(provided_rev), Some(current_winner)) => {
                // Update with a stale _rev -> conflict.
                if provided_rev.to_string() != current_winner.to_string() {
                    return DocResult {
                        ok: false,
                        id: doc_id,
                        rev: None,
                        error: Some("conflict".into()),
                        reason: Some("Document update conflict".into()),
                    };
                }
            }
            (None, Some(_)) => {
                // Trying to create a doc that already exists (and isn't deleted)
                if !is_deleted(&stored.rev_tree) {
                    return DocResult {
                        ok: false,
                        id: doc_id,
                        rev: None,
                        error: Some("conflict".into()),
                        reason: Some("Document update conflict".into()),
                    };
                }
                // If winner is deleted, allow creating a new doc at the same ID
            }
            _ => {}
        }
    } else if doc.rev.is_some() {
        // Updating a doc that doesn't exist
        return DocResult {
            ok: false,
            id: doc_id,
            rev: None,
            error: Some("not_found".into()),
            reason: Some("missing".into()),
        };
    }

    // Generate the new revision: position is parent pos + 1 (or 1 for a
    // brand-new doc), hash is derived from parent rev + deleted flag + body.
    let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
    let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
    let new_hash = generate_rev_hash(
        &doc.data,
        doc.deleted,
        prev_rev_str.as_deref(),
    );
    let new_rev_str = rev_string(new_pos, &new_hash);

    // Build the revision path for merging: new hash first, then its parent
    // so the merge can stitch it onto the existing tree.
    let mut rev_hashes = vec![new_hash.clone()];
    if let Some(ref prev) = doc.rev {
        rev_hashes.push(prev.hash.clone());
    }

    let new_path = build_path_from_revs(
        new_pos,
        &rev_hashes,
        NodeOpts {
            deleted: doc.deleted,
        },
        RevStatus::Available,
    );

    // Merge into existing tree or create new one
    let existing_tree = existing
        .map(|s| s.rev_tree.clone())
        .unwrap_or_default();

    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);

    // Advance the database-wide sequence counter.
    inner.update_seq += 1;
    let seq = inner.update_seq;

    // Remove old change entry for this doc (each doc has only one entry in changes)
    if let Some(existing) = inner.docs.get(&doc_id) {
        inner.changes.remove(&existing.seq);
    }

    // Store or update the document
    let stored = inner.docs.entry(doc_id.clone()).or_insert_with(|| StoredDoc {
        rev_tree: Vec::new(),
        rev_data: HashMap::new(),
        rev_deleted: HashMap::new(),
        seq: 0,
    });

    stored.rev_tree = merged_tree;
    stored.rev_data.insert(new_rev_str.clone(), doc.data);
    stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
    stored.seq = seq;

    // Record in changes
    inner.changes.insert(seq, (doc_id.clone(), doc.deleted));

    DocResult {
        ok: true,
        id: doc_id,
        rev: Some(new_rev_str),
        error: None,
        reason: None,
    }
}
836
// ---------------------------------------------------------------------------
// Document processing (new_edits = false, replication mode)
// ---------------------------------------------------------------------------
840
/// Apply a single document write in replication (`new_edits = false`) mode:
/// the incoming `_rev` is kept verbatim and merged into the revision tree,
/// using the `_revisions` ancestry from the body when present. Returns a
/// `DocResult` (rejecting only a missing `_rev`); never returns `Err`.
fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
    let doc_id = doc.id.clone();
    // Replicated writes must name their revision explicitly.
    let rev = match &doc.rev {
        Some(r) => r.clone(),
        None => {
            return DocResult {
                ok: false,
                id: doc_id,
                rev: None,
                error: Some("bad_request".into()),
                reason: Some("missing _rev".into()),
            };
        }
    };

    let rev_str = rev.to_string();

    // Build the revision path — use _revisions ancestry if available
    let new_path = if let Some(revisions) = doc.data.get("_revisions") {
        // `start` defaults to the rev's own position; `ids` to its hash,
        // so a malformed _revisions degrades to a single-node path.
        let start = revisions["start"].as_u64().unwrap_or(rev.pos);
        let ids: Vec<String> = revisions["ids"]
            .as_array()
            .map(|arr| {
                arr.iter()
                    .filter_map(|v| v.as_str().map(String::from))
                    .collect()
            })
            .unwrap_or_else(|| vec![rev.hash.clone()]);

        build_path_from_revs(
            start,
            &ids,
            NodeOpts {
                deleted: doc.deleted,
            },
            RevStatus::Available,
        )
    } else {
        // Fallback: single-node path (no ancestry available)
        RevPath {
            pos: rev.pos,
            tree: rouchdb_core::rev_tree::RevNode {
                hash: rev.hash.clone(),
                status: RevStatus::Available,
                opts: NodeOpts {
                    deleted: doc.deleted,
                },
                children: vec![],
            },
        }
    };

    // Strip _revisions from data before storing
    if let serde_json::Value::Object(ref mut map) = doc.data {
        map.remove("_revisions");
    }

    // Merge into existing tree
    let existing_tree = inner
        .docs
        .get(&doc_id)
        .map(|s| s.rev_tree.clone())
        .unwrap_or_default();

    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);

    // Advance the database-wide sequence counter.
    inner.update_seq += 1;
    let seq = inner.update_seq;

    // Remove old change entry (each doc keeps one entry in the changes log).
    if let Some(existing) = inner.docs.get(&doc_id) {
        inner.changes.remove(&existing.seq);
    }

    // The changes-log deleted flag reflects the merged tree's winner, which
    // may differ from this particular revision's own deleted flag.
    let is_doc_deleted = is_deleted(&merged_tree);

    let stored = inner.docs.entry(doc_id.clone()).or_insert_with(|| StoredDoc {
        rev_tree: Vec::new(),
        rev_data: HashMap::new(),
        rev_deleted: HashMap::new(),
        seq: 0,
    });

    stored.rev_tree = merged_tree;
    stored.rev_data.insert(rev_str.clone(), doc.data);
    stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
    stored.seq = seq;

    inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));

    DocResult {
        ok: true,
        id: doc_id,
        rev: Some(rev_str),
        error: None,
        reason: None,
    }
}
940
941// ---------------------------------------------------------------------------
942// Tests
943// ---------------------------------------------------------------------------
944
945#[cfg(test)]
946mod tests {
947    use super::*;
948    use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};
949
    /// Build a fresh, empty in-memory adapter (named "test") for one test case.
    async fn new_db() -> MemoryAdapter {
        MemoryAdapter::new("test")
    }
953
954    #[tokio::test]
955    async fn info_empty_db() {
956        let db = new_db().await;
957        let info = db.info().await.unwrap();
958        assert_eq!(info.db_name, "test");
959        assert_eq!(info.doc_count, 0);
960        assert_eq!(info.update_seq, Seq::Num(0));
961    }
962
963    #[tokio::test]
964    async fn put_and_get_document() {
965        let db = new_db().await;
966
967        let doc = Document {
968            id: "doc1".into(),
969            rev: None,
970            deleted: false,
971            data: serde_json::json!({"name": "Alice"}),
972            attachments: HashMap::new(),
973        };
974
975        let results = db
976            .bulk_docs(vec![doc], BulkDocsOptions::new())
977            .await
978            .unwrap();
979        assert!(results[0].ok);
980        assert_eq!(results[0].id, "doc1");
981        assert!(results[0].rev.is_some());
982
983        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
984        assert_eq!(fetched.id, "doc1");
985        assert_eq!(fetched.data["name"], "Alice");
986        assert!(fetched.rev.is_some());
987    }
988
989    #[tokio::test]
990    async fn update_document() {
991        let db = new_db().await;
992
993        // Create
994        let doc = Document {
995            id: "doc1".into(),
996            rev: None,
997            deleted: false,
998            data: serde_json::json!({"name": "Alice"}),
999            attachments: HashMap::new(),
1000        };
1001        let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1002        let rev1 = results[0].rev.clone().unwrap();
1003
1004        // Update
1005        let rev_parsed: Revision = rev1.parse().unwrap();
1006        let doc2 = Document {
1007            id: "doc1".into(),
1008            rev: Some(rev_parsed),
1009            deleted: false,
1010            data: serde_json::json!({"name": "Bob"}),
1011            attachments: HashMap::new(),
1012        };
1013        let results = db.bulk_docs(vec![doc2], BulkDocsOptions::new()).await.unwrap();
1014        assert!(results[0].ok);
1015
1016        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1017        assert_eq!(fetched.data["name"], "Bob");
1018    }
1019
1020    #[tokio::test]
1021    async fn conflict_on_wrong_rev() {
1022        let db = new_db().await;
1023
1024        let doc = Document {
1025            id: "doc1".into(),
1026            rev: None,
1027            deleted: false,
1028            data: serde_json::json!({"v": 1}),
1029            attachments: HashMap::new(),
1030        };
1031        db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1032
1033        // Try updating with wrong rev
1034        let doc2 = Document {
1035            id: "doc1".into(),
1036            rev: Some(Revision::new(1, "wronghash".into())),
1037            deleted: false,
1038            data: serde_json::json!({"v": 2}),
1039            attachments: HashMap::new(),
1040        };
1041        let results = db.bulk_docs(vec![doc2], BulkDocsOptions::new()).await.unwrap();
1042        assert!(!results[0].ok);
1043        assert_eq!(results[0].error.as_deref(), Some("conflict"));
1044    }
1045
1046    #[tokio::test]
1047    async fn delete_document() {
1048        let db = new_db().await;
1049
1050        let doc = Document {
1051            id: "doc1".into(),
1052            rev: None,
1053            deleted: false,
1054            data: serde_json::json!({"name": "Alice"}),
1055            attachments: HashMap::new(),
1056        };
1057        let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1058        let rev1: Revision = results[0].rev.clone().unwrap().parse().unwrap();
1059
1060        // Delete
1061        let del = Document {
1062            id: "doc1".into(),
1063            rev: Some(rev1),
1064            deleted: true,
1065            data: serde_json::json!({}),
1066            attachments: HashMap::new(),
1067        };
1068        let results = db.bulk_docs(vec![del], BulkDocsOptions::new()).await.unwrap();
1069        assert!(results[0].ok);
1070
1071        // Get should fail
1072        let err = db.get("doc1", GetOptions::default()).await;
1073        assert!(err.is_err());
1074
1075        // Info should show 0 docs
1076        let info = db.info().await.unwrap();
1077        assert_eq!(info.doc_count, 0);
1078    }
1079
1080    #[tokio::test]
1081    async fn all_docs() {
1082        let db = new_db().await;
1083
1084        for name in ["charlie", "alice", "bob"] {
1085            let doc = Document {
1086                id: name.into(),
1087                rev: None,
1088                deleted: false,
1089                data: serde_json::json!({"name": name}),
1090                attachments: HashMap::new(),
1091            };
1092            db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1093        }
1094
1095        let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
1096        assert_eq!(result.total_rows, 3);
1097        // Should be sorted alphabetically
1098        assert_eq!(result.rows[0].id, "alice");
1099        assert_eq!(result.rows[1].id, "bob");
1100        assert_eq!(result.rows[2].id, "charlie");
1101    }
1102
1103    #[tokio::test]
1104    async fn all_docs_with_include_docs() {
1105        let db = new_db().await;
1106
1107        let doc = Document {
1108            id: "doc1".into(),
1109            rev: None,
1110            deleted: false,
1111            data: serde_json::json!({"name": "Alice"}),
1112            attachments: HashMap::new(),
1113        };
1114        db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1115
1116        let mut opts = AllDocsOptions::new();
1117        opts.include_docs = true;
1118        let result = db.all_docs(opts).await.unwrap();
1119        assert!(result.rows[0].doc.is_some());
1120        let doc = result.rows[0].doc.as_ref().unwrap();
1121        assert_eq!(doc["name"], "Alice");
1122        assert_eq!(doc["_id"], "doc1");
1123    }
1124
1125    #[tokio::test]
1126    async fn changes_feed() {
1127        let db = new_db().await;
1128
1129        for i in 0..3 {
1130            let doc = Document {
1131                id: format!("doc{}", i),
1132                rev: None,
1133                deleted: false,
1134                data: serde_json::json!({"i": i}),
1135                attachments: HashMap::new(),
1136            };
1137            db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1138        }
1139
1140        let changes = db
1141            .changes(ChangesOptions::default())
1142            .await
1143            .unwrap();
1144        assert_eq!(changes.results.len(), 3);
1145        assert_eq!(changes.last_seq, Seq::Num(3));
1146
1147        // Changes since seq 2
1148        let changes = db
1149            .changes(ChangesOptions {
1150                since: Seq::Num(2),
1151                ..Default::default()
1152            })
1153            .await
1154            .unwrap();
1155        assert_eq!(changes.results.len(), 1);
1156        assert_eq!(changes.results[0].id, "doc2");
1157    }
1158
1159    #[tokio::test]
1160    async fn revs_diff() {
1161        let db = new_db().await;
1162
1163        let doc = Document {
1164            id: "doc1".into(),
1165            rev: None,
1166            deleted: false,
1167            data: serde_json::json!({"v": 1}),
1168            attachments: HashMap::new(),
1169        };
1170        let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1171        let existing_rev = results[0].rev.clone().unwrap();
1172
1173        let mut revs = HashMap::new();
1174        revs.insert(
1175            "doc1".into(),
1176            vec![existing_rev.clone(), "2-doesnotexist".into()],
1177        );
1178        revs.insert("doc2".into(), vec!["1-abc".into()]);
1179
1180        let diff = db.revs_diff(revs).await.unwrap();
1181
1182        // doc1: existing_rev should not be missing, 2-doesnotexist should be
1183        let doc1_diff = diff.results.get("doc1").unwrap();
1184        assert!(!doc1_diff.missing.contains(&existing_rev));
1185        assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));
1186
1187        // doc2: completely missing
1188        let doc2_diff = diff.results.get("doc2").unwrap();
1189        assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
1190    }
1191
1192    #[tokio::test]
1193    async fn local_docs() {
1194        let db = new_db().await;
1195
1196        let doc = serde_json::json!({"checkpoint": 42});
1197        db.put_local("repl-123", doc.clone()).await.unwrap();
1198
1199        let fetched = db.get_local("repl-123").await.unwrap();
1200        assert_eq!(fetched["checkpoint"], 42);
1201
1202        db.remove_local("repl-123").await.unwrap();
1203        assert!(db.get_local("repl-123").await.is_err());
1204    }
1205
1206    #[tokio::test]
1207    async fn replication_mode_bulk_docs() {
1208        let db = new_db().await;
1209
1210        // Insert with explicit revision (replication mode)
1211        let doc = Document {
1212            id: "doc1".into(),
1213            rev: Some(Revision::new(1, "abc123".into())),
1214            deleted: false,
1215            data: serde_json::json!({"name": "replicated"}),
1216            attachments: HashMap::new(),
1217        };
1218
1219        let results = db
1220            .bulk_docs(vec![doc], BulkDocsOptions::replication())
1221            .await
1222            .unwrap();
1223        assert!(results[0].ok);
1224
1225        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1226        assert_eq!(fetched.data["name"], "replicated");
1227        assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
1228    }
1229
1230    #[tokio::test]
1231    async fn auto_generate_id() {
1232        let db = new_db().await;
1233
1234        let doc = Document {
1235            id: String::new(),
1236            rev: None,
1237            deleted: false,
1238            data: serde_json::json!({"name": "no-id"}),
1239            attachments: HashMap::new(),
1240        };
1241
1242        let results = db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1243        assert!(results[0].ok);
1244        assert!(!results[0].id.is_empty());
1245    }
1246
1247    #[tokio::test]
1248    async fn destroy_clears_everything() {
1249        let db = new_db().await;
1250
1251        let doc = Document {
1252            id: "doc1".into(),
1253            rev: None,
1254            deleted: false,
1255            data: serde_json::json!({}),
1256            attachments: HashMap::new(),
1257        };
1258        db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1259        db.put_local("x", serde_json::json!({})).await.unwrap();
1260
1261        db.destroy().await.unwrap();
1262
1263        let info = db.info().await.unwrap();
1264        assert_eq!(info.doc_count, 0);
1265        assert_eq!(info.update_seq, Seq::Num(0));
1266    }
1267
1268    #[tokio::test]
1269    async fn bulk_get_documents() {
1270        let db = new_db().await;
1271
1272        let doc = Document {
1273            id: "doc1".into(),
1274            rev: None,
1275            deleted: false,
1276            data: serde_json::json!({"name": "test"}),
1277            attachments: HashMap::new(),
1278        };
1279        db.bulk_docs(vec![doc], BulkDocsOptions::new()).await.unwrap();
1280
1281        let result = db
1282            .bulk_get(vec![
1283                BulkGetItem { id: "doc1".into(), rev: None },
1284                BulkGetItem { id: "missing".into(), rev: None },
1285            ])
1286            .await
1287            .unwrap();
1288
1289        assert_eq!(result.results.len(), 2);
1290        assert!(result.results[0].docs[0].ok.is_some());
1291        assert!(result.results[1].docs[0].error.is_some());
1292    }
1293}