Skip to main content

rouchdb_adapter_memory/
lib.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use md5::{Digest, Md5};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::document::*;
11use rouchdb_core::error::{Result, RouchError};
12use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
13use rouchdb_core::rev_tree::{
14    NodeOpts, RevPath, RevStatus, RevTree, build_path_from_revs, collect_leaves, find_rev_ancestry,
15    rev_exists,
16};
17
18const DEFAULT_REV_LIMIT: u64 = 1000;
19
20// ---------------------------------------------------------------------------
21// Internal storage types
22// ---------------------------------------------------------------------------
23
24#[derive(Debug, Clone)]
25struct StoredDoc {
26    rev_tree: RevTree,
27    /// Map from "pos-hash" to the document data at that revision.
28    rev_data: HashMap<String, serde_json::Value>,
29    /// Map from "pos-hash" to the deleted flag at that revision.
30    rev_deleted: HashMap<String, bool>,
31    /// Current sequence number for this document.
32    seq: u64,
33}
34
35#[derive(Debug)]
36struct Inner {
37    name: String,
38    /// Documents keyed by ID.
39    docs: HashMap<String, StoredDoc>,
40    /// Sequence counter (monotonically increasing).
41    update_seq: u64,
42    /// Changes log: seq -> (doc_id, was_deleted).
43    changes: BTreeMap<u64, (String, bool)>,
44    /// Local (non-replicated) documents.
45    local_docs: HashMap<String, serde_json::Value>,
46    /// Attachment data keyed by digest.
47    attachments: HashMap<String, Vec<u8>>,
48}
49
50/// In-memory adapter for RouchDB. All data is held in RAM.
51#[derive(Debug, Clone)]
52pub struct MemoryAdapter {
53    inner: Arc<RwLock<Inner>>,
54}
55
56impl MemoryAdapter {
57    pub fn new(name: &str) -> Self {
58        Self {
59            inner: Arc::new(RwLock::new(Inner {
60                name: name.to_string(),
61                docs: HashMap::new(),
62                update_seq: 0,
63                changes: BTreeMap::new(),
64                local_docs: HashMap::new(),
65                attachments: HashMap::new(),
66            })),
67        }
68    }
69}
70
71// ---------------------------------------------------------------------------
72// Helper functions
73// ---------------------------------------------------------------------------
74
75/// Generate a revision hash from the document content.
76fn generate_rev_hash(
77    doc_data: &serde_json::Value,
78    deleted: bool,
79    prev_rev: Option<&str>,
80) -> String {
81    let mut hasher = Md5::new();
82    // Include the previous revision in the hash for determinism
83    if let Some(prev) = prev_rev {
84        hasher.update(prev.as_bytes());
85    }
86    hasher.update(if deleted { b"1" } else { b"0" });
87    let serialized = serde_json::to_string(doc_data).unwrap_or_default();
88    hasher.update(serialized.as_bytes());
89    format!("{:x}", hasher.finalize())
90}
91
92fn rev_string(pos: u64, hash: &str) -> String {
93    format!("{}-{}", pos, hash)
94}
95
96fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
97    let (pos_str, hash) = rev_str
98        .split_once('-')
99        .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
100    let pos: u64 = pos_str
101        .parse()
102        .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
103    Ok((pos, hash.to_string()))
104}
105
106fn compute_attachment_digest(data: &[u8]) -> String {
107    let mut hasher = Md5::new();
108    hasher.update(data);
109    let hash = hasher.finalize();
110    use base64::Engine;
111    let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
112    format!("md5-{}", b64)
113}
114
115// ---------------------------------------------------------------------------
116// Adapter implementation
117// ---------------------------------------------------------------------------
118
119#[async_trait]
120impl Adapter for MemoryAdapter {
121    async fn info(&self) -> Result<DbInfo> {
122        let inner = self.inner.read().await;
123        let doc_count = inner
124            .docs
125            .values()
126            .filter(|d| {
127                // Count only non-deleted documents
128                !is_deleted(&d.rev_tree)
129            })
130            .count() as u64;
131
132        Ok(DbInfo {
133            db_name: inner.name.clone(),
134            doc_count,
135            update_seq: Seq::Num(inner.update_seq),
136        })
137    }
138
139    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
140        let inner = self.inner.read().await;
141        let stored = inner
142            .docs
143            .get(id)
144            .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
145
146        // Determine which revision to return
147        let target_rev = if let Some(ref rev_str) = opts.rev {
148            rev_str.clone()
149        } else {
150            // Use the winning revision
151            let winner = winning_rev(&stored.rev_tree)
152                .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
153            winner.to_string()
154        };
155
156        // Get the data for this revision
157        let data = stored
158            .rev_data
159            .get(&target_rev)
160            .cloned()
161            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
162
163        let deleted = stored
164            .rev_deleted
165            .get(&target_rev)
166            .copied()
167            .unwrap_or(false);
168
169        // If the winning rev is deleted and no specific rev was requested, it's "not found"
170        if deleted && opts.rev.is_none() {
171            return Err(RouchError::NotFound(id.to_string()));
172        }
173
174        let (pos, hash) = parse_rev(&target_rev)?;
175        let rev = Revision::new(pos, hash);
176
177        let mut doc = Document {
178            id: id.to_string(),
179            rev: Some(rev),
180            deleted,
181            data,
182            attachments: HashMap::new(),
183        };
184
185        // Add conflicts if requested
186        if opts.conflicts {
187            let conflicts = collect_conflicts(&stored.rev_tree);
188            if !conflicts.is_empty() {
189                let conflict_list: Vec<serde_json::Value> = conflicts
190                    .iter()
191                    .map(|c| serde_json::Value::String(c.to_string()))
192                    .collect();
193                if let serde_json::Value::Object(ref mut map) = doc.data {
194                    map.insert(
195                        "_conflicts".to_string(),
196                        serde_json::Value::Array(conflict_list),
197                    );
198                }
199            }
200        }
201
202        Ok(doc)
203    }
204
205    async fn bulk_docs(
206        &self,
207        docs: Vec<Document>,
208        opts: BulkDocsOptions,
209    ) -> Result<Vec<DocResult>> {
210        let mut inner = self.inner.write().await;
211        let mut results = Vec::with_capacity(docs.len());
212
213        for doc in docs {
214            let result = if opts.new_edits {
215                process_doc_new_edits(&mut inner, doc)
216            } else {
217                process_doc_replication(&mut inner, doc)
218            };
219            results.push(result);
220        }
221
222        Ok(results)
223    }
224
225    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
226        let inner = self.inner.read().await;
227
228        // Collect all doc IDs sorted
229        let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
230        doc_ids.sort();
231
232        if opts.descending {
233            doc_ids.reverse();
234        }
235
236        // If specific keys are requested, use those instead
237        let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
238            keys.clone()
239        } else if let Some(ref key) = opts.key {
240            vec![key.clone()]
241        } else {
242            doc_ids.iter().map(|k| (*k).clone()).collect()
243        };
244
245        let mut rows = Vec::new();
246
247        for key in &target_keys {
248            // Apply key range filters if no specific keys were given
249            if opts.keys.is_none() && opts.key.is_none() {
250                if let Some(ref start) = opts.start_key
251                    && ((!opts.descending && key.as_str() < start.as_str())
252                        || (opts.descending && key.as_str() > start.as_str()))
253                {
254                    continue;
255                }
256                if let Some(ref end) = opts.end_key {
257                    if opts.inclusive_end {
258                        if (!opts.descending && key.as_str() > end.as_str())
259                            || (opts.descending && key.as_str() < end.as_str())
260                        {
261                            continue;
262                        }
263                    } else if (!opts.descending && key.as_str() >= end.as_str())
264                        || (opts.descending && key.as_str() <= end.as_str())
265                    {
266                        continue;
267                    }
268                }
269            }
270
271            if let Some(stored) = inner.docs.get(key.as_str()) {
272                let winner = match winning_rev(&stored.rev_tree) {
273                    Some(w) => w,
274                    None => continue,
275                };
276                let deleted = is_deleted(&stored.rev_tree);
277
278                // Skip deleted docs unless specific keys were requested
279                if deleted && opts.keys.is_none() {
280                    continue;
281                }
282
283                let doc_json = if opts.include_docs && !deleted {
284                    let rev_str = winner.to_string();
285                    stored.rev_data.get(&rev_str).map(|data| {
286                        let mut obj = match data {
287                            serde_json::Value::Object(m) => m.clone(),
288                            _ => serde_json::Map::new(),
289                        };
290                        obj.insert("_id".into(), serde_json::Value::String(key.clone()));
291                        obj.insert("_rev".into(), serde_json::Value::String(rev_str));
292                        serde_json::Value::Object(obj)
293                    })
294                } else {
295                    None
296                };
297
298                rows.push(AllDocsRow {
299                    id: key.clone(),
300                    key: key.clone(),
301                    value: AllDocsRowValue {
302                        rev: winner.to_string(),
303                        deleted: if deleted { Some(true) } else { None },
304                    },
305                    doc: doc_json,
306                });
307            } else if opts.keys.is_some() {
308                // For specific key lookups, include missing keys as errors
309                // (CouchDB returns {"key":"x","error":"not_found"})
310                // We skip these for now — they don't fit our row struct cleanly
311            }
312        }
313
314        // Apply skip and limit
315        let total_rows = rows.len() as u64;
316        let skip = opts.skip as usize;
317        if skip > 0 {
318            rows = rows.into_iter().skip(skip).collect();
319        }
320        if let Some(limit) = opts.limit {
321            rows.truncate(limit as usize);
322        }
323
324        Ok(AllDocsResponse {
325            total_rows,
326            offset: opts.skip,
327            rows,
328        })
329    }
330
331    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
332        let inner = self.inner.read().await;
333
334        let mut results = Vec::new();
335
336        // Iterate changes after `since`
337        let range = (opts.since.as_num() + 1)..;
338        let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
339            Box::new(
340                inner
341                    .changes
342                    .range(range)
343                    .collect::<Vec<_>>()
344                    .into_iter()
345                    .rev(),
346            )
347        } else {
348            Box::new(inner.changes.range(range))
349        };
350
351        for (seq, (doc_id, deleted)) in iter {
352            // Filter by doc_ids if specified
353            if let Some(ref doc_ids) = opts.doc_ids
354                && !doc_ids.contains(doc_id)
355            {
356                continue;
357            }
358
359            let stored = inner.docs.get(doc_id);
360            let rev_str = stored
361                .and_then(|s| winning_rev(&s.rev_tree))
362                .map(|r| r.to_string())
363                .unwrap_or_default();
364
365            let doc = if opts.include_docs {
366                stored.and_then(|s| {
367                    s.rev_data.get(&rev_str).map(|data| {
368                        let mut obj = match data {
369                            serde_json::Value::Object(m) => m.clone(),
370                            _ => serde_json::Map::new(),
371                        };
372                        obj.insert("_id".into(), serde_json::Value::String(doc_id.clone()));
373                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
374                        if *deleted {
375                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
376                        }
377                        serde_json::Value::Object(obj)
378                    })
379                })
380            } else {
381                None
382            };
383
384            results.push(ChangeEvent {
385                seq: Seq::Num(*seq),
386                id: doc_id.clone(),
387                changes: vec![ChangeRev { rev: rev_str }],
388                deleted: *deleted,
389                doc,
390            });
391
392            if let Some(limit) = opts.limit
393                && results.len() >= limit as usize
394            {
395                break;
396            }
397        }
398
399        let last_seq = results
400            .last()
401            .map(|r| r.seq.clone())
402            .unwrap_or(opts.since.clone());
403
404        Ok(ChangesResponse { results, last_seq })
405    }
406
407    async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
408        let inner = self.inner.read().await;
409        let mut results = HashMap::new();
410
411        for (doc_id, rev_list) in revs {
412            let mut missing = Vec::new();
413            let mut possible_ancestors = Vec::new();
414
415            let stored = inner.docs.get(&doc_id);
416
417            for rev_str in &rev_list {
418                let (pos, hash) = parse_rev(rev_str)?;
419
420                let exists = stored
421                    .map(|s| rev_exists(&s.rev_tree, pos, &hash))
422                    .unwrap_or(false);
423
424                if !exists {
425                    missing.push(rev_str.clone());
426
427                    // Find possible ancestors (existing revisions with lower pos)
428                    if let Some(stored) = stored {
429                        let leaves = collect_leaves(&stored.rev_tree);
430                        for leaf in &leaves {
431                            if leaf.pos < pos {
432                                possible_ancestors.push(leaf.rev_string());
433                            }
434                        }
435                    }
436                }
437            }
438
439            if !missing.is_empty() {
440                results.insert(
441                    doc_id,
442                    RevsDiffResult {
443                        missing,
444                        possible_ancestors,
445                    },
446                );
447            }
448        }
449
450        Ok(RevsDiffResponse { results })
451    }
452
453    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
454        let inner = self.inner.read().await;
455        let mut results = Vec::new();
456
457        for item in docs {
458            let mut bulk_docs = Vec::new();
459
460            match inner.docs.get(&item.id) {
461                Some(stored) => {
462                    let rev_str = if let Some(ref rev) = item.rev {
463                        rev.clone()
464                    } else {
465                        match winning_rev(&stored.rev_tree) {
466                            Some(w) => w.to_string(),
467                            None => {
468                                bulk_docs.push(BulkGetDoc {
469                                    ok: None,
470                                    error: Some(BulkGetError {
471                                        id: item.id.clone(),
472                                        rev: item.rev.unwrap_or_default(),
473                                        error: "not_found".into(),
474                                        reason: "missing".into(),
475                                    }),
476                                });
477                                results.push(BulkGetResult {
478                                    id: item.id,
479                                    docs: bulk_docs,
480                                });
481                                continue;
482                            }
483                        }
484                    };
485
486                    if let Some(data) = stored.rev_data.get(&rev_str) {
487                        let deleted = stored.rev_deleted.get(&rev_str).copied().unwrap_or(false);
488                        let mut obj = match data {
489                            serde_json::Value::Object(m) => m.clone(),
490                            _ => serde_json::Map::new(),
491                        };
492                        obj.insert("_id".into(), serde_json::Value::String(item.id.clone()));
493                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
494                        if deleted {
495                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
496                        }
497
498                        // Include _revisions for replication
499                        if let Ok((pos, ref hash)) = parse_rev(&rev_str)
500                            && let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash)
501                        {
502                            obj.insert(
503                                "_revisions".into(),
504                                serde_json::json!({
505                                    "start": pos,
506                                    "ids": ancestry
507                                }),
508                            );
509                        }
510
511                        bulk_docs.push(BulkGetDoc {
512                            ok: Some(serde_json::Value::Object(obj)),
513                            error: None,
514                        });
515                    } else {
516                        bulk_docs.push(BulkGetDoc {
517                            ok: None,
518                            error: Some(BulkGetError {
519                                id: item.id.clone(),
520                                rev: rev_str,
521                                error: "not_found".into(),
522                                reason: "missing".into(),
523                            }),
524                        });
525                    }
526                }
527                None => {
528                    bulk_docs.push(BulkGetDoc {
529                        ok: None,
530                        error: Some(BulkGetError {
531                            id: item.id.clone(),
532                            rev: item.rev.unwrap_or_default(),
533                            error: "not_found".into(),
534                            reason: "missing".into(),
535                        }),
536                    });
537                }
538            }
539
540            results.push(BulkGetResult {
541                id: item.id,
542                docs: bulk_docs,
543            });
544        }
545
546        Ok(BulkGetResponse { results })
547    }
548
549    async fn put_attachment(
550        &self,
551        doc_id: &str,
552        att_id: &str,
553        rev: &str,
554        data: Vec<u8>,
555        content_type: &str,
556    ) -> Result<DocResult> {
557        let digest = compute_attachment_digest(&data);
558        let length = data.len() as u64;
559
560        let mut inner = self.inner.write().await;
561
562        // Store the attachment data
563        inner.attachments.insert(digest.clone(), data);
564
565        // Get or create the document
566        let stored = inner
567            .docs
568            .get(doc_id)
569            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
570
571        // Verify the rev matches
572        let winner = winning_rev(&stored.rev_tree)
573            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
574        if winner.to_string() != rev {
575            return Err(RouchError::Conflict);
576        }
577
578        // Get current doc data and add attachment
579        let doc_data = stored
580            .rev_data
581            .get(rev)
582            .cloned()
583            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
584
585        // Build updated document with attachment metadata
586        let att_meta = AttachmentMeta {
587            content_type: content_type.to_string(),
588            digest: digest.clone(),
589            length,
590            stub: true,
591            data: None,
592        };
593
594        let doc = Document {
595            id: doc_id.to_string(),
596            rev: Some(winner.clone()),
597            deleted: false,
598            data: doc_data.clone(),
599            attachments: {
600                let mut atts = HashMap::new();
601                atts.insert(att_id.to_string(), att_meta);
602                atts
603            },
604        };
605
606        // Process as a normal edit
607        let result = process_doc_new_edits(&mut inner, doc);
608        Ok(result)
609    }
610
611    async fn get_attachment(
612        &self,
613        doc_id: &str,
614        att_id: &str,
615        opts: GetAttachmentOptions,
616    ) -> Result<Vec<u8>> {
617        let inner = self.inner.read().await;
618
619        let stored = inner
620            .docs
621            .get(doc_id)
622            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
623
624        let rev_str = if let Some(ref rev) = opts.rev {
625            rev.clone()
626        } else {
627            winning_rev(&stored.rev_tree)
628                .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
629                .to_string()
630        };
631
632        // Look for attachment metadata in the doc data
633        // For now, look up by digest in our attachment store
634        // We'd need to track which attachments belong to which doc/rev
635        // For simplicity, search through our attachment map
636        let _data = stored.rev_data.get(&rev_str);
637
638        // TODO: proper attachment tracking per revision
639        Err(RouchError::NotFound(format!(
640            "attachment {}/{}",
641            doc_id, att_id
642        )))
643    }
644
645    async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
646        let inner = self.inner.read().await;
647        inner
648            .local_docs
649            .get(id)
650            .cloned()
651            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
652    }
653
654    async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
655        let mut inner = self.inner.write().await;
656        inner.local_docs.insert(id.to_string(), doc);
657        Ok(())
658    }
659
660    async fn remove_local(&self, id: &str) -> Result<()> {
661        let mut inner = self.inner.write().await;
662        inner
663            .local_docs
664            .remove(id)
665            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
666        Ok(())
667    }
668
669    async fn compact(&self) -> Result<()> {
670        let mut inner = self.inner.write().await;
671
672        for stored in inner.docs.values_mut() {
673            let leaves = collect_leaves(&stored.rev_tree);
674            let leaf_revs: std::collections::HashSet<String> =
675                leaves.iter().map(|l| l.rev_string()).collect();
676
677            // Remove data for non-leaf revisions
678            stored.rev_data.retain(|k, _| leaf_revs.contains(k));
679            stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
680        }
681
682        Ok(())
683    }
684
685    async fn destroy(&self) -> Result<()> {
686        let mut inner = self.inner.write().await;
687        inner.docs.clear();
688        inner.changes.clear();
689        inner.local_docs.clear();
690        inner.attachments.clear();
691        inner.update_seq = 0;
692        Ok(())
693    }
694}
695
696// ---------------------------------------------------------------------------
697// Document processing (new_edits = true)
698// ---------------------------------------------------------------------------
699
700fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
701    let doc_id = if doc.id.is_empty() {
702        Uuid::new_v4().to_string()
703    } else {
704        doc.id.clone()
705    };
706
707    let existing = inner.docs.get(&doc_id);
708
709    // Check for conflicts: if the doc has a _rev, it must match the winning rev
710    if let Some(stored) = existing {
711        let winner = winning_rev(&stored.rev_tree);
712
713        match (&doc.rev, &winner) {
714            (Some(provided_rev), Some(current_winner)) => {
715                if provided_rev.to_string() != current_winner.to_string() {
716                    return DocResult {
717                        ok: false,
718                        id: doc_id,
719                        rev: None,
720                        error: Some("conflict".into()),
721                        reason: Some("Document update conflict".into()),
722                    };
723                }
724            }
725            (None, Some(_)) => {
726                // Trying to create a doc that already exists (and isn't deleted)
727                if !is_deleted(&stored.rev_tree) {
728                    return DocResult {
729                        ok: false,
730                        id: doc_id,
731                        rev: None,
732                        error: Some("conflict".into()),
733                        reason: Some("Document update conflict".into()),
734                    };
735                }
736                // If winner is deleted, allow creating a new doc at the same ID
737            }
738            _ => {}
739        }
740    } else if doc.rev.is_some() {
741        // Updating a doc that doesn't exist
742        return DocResult {
743            ok: false,
744            id: doc_id,
745            rev: None,
746            error: Some("not_found".into()),
747            reason: Some("missing".into()),
748        };
749    }
750
751    // Generate new revision
752    let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
753    let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
754    let new_hash = generate_rev_hash(&doc.data, doc.deleted, prev_rev_str.as_deref());
755    let new_rev_str = rev_string(new_pos, &new_hash);
756
757    // Build the revision path for merging
758    let mut rev_hashes = vec![new_hash.clone()];
759    if let Some(ref prev) = doc.rev {
760        rev_hashes.push(prev.hash.clone());
761    }
762
763    let new_path = build_path_from_revs(
764        new_pos,
765        &rev_hashes,
766        NodeOpts {
767            deleted: doc.deleted,
768        },
769        RevStatus::Available,
770    );
771
772    // Merge into existing tree or create new one
773    let existing_tree = existing.map(|s| s.rev_tree.clone()).unwrap_or_default();
774
775    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
776
777    // Update sequence
778    inner.update_seq += 1;
779    let seq = inner.update_seq;
780
781    // Remove old change entry for this doc (each doc has only one entry in changes)
782    if let Some(existing) = inner.docs.get(&doc_id) {
783        inner.changes.remove(&existing.seq);
784    }
785
786    // Store or update the document
787    let stored = inner
788        .docs
789        .entry(doc_id.clone())
790        .or_insert_with(|| StoredDoc {
791            rev_tree: Vec::new(),
792            rev_data: HashMap::new(),
793            rev_deleted: HashMap::new(),
794            seq: 0,
795        });
796
797    stored.rev_tree = merged_tree;
798    stored.rev_data.insert(new_rev_str.clone(), doc.data);
799    stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
800    stored.seq = seq;
801
802    // Record in changes
803    inner.changes.insert(seq, (doc_id.clone(), doc.deleted));
804
805    DocResult {
806        ok: true,
807        id: doc_id,
808        rev: Some(new_rev_str),
809        error: None,
810        reason: None,
811    }
812}
813
814// ---------------------------------------------------------------------------
815// Document processing (new_edits = false, replication mode)
816// ---------------------------------------------------------------------------
817
818fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
819    let doc_id = doc.id.clone();
820    let rev = match &doc.rev {
821        Some(r) => r.clone(),
822        None => {
823            return DocResult {
824                ok: false,
825                id: doc_id,
826                rev: None,
827                error: Some("bad_request".into()),
828                reason: Some("missing _rev".into()),
829            };
830        }
831    };
832
833    let rev_str = rev.to_string();
834
835    // Build the revision path — use _revisions ancestry if available
836    let new_path = if let Some(revisions) = doc.data.get("_revisions") {
837        let start = revisions["start"].as_u64().unwrap_or(rev.pos);
838        let ids: Vec<String> = revisions["ids"]
839            .as_array()
840            .map(|arr| {
841                arr.iter()
842                    .filter_map(|v| v.as_str().map(String::from))
843                    .collect()
844            })
845            .unwrap_or_else(|| vec![rev.hash.clone()]);
846
847        build_path_from_revs(
848            start,
849            &ids,
850            NodeOpts {
851                deleted: doc.deleted,
852            },
853            RevStatus::Available,
854        )
855    } else {
856        // Fallback: single-node path (no ancestry available)
857        RevPath {
858            pos: rev.pos,
859            tree: rouchdb_core::rev_tree::RevNode {
860                hash: rev.hash.clone(),
861                status: RevStatus::Available,
862                opts: NodeOpts {
863                    deleted: doc.deleted,
864                },
865                children: vec![],
866            },
867        }
868    };
869
870    // Strip _revisions from data before storing
871    if let serde_json::Value::Object(ref mut map) = doc.data {
872        map.remove("_revisions");
873    }
874
875    // Merge into existing tree
876    let existing_tree = inner
877        .docs
878        .get(&doc_id)
879        .map(|s| s.rev_tree.clone())
880        .unwrap_or_default();
881
882    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
883
884    // Update sequence
885    inner.update_seq += 1;
886    let seq = inner.update_seq;
887
888    // Remove old change entry
889    if let Some(existing) = inner.docs.get(&doc_id) {
890        inner.changes.remove(&existing.seq);
891    }
892
893    let is_doc_deleted = is_deleted(&merged_tree);
894
895    let stored = inner
896        .docs
897        .entry(doc_id.clone())
898        .or_insert_with(|| StoredDoc {
899            rev_tree: Vec::new(),
900            rev_data: HashMap::new(),
901            rev_deleted: HashMap::new(),
902            seq: 0,
903        });
904
905    stored.rev_tree = merged_tree;
906    stored.rev_data.insert(rev_str.clone(), doc.data);
907    stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
908    stored.seq = seq;
909
910    inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));
911
912    DocResult {
913        ok: true,
914        id: doc_id,
915        rev: Some(rev_str),
916        error: None,
917        reason: None,
918    }
919}
920
921// ---------------------------------------------------------------------------
922// Tests
923// ---------------------------------------------------------------------------
924
925#[cfg(test)]
926mod tests {
927    use super::*;
928    use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};
929
930    async fn new_db() -> MemoryAdapter {
931        MemoryAdapter::new("test")
932    }
933
934    #[tokio::test]
935    async fn info_empty_db() {
936        let db = new_db().await;
937        let info = db.info().await.unwrap();
938        assert_eq!(info.db_name, "test");
939        assert_eq!(info.doc_count, 0);
940        assert_eq!(info.update_seq, Seq::Num(0));
941    }
942
943    #[tokio::test]
944    async fn put_and_get_document() {
945        let db = new_db().await;
946
947        let doc = Document {
948            id: "doc1".into(),
949            rev: None,
950            deleted: false,
951            data: serde_json::json!({"name": "Alice"}),
952            attachments: HashMap::new(),
953        };
954
955        let results = db
956            .bulk_docs(vec![doc], BulkDocsOptions::new())
957            .await
958            .unwrap();
959        assert!(results[0].ok);
960        assert_eq!(results[0].id, "doc1");
961        assert!(results[0].rev.is_some());
962
963        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
964        assert_eq!(fetched.id, "doc1");
965        assert_eq!(fetched.data["name"], "Alice");
966        assert!(fetched.rev.is_some());
967    }
968
969    #[tokio::test]
970    async fn update_document() {
971        let db = new_db().await;
972
973        // Create
974        let doc = Document {
975            id: "doc1".into(),
976            rev: None,
977            deleted: false,
978            data: serde_json::json!({"name": "Alice"}),
979            attachments: HashMap::new(),
980        };
981        let results = db
982            .bulk_docs(vec![doc], BulkDocsOptions::new())
983            .await
984            .unwrap();
985        let rev1 = results[0].rev.clone().unwrap();
986
987        // Update
988        let rev_parsed: Revision = rev1.parse().unwrap();
989        let doc2 = Document {
990            id: "doc1".into(),
991            rev: Some(rev_parsed),
992            deleted: false,
993            data: serde_json::json!({"name": "Bob"}),
994            attachments: HashMap::new(),
995        };
996        let results = db
997            .bulk_docs(vec![doc2], BulkDocsOptions::new())
998            .await
999            .unwrap();
1000        assert!(results[0].ok);
1001
1002        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1003        assert_eq!(fetched.data["name"], "Bob");
1004    }
1005
1006    #[tokio::test]
1007    async fn conflict_on_wrong_rev() {
1008        let db = new_db().await;
1009
1010        let doc = Document {
1011            id: "doc1".into(),
1012            rev: None,
1013            deleted: false,
1014            data: serde_json::json!({"v": 1}),
1015            attachments: HashMap::new(),
1016        };
1017        db.bulk_docs(vec![doc], BulkDocsOptions::new())
1018            .await
1019            .unwrap();
1020
1021        // Try updating with wrong rev
1022        let doc2 = Document {
1023            id: "doc1".into(),
1024            rev: Some(Revision::new(1, "wronghash".into())),
1025            deleted: false,
1026            data: serde_json::json!({"v": 2}),
1027            attachments: HashMap::new(),
1028        };
1029        let results = db
1030            .bulk_docs(vec![doc2], BulkDocsOptions::new())
1031            .await
1032            .unwrap();
1033        assert!(!results[0].ok);
1034        assert_eq!(results[0].error.as_deref(), Some("conflict"));
1035    }
1036
1037    #[tokio::test]
1038    async fn delete_document() {
1039        let db = new_db().await;
1040
1041        let doc = Document {
1042            id: "doc1".into(),
1043            rev: None,
1044            deleted: false,
1045            data: serde_json::json!({"name": "Alice"}),
1046            attachments: HashMap::new(),
1047        };
1048        let results = db
1049            .bulk_docs(vec![doc], BulkDocsOptions::new())
1050            .await
1051            .unwrap();
1052        let rev1: Revision = results[0].rev.clone().unwrap().parse().unwrap();
1053
1054        // Delete
1055        let del = Document {
1056            id: "doc1".into(),
1057            rev: Some(rev1),
1058            deleted: true,
1059            data: serde_json::json!({}),
1060            attachments: HashMap::new(),
1061        };
1062        let results = db
1063            .bulk_docs(vec![del], BulkDocsOptions::new())
1064            .await
1065            .unwrap();
1066        assert!(results[0].ok);
1067
1068        // Get should fail
1069        let err = db.get("doc1", GetOptions::default()).await;
1070        assert!(err.is_err());
1071
1072        // Info should show 0 docs
1073        let info = db.info().await.unwrap();
1074        assert_eq!(info.doc_count, 0);
1075    }
1076
1077    #[tokio::test]
1078    async fn all_docs() {
1079        let db = new_db().await;
1080
1081        for name in ["charlie", "alice", "bob"] {
1082            let doc = Document {
1083                id: name.into(),
1084                rev: None,
1085                deleted: false,
1086                data: serde_json::json!({"name": name}),
1087                attachments: HashMap::new(),
1088            };
1089            db.bulk_docs(vec![doc], BulkDocsOptions::new())
1090                .await
1091                .unwrap();
1092        }
1093
1094        let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
1095        assert_eq!(result.total_rows, 3);
1096        // Should be sorted alphabetically
1097        assert_eq!(result.rows[0].id, "alice");
1098        assert_eq!(result.rows[1].id, "bob");
1099        assert_eq!(result.rows[2].id, "charlie");
1100    }
1101
1102    #[tokio::test]
1103    async fn all_docs_with_include_docs() {
1104        let db = new_db().await;
1105
1106        let doc = Document {
1107            id: "doc1".into(),
1108            rev: None,
1109            deleted: false,
1110            data: serde_json::json!({"name": "Alice"}),
1111            attachments: HashMap::new(),
1112        };
1113        db.bulk_docs(vec![doc], BulkDocsOptions::new())
1114            .await
1115            .unwrap();
1116
1117        let mut opts = AllDocsOptions::new();
1118        opts.include_docs = true;
1119        let result = db.all_docs(opts).await.unwrap();
1120        assert!(result.rows[0].doc.is_some());
1121        let doc = result.rows[0].doc.as_ref().unwrap();
1122        assert_eq!(doc["name"], "Alice");
1123        assert_eq!(doc["_id"], "doc1");
1124    }
1125
1126    #[tokio::test]
1127    async fn changes_feed() {
1128        let db = new_db().await;
1129
1130        for i in 0..3 {
1131            let doc = Document {
1132                id: format!("doc{}", i),
1133                rev: None,
1134                deleted: false,
1135                data: serde_json::json!({"i": i}),
1136                attachments: HashMap::new(),
1137            };
1138            db.bulk_docs(vec![doc], BulkDocsOptions::new())
1139                .await
1140                .unwrap();
1141        }
1142
1143        let changes = db.changes(ChangesOptions::default()).await.unwrap();
1144        assert_eq!(changes.results.len(), 3);
1145        assert_eq!(changes.last_seq, Seq::Num(3));
1146
1147        // Changes since seq 2
1148        let changes = db
1149            .changes(ChangesOptions {
1150                since: Seq::Num(2),
1151                ..Default::default()
1152            })
1153            .await
1154            .unwrap();
1155        assert_eq!(changes.results.len(), 1);
1156        assert_eq!(changes.results[0].id, "doc2");
1157    }
1158
1159    #[tokio::test]
1160    async fn revs_diff() {
1161        let db = new_db().await;
1162
1163        let doc = Document {
1164            id: "doc1".into(),
1165            rev: None,
1166            deleted: false,
1167            data: serde_json::json!({"v": 1}),
1168            attachments: HashMap::new(),
1169        };
1170        let results = db
1171            .bulk_docs(vec![doc], BulkDocsOptions::new())
1172            .await
1173            .unwrap();
1174        let existing_rev = results[0].rev.clone().unwrap();
1175
1176        let mut revs = HashMap::new();
1177        revs.insert(
1178            "doc1".into(),
1179            vec![existing_rev.clone(), "2-doesnotexist".into()],
1180        );
1181        revs.insert("doc2".into(), vec!["1-abc".into()]);
1182
1183        let diff = db.revs_diff(revs).await.unwrap();
1184
1185        // doc1: existing_rev should not be missing, 2-doesnotexist should be
1186        let doc1_diff = diff.results.get("doc1").unwrap();
1187        assert!(!doc1_diff.missing.contains(&existing_rev));
1188        assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));
1189
1190        // doc2: completely missing
1191        let doc2_diff = diff.results.get("doc2").unwrap();
1192        assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
1193    }
1194
1195    #[tokio::test]
1196    async fn local_docs() {
1197        let db = new_db().await;
1198
1199        let doc = serde_json::json!({"checkpoint": 42});
1200        db.put_local("repl-123", doc.clone()).await.unwrap();
1201
1202        let fetched = db.get_local("repl-123").await.unwrap();
1203        assert_eq!(fetched["checkpoint"], 42);
1204
1205        db.remove_local("repl-123").await.unwrap();
1206        assert!(db.get_local("repl-123").await.is_err());
1207    }
1208
1209    #[tokio::test]
1210    async fn replication_mode_bulk_docs() {
1211        let db = new_db().await;
1212
1213        // Insert with explicit revision (replication mode)
1214        let doc = Document {
1215            id: "doc1".into(),
1216            rev: Some(Revision::new(1, "abc123".into())),
1217            deleted: false,
1218            data: serde_json::json!({"name": "replicated"}),
1219            attachments: HashMap::new(),
1220        };
1221
1222        let results = db
1223            .bulk_docs(vec![doc], BulkDocsOptions::replication())
1224            .await
1225            .unwrap();
1226        assert!(results[0].ok);
1227
1228        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
1229        assert_eq!(fetched.data["name"], "replicated");
1230        assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
1231    }
1232
1233    #[tokio::test]
1234    async fn auto_generate_id() {
1235        let db = new_db().await;
1236
1237        let doc = Document {
1238            id: String::new(),
1239            rev: None,
1240            deleted: false,
1241            data: serde_json::json!({"name": "no-id"}),
1242            attachments: HashMap::new(),
1243        };
1244
1245        let results = db
1246            .bulk_docs(vec![doc], BulkDocsOptions::new())
1247            .await
1248            .unwrap();
1249        assert!(results[0].ok);
1250        assert!(!results[0].id.is_empty());
1251    }
1252
1253    #[tokio::test]
1254    async fn destroy_clears_everything() {
1255        let db = new_db().await;
1256
1257        let doc = Document {
1258            id: "doc1".into(),
1259            rev: None,
1260            deleted: false,
1261            data: serde_json::json!({}),
1262            attachments: HashMap::new(),
1263        };
1264        db.bulk_docs(vec![doc], BulkDocsOptions::new())
1265            .await
1266            .unwrap();
1267        db.put_local("x", serde_json::json!({})).await.unwrap();
1268
1269        db.destroy().await.unwrap();
1270
1271        let info = db.info().await.unwrap();
1272        assert_eq!(info.doc_count, 0);
1273        assert_eq!(info.update_seq, Seq::Num(0));
1274    }
1275
1276    #[tokio::test]
1277    async fn bulk_get_documents() {
1278        let db = new_db().await;
1279
1280        let doc = Document {
1281            id: "doc1".into(),
1282            rev: None,
1283            deleted: false,
1284            data: serde_json::json!({"name": "test"}),
1285            attachments: HashMap::new(),
1286        };
1287        db.bulk_docs(vec![doc], BulkDocsOptions::new())
1288            .await
1289            .unwrap();
1290
1291        let result = db
1292            .bulk_get(vec![
1293                BulkGetItem {
1294                    id: "doc1".into(),
1295                    rev: None,
1296                },
1297                BulkGetItem {
1298                    id: "missing".into(),
1299                    rev: None,
1300                },
1301            ])
1302            .await
1303            .unwrap();
1304
1305        assert_eq!(result.results.len(), 2);
1306        assert!(result.results[0].docs[0].ok.is_some());
1307        assert!(result.results[1].docs[0].error.is_some());
1308    }
1309}