//! In-memory storage adapter for RouchDB (`rouchdb_adapter_memory/lib.rs`).

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use md5::{Digest, Md5};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::document::*;
11use rouchdb_core::error::{Result, RouchError};
12use rouchdb_core::merge::{collect_conflicts, is_deleted, merge_tree, winning_rev};
13use rouchdb_core::rev_tree::{
14    NodeOpts, RevPath, RevStatus, RevTree, build_path_from_revs, collect_leaves, find_rev_ancestry,
15    rev_exists,
16};
17
18const DEFAULT_REV_LIMIT: u64 = 1000;
19
20// ---------------------------------------------------------------------------
21// Internal storage types
22// ---------------------------------------------------------------------------
23
/// Per-document storage record: the full revision tree plus the materialized
/// JSON body and tombstone flag for every revision whose data is still held.
#[derive(Debug, Clone)]
struct StoredDoc {
    /// Revision history/topology for this document.
    rev_tree: RevTree,
    /// Map from "pos-hash" to the document data at that revision.
    rev_data: HashMap<String, serde_json::Value>,
    /// Map from "pos-hash" to the deleted flag at that revision.
    rev_deleted: HashMap<String, bool>,
    /// Current sequence number for this document (last seq that touched it).
    seq: u64,
}
34
/// The adapter's entire mutable state, guarded by a single `RwLock`.
#[derive(Debug)]
struct Inner {
    /// Database name reported by `info()`.
    name: String,
    /// Documents keyed by ID.
    docs: HashMap<String, StoredDoc>,
    /// Sequence counter (monotonically increasing).
    update_seq: u64,
    /// Changes log: seq -> (doc_id, was_deleted).
    changes: BTreeMap<u64, (String, bool)>,
    /// Local (non-replicated) documents.
    local_docs: HashMap<String, serde_json::Value>,
    /// Attachment data keyed by digest.
    attachments: HashMap<String, Vec<u8>>,
}
49
/// In-memory adapter for RouchDB. All data is held in RAM.
///
/// Cloning is cheap: clones share the same underlying state through the
/// `Arc`, so all handles observe the same database.
#[derive(Debug, Clone)]
pub struct MemoryAdapter {
    inner: Arc<RwLock<Inner>>,
}
55
56impl MemoryAdapter {
57    pub fn new(name: &str) -> Self {
58        Self {
59            inner: Arc::new(RwLock::new(Inner {
60                name: name.to_string(),
61                docs: HashMap::new(),
62                update_seq: 0,
63                changes: BTreeMap::new(),
64                local_docs: HashMap::new(),
65                attachments: HashMap::new(),
66            })),
67        }
68    }
69}
70
71// ---------------------------------------------------------------------------
72// Helper functions
73// ---------------------------------------------------------------------------
74
75/// Generate a revision hash from the document content.
76fn generate_rev_hash(
77    doc_data: &serde_json::Value,
78    deleted: bool,
79    prev_rev: Option<&str>,
80) -> String {
81    let mut hasher = Md5::new();
82    // Include the previous revision in the hash for determinism
83    if let Some(prev) = prev_rev {
84        hasher.update(prev.as_bytes());
85    }
86    hasher.update(if deleted { b"1" } else { b"0" });
87    let serialized = serde_json::to_string(doc_data).unwrap_or_default();
88    hasher.update(serialized.as_bytes());
89    format!("{:x}", hasher.finalize())
90}
91
/// Render a (position, hash) pair as the canonical "pos-hash" string.
fn rev_string(pos: u64, hash: &str) -> String {
    let mut out = pos.to_string();
    out.push('-');
    out.push_str(hash);
    out
}
95
96fn parse_rev(rev_str: &str) -> Result<(u64, String)> {
97    let (pos_str, hash) = rev_str
98        .split_once('-')
99        .ok_or_else(|| RouchError::InvalidRev(rev_str.to_string()))?;
100    let pos: u64 = pos_str
101        .parse()
102        .map_err(|_| RouchError::InvalidRev(rev_str.to_string()))?;
103    Ok((pos, hash.to_string()))
104}
105
106fn compute_attachment_digest(data: &[u8]) -> String {
107    let mut hasher = Md5::new();
108    hasher.update(data);
109    let hash = hasher.finalize();
110    use base64::Engine;
111    let b64 = base64::engine::general_purpose::STANDARD.encode(hash);
112    format!("md5-{}", b64)
113}
114
115// ---------------------------------------------------------------------------
116// Adapter implementation
117// ---------------------------------------------------------------------------
118
119#[async_trait]
120impl Adapter for MemoryAdapter {
121    async fn info(&self) -> Result<DbInfo> {
122        let inner = self.inner.read().await;
123        let doc_count = inner
124            .docs
125            .values()
126            .filter(|d| {
127                // Count only non-deleted documents
128                !is_deleted(&d.rev_tree)
129            })
130            .count() as u64;
131
132        Ok(DbInfo {
133            db_name: inner.name.clone(),
134            doc_count,
135            update_seq: Seq::Num(inner.update_seq),
136        })
137    }
138
    /// Fetch a single document by ID.
    ///
    /// Revision resolution: `opts.rev` selects an exact stored revision;
    /// otherwise the winning revision of the rev tree is returned. A winning
    /// revision that is deleted maps to `NotFound` (tombstones are only
    /// visible when their rev is requested explicitly).
    ///
    /// `opts.latest` upgrades a requested non-leaf rev to a leaf;
    /// `opts.conflicts` / `opts.revs_info` embed CouchDB-style `_conflicts`
    /// and `_revs_info` arrays into the returned document body.
    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
        let inner = self.inner.read().await;
        let stored = inner
            .docs
            .get(id)
            .ok_or_else(|| RouchError::NotFound(id.to_string()))?;

        // Determine which revision to return
        let mut target_rev = if let Some(ref rev_str) = opts.rev {
            rev_str.clone()
        } else {
            // Use the winning revision
            let winner = winning_rev(&stored.rev_tree)
                .ok_or_else(|| RouchError::NotFound(id.to_string()))?;
            winner.to_string()
        };

        // latest: if requested rev isn't a leaf, return the latest leaf instead
        // NOTE(review): "latest" here is whichever leaf collect_leaves() yields
        // first — confirm that ordering matches the intended semantics.
        if opts.latest && opts.rev.is_some() {
            let leaves = collect_leaves(&stored.rev_tree);
            let is_leaf = leaves.iter().any(|l| l.rev_string() == target_rev);
            if !is_leaf && let Some(leaf) = leaves.first() {
                target_rev = leaf.rev_string();
            }
        }

        // Get the data for this revision; an unknown rev falls back to an
        // empty object rather than erroring.
        let data = stored
            .rev_data
            .get(&target_rev)
            .cloned()
            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));

        let deleted = stored
            .rev_deleted
            .get(&target_rev)
            .copied()
            .unwrap_or(false);

        // If the winning rev is deleted and no specific rev was requested, it's "not found"
        if deleted && opts.rev.is_none() {
            return Err(RouchError::NotFound(id.to_string()));
        }

        let (pos, hash) = parse_rev(&target_rev)?;
        let rev = Revision::new(pos, hash);

        let mut doc = Document {
            id: id.to_string(),
            rev: Some(rev),
            deleted,
            data,
            attachments: HashMap::new(),
        };

        // Add conflicts if requested
        if opts.conflicts {
            let conflicts = collect_conflicts(&stored.rev_tree);
            if !conflicts.is_empty() {
                let conflict_list: Vec<serde_json::Value> = conflicts
                    .iter()
                    .map(|c| serde_json::Value::String(c.to_string()))
                    .collect();
                if let serde_json::Value::Object(ref mut map) = doc.data {
                    map.insert(
                        "_conflicts".to_string(),
                        serde_json::Value::Array(conflict_list),
                    );
                }
            }
        }

        // Add revs_info if requested
        if opts.revs_info {
            use rouchdb_core::rev_tree::traverse_rev_tree;
            let mut revs_info = Vec::new();
            traverse_rev_tree(&stored.rev_tree, |node_pos, node, _root_pos| {
                let rev_str = format!("{}-{}", node_pos, node.hash);
                // A deleted node reports "deleted" regardless of data availability.
                let status = if node.opts.deleted {
                    "deleted"
                } else {
                    match node.status {
                        rouchdb_core::rev_tree::RevStatus::Available => "available",
                        rouchdb_core::rev_tree::RevStatus::Missing => "missing",
                    }
                };
                revs_info.push(RevInfo {
                    rev: rev_str,
                    status: status.to_string(),
                });
            });
            // Sort by pos descending (newest first)
            revs_info.sort_by(|a, b| {
                let a_pos: u64 = a.rev.split('-').next().unwrap_or("0").parse().unwrap_or(0);
                let b_pos: u64 = b.rev.split('-').next().unwrap_or("0").parse().unwrap_or(0);
                b_pos.cmp(&a_pos)
            });
            if let serde_json::Value::Object(ref mut map) = doc.data {
                map.insert(
                    "_revs_info".to_string(),
                    serde_json::to_value(&revs_info).unwrap(),
                );
            }
        }

        Ok(doc)
    }
246
247    async fn bulk_docs(
248        &self,
249        docs: Vec<Document>,
250        opts: BulkDocsOptions,
251    ) -> Result<Vec<DocResult>> {
252        let mut inner = self.inner.write().await;
253        let mut results = Vec::with_capacity(docs.len());
254
255        for doc in docs {
256            let result = if opts.new_edits {
257                process_doc_new_edits(&mut inner, doc)
258            } else {
259                process_doc_replication(&mut inner, doc)
260            };
261            results.push(result);
262        }
263
264        Ok(results)
265    }
266
    /// List documents sorted by ID (CouchDB `_all_docs` semantics).
    ///
    /// Key selection: `opts.keys` (exact list) takes precedence over
    /// `opts.key` (single key), which takes precedence over the full sorted
    /// ID range. Range filters (`start_key`/`end_key`/`inclusive_end`) apply
    /// only in the full-range case. Deleted docs are skipped unless `keys`
    /// was given explicitly.
    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
        let inner = self.inner.read().await;

        // Collect all doc IDs sorted
        let mut doc_ids: Vec<&String> = inner.docs.keys().collect();
        doc_ids.sort();

        if opts.descending {
            doc_ids.reverse();
        }

        // If specific keys are requested, use those instead
        let target_keys: Vec<String> = if let Some(ref keys) = opts.keys {
            keys.clone()
        } else if let Some(ref key) = opts.key {
            vec![key.clone()]
        } else {
            doc_ids.iter().map(|k| (*k).clone()).collect()
        };

        let mut rows = Vec::new();

        for key in &target_keys {
            // Apply key range filters if no specific keys were given.
            // Comparisons flip direction when descending.
            if opts.keys.is_none() && opts.key.is_none() {
                if let Some(ref start) = opts.start_key
                    && ((!opts.descending && key.as_str() < start.as_str())
                        || (opts.descending && key.as_str() > start.as_str()))
                {
                    continue;
                }
                if let Some(ref end) = opts.end_key {
                    if opts.inclusive_end {
                        if (!opts.descending && key.as_str() > end.as_str())
                            || (opts.descending && key.as_str() < end.as_str())
                        {
                            continue;
                        }
                    } else if (!opts.descending && key.as_str() >= end.as_str())
                        || (opts.descending && key.as_str() <= end.as_str())
                    {
                        continue;
                    }
                }
            }

            if let Some(stored) = inner.docs.get(key.as_str()) {
                let winner = match winning_rev(&stored.rev_tree) {
                    Some(w) => w,
                    None => continue,
                };
                let deleted = is_deleted(&stored.rev_tree);

                // Skip deleted docs unless specific keys were requested
                if deleted && opts.keys.is_none() {
                    continue;
                }

                let doc_json = if opts.include_docs && !deleted {
                    let rev_str = winner.to_string();
                    stored.rev_data.get(&rev_str).map(|data| {
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(key.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str));
                        // Include conflicts if requested
                        if opts.conflicts {
                            let conflicts = collect_conflicts(&stored.rev_tree);
                            if !conflicts.is_empty() {
                                let conflict_list: Vec<serde_json::Value> = conflicts
                                    .iter()
                                    .map(|c| serde_json::Value::String(c.to_string()))
                                    .collect();
                                obj.insert(
                                    "_conflicts".to_string(),
                                    serde_json::Value::Array(conflict_list),
                                );
                            }
                        }
                        serde_json::Value::Object(obj)
                    })
                } else {
                    None
                };

                rows.push(AllDocsRow {
                    id: key.clone(),
                    key: key.clone(),
                    value: AllDocsRowValue {
                        rev: winner.to_string(),
                        deleted: if deleted { Some(true) } else { None },
                    },
                    doc: doc_json,
                });
            } else if opts.keys.is_some() {
                // For specific key lookups, include missing keys as errors
                // (CouchDB returns {"key":"x","error":"not_found"})
                // We skip these for now — they don't fit our row struct cleanly
            }
        }

        // Apply skip and limit.
        // NOTE(review): total_rows is the pre-skip/limit matched-row count,
        // whereas CouchDB reports the total docs in the database — confirm
        // which contract AllDocsResponse expects.
        let total_rows = rows.len() as u64;
        let skip = opts.skip as usize;
        if skip > 0 {
            rows = rows.into_iter().skip(skip).collect();
        }
        if let Some(limit) = opts.limit {
            rows.truncate(limit as usize);
        }

        let update_seq = if opts.update_seq {
            Some(Seq::Num(inner.update_seq))
        } else {
            None
        };

        Ok(AllDocsResponse {
            total_rows,
            offset: opts.skip,
            rows,
            update_seq,
        })
    }
393
    /// Return change events with sequence numbers strictly greater than
    /// `opts.since`, in ascending seq order (or descending when requested).
    ///
    /// Supports doc-ID filtering, `include_docs`, `style=all_docs` (report
    /// every live leaf rev instead of just the winner), conflict collection,
    /// and a result limit. `last_seq` is the seq of the final emitted event,
    /// or `opts.since` when nothing matched.
    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
        let inner = self.inner.read().await;

        let mut results = Vec::new();

        // Iterate changes after `since` (exclusive lower bound).
        let range = (opts.since.as_num() + 1)..;
        // Descending order requires buffering: BTreeMap ranges only iterate
        // forward, so collect then reverse.
        let iter: Box<dyn Iterator<Item = (&u64, &(String, bool))>> = if opts.descending {
            Box::new(
                inner
                    .changes
                    .range(range)
                    .collect::<Vec<_>>()
                    .into_iter()
                    .rev(),
            )
        } else {
            Box::new(inner.changes.range(range))
        };

        for (seq, (doc_id, deleted)) in iter {
            // Filter by doc_ids if specified
            if let Some(ref doc_ids) = opts.doc_ids
                && !doc_ids.contains(doc_id)
            {
                continue;
            }

            // The event reports the document's *current* winning rev, not
            // the rev that existed when this seq was recorded.
            let stored = inner.docs.get(doc_id);
            let rev_str = stored
                .and_then(|s| winning_rev(&s.rev_tree))
                .map(|r| r.to_string())
                .unwrap_or_default();

            let doc = if opts.include_docs {
                stored.and_then(|s| {
                    s.rev_data.get(&rev_str).map(|data| {
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(doc_id.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
                        if *deleted {
                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
                        }
                        serde_json::Value::Object(obj)
                    })
                })
            } else {
                None
            };

            // Build changes list based on style: all_docs reports every
            // non-deleted leaf, otherwise just the winning rev.
            let changes_list = if opts.style == ChangesStyle::AllDocs {
                if let Some(s) = stored {
                    collect_leaves(&s.rev_tree)
                        .iter()
                        .filter(|l| !s.rev_deleted.get(&l.rev_string()).copied().unwrap_or(false))
                        .map(|l| ChangeRev {
                            rev: l.rev_string(),
                        })
                        .collect()
                } else {
                    vec![ChangeRev { rev: rev_str }]
                }
            } else {
                vec![ChangeRev { rev: rev_str }]
            };

            // Collect conflicts if requested
            let conflicts = if opts.conflicts {
                stored
                    .map(|s| {
                        let c = collect_conflicts(&s.rev_tree);
                        if c.is_empty() {
                            None
                        } else {
                            Some(c.iter().map(|r| r.to_string()).collect())
                        }
                    })
                    .unwrap_or(None)
            } else {
                None
            };

            results.push(ChangeEvent {
                seq: Seq::Num(*seq),
                id: doc_id.clone(),
                changes: changes_list,
                deleted: *deleted,
                doc,
                conflicts,
            });

            // Limit check happens after the push, so up to `limit` events are
            // emitted (the limit-th event is included, then we stop).
            if let Some(limit) = opts.limit
                && results.len() >= limit as usize
            {
                break;
            }
        }

        let last_seq = results
            .last()
            .map(|r| r.seq.clone())
            .unwrap_or(opts.since.clone());

        Ok(ChangesResponse { results, last_seq })
    }
503
504    async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
505        let inner = self.inner.read().await;
506        let mut results = HashMap::new();
507
508        for (doc_id, rev_list) in revs {
509            let mut missing = Vec::new();
510            let mut possible_ancestors = Vec::new();
511
512            let stored = inner.docs.get(&doc_id);
513
514            for rev_str in &rev_list {
515                let (pos, hash) = parse_rev(rev_str)?;
516
517                let exists = stored
518                    .map(|s| rev_exists(&s.rev_tree, pos, &hash))
519                    .unwrap_or(false);
520
521                if !exists {
522                    missing.push(rev_str.clone());
523
524                    // Find possible ancestors (existing revisions with lower pos)
525                    if let Some(stored) = stored {
526                        let leaves = collect_leaves(&stored.rev_tree);
527                        for leaf in &leaves {
528                            if leaf.pos < pos {
529                                possible_ancestors.push(leaf.rev_string());
530                            }
531                        }
532                    }
533                }
534            }
535
536            if !missing.is_empty() {
537                results.insert(
538                    doc_id,
539                    RevsDiffResult {
540                        missing,
541                        possible_ancestors,
542                    },
543                );
544            }
545        }
546
547        Ok(RevsDiffResponse { results })
548    }
549
    /// Fetch multiple documents (optionally at specific revs) in one call,
    /// as used by the replicator's `_bulk_get` step.
    ///
    /// Each requested item yields one `BulkGetResult` whose `docs` list holds
    /// either the document JSON (with `_id`/`_rev`/`_deleted` and a
    /// `_revisions` ancestry object for replication) or a structured
    /// not_found error. This method itself never fails.
    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
        let inner = self.inner.read().await;
        let mut results = Vec::new();

        for item in docs {
            let mut bulk_docs = Vec::new();

            match inner.docs.get(&item.id) {
                Some(stored) => {
                    // Use the requested rev, else fall back to the winner.
                    let rev_str = if let Some(ref rev) = item.rev {
                        rev.clone()
                    } else {
                        match winning_rev(&stored.rev_tree) {
                            Some(w) => w.to_string(),
                            None => {
                                // Doc exists but has no winning rev: report
                                // not_found and move on to the next item.
                                bulk_docs.push(BulkGetDoc {
                                    ok: None,
                                    error: Some(BulkGetError {
                                        id: item.id.clone(),
                                        rev: item.rev.unwrap_or_default(),
                                        error: "not_found".into(),
                                        reason: "missing".into(),
                                    }),
                                });
                                results.push(BulkGetResult {
                                    id: item.id,
                                    docs: bulk_docs,
                                });
                                continue;
                            }
                        }
                    };

                    if let Some(data) = stored.rev_data.get(&rev_str) {
                        let deleted = stored.rev_deleted.get(&rev_str).copied().unwrap_or(false);
                        let mut obj = match data {
                            serde_json::Value::Object(m) => m.clone(),
                            _ => serde_json::Map::new(),
                        };
                        obj.insert("_id".into(), serde_json::Value::String(item.id.clone()));
                        obj.insert("_rev".into(), serde_json::Value::String(rev_str.clone()));
                        if deleted {
                            obj.insert("_deleted".into(), serde_json::Value::Bool(true));
                        }

                        // Include _revisions for replication
                        if let Ok((pos, ref hash)) = parse_rev(&rev_str)
                            && let Some(ancestry) = find_rev_ancestry(&stored.rev_tree, pos, hash)
                        {
                            obj.insert(
                                "_revisions".into(),
                                serde_json::json!({
                                    "start": pos,
                                    "ids": ancestry
                                }),
                            );
                        }

                        bulk_docs.push(BulkGetDoc {
                            ok: Some(serde_json::Value::Object(obj)),
                            error: None,
                        });
                    } else {
                        // Rev is known to the tree but its data is gone
                        // (e.g. compacted) or the rev never existed here.
                        bulk_docs.push(BulkGetDoc {
                            ok: None,
                            error: Some(BulkGetError {
                                id: item.id.clone(),
                                rev: rev_str,
                                error: "not_found".into(),
                                reason: "missing".into(),
                            }),
                        });
                    }
                }
                None => {
                    // Unknown document ID.
                    bulk_docs.push(BulkGetDoc {
                        ok: None,
                        error: Some(BulkGetError {
                            id: item.id.clone(),
                            rev: item.rev.unwrap_or_default(),
                            error: "not_found".into(),
                            reason: "missing".into(),
                        }),
                    });
                }
            }

            results.push(BulkGetResult {
                id: item.id,
                docs: bulk_docs,
            });
        }

        Ok(BulkGetResponse { results })
    }
645
646    async fn put_attachment(
647        &self,
648        doc_id: &str,
649        att_id: &str,
650        rev: &str,
651        data: Vec<u8>,
652        content_type: &str,
653    ) -> Result<DocResult> {
654        let digest = compute_attachment_digest(&data);
655        let length = data.len() as u64;
656
657        let mut inner = self.inner.write().await;
658
659        // Store the attachment data
660        inner.attachments.insert(digest.clone(), data);
661
662        // Get or create the document
663        let stored = inner
664            .docs
665            .get(doc_id)
666            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
667
668        // Verify the rev matches
669        let winner = winning_rev(&stored.rev_tree)
670            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
671        if winner.to_string() != rev {
672            return Err(RouchError::Conflict);
673        }
674
675        // Get current doc data and add attachment
676        let doc_data = stored
677            .rev_data
678            .get(rev)
679            .cloned()
680            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
681
682        // Build updated document with attachment metadata
683        let att_meta = AttachmentMeta {
684            content_type: content_type.to_string(),
685            digest: digest.clone(),
686            length,
687            stub: true,
688            data: None,
689        };
690
691        let doc = Document {
692            id: doc_id.to_string(),
693            rev: Some(winner.clone()),
694            deleted: false,
695            data: doc_data.clone(),
696            attachments: {
697                let mut atts = HashMap::new();
698                atts.insert(att_id.to_string(), att_meta);
699                atts
700            },
701        };
702
703        // Process as a normal edit
704        let result = process_doc_new_edits(&mut inner, doc);
705        Ok(result)
706    }
707
    /// Retrieve attachment bytes for `doc_id`/`att_id`.
    ///
    /// Currently a stub: it validates that the document and a target revision
    /// exist, then always returns `NotFound`, because the memory adapter does
    /// not yet track which attachment digests belong to which doc/rev.
    /// NOTE(review): the bytes themselves *are* stored in
    /// `inner.attachments` by `put_attachment`; only the per-revision
    /// linkage is missing — presumably recoverable from the `_attachments`
    /// metadata in the revision body once that is persisted. Verify.
    async fn get_attachment(
        &self,
        doc_id: &str,
        att_id: &str,
        opts: GetAttachmentOptions,
    ) -> Result<Vec<u8>> {
        let inner = self.inner.read().await;

        let stored = inner
            .docs
            .get(doc_id)
            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;

        // Resolve the revision to read from: explicit rev, else the winner.
        let rev_str = if let Some(ref rev) = opts.rev {
            rev.clone()
        } else {
            winning_rev(&stored.rev_tree)
                .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?
                .to_string()
        };

        // Look for attachment metadata in the doc data
        // For now, look up by digest in our attachment store
        // We'd need to track which attachments belong to which doc/rev
        // For simplicity, search through our attachment map
        let _data = stored.rev_data.get(&rev_str);

        // TODO: proper attachment tracking per revision
        Err(RouchError::NotFound(format!(
            "attachment {}/{}",
            doc_id, att_id
        )))
    }
741
742    async fn remove_attachment(&self, doc_id: &str, att_id: &str, rev: &str) -> Result<DocResult> {
743        let _ = att_id; // attachment tracking is simplified in memory adapter
744        let mut inner = self.inner.write().await;
745
746        let stored = inner
747            .docs
748            .get(doc_id)
749            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
750
751        let winner = winning_rev(&stored.rev_tree)
752            .ok_or_else(|| RouchError::NotFound(doc_id.to_string()))?;
753        if winner.to_string() != rev {
754            return Err(RouchError::Conflict);
755        }
756
757        let doc_data = stored
758            .rev_data
759            .get(rev)
760            .cloned()
761            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
762
763        // Create a new revision (attachment removal is a document update)
764        let doc = Document {
765            id: doc_id.to_string(),
766            rev: Some(winner.clone()),
767            deleted: false,
768            data: doc_data,
769            attachments: HashMap::new(),
770        };
771
772        let result = process_doc_new_edits(&mut inner, doc);
773        Ok(result)
774    }
775
776    async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
777        let inner = self.inner.read().await;
778        inner
779            .local_docs
780            .get(id)
781            .cloned()
782            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))
783    }
784
785    async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
786        let mut inner = self.inner.write().await;
787        inner.local_docs.insert(id.to_string(), doc);
788        Ok(())
789    }
790
791    async fn remove_local(&self, id: &str) -> Result<()> {
792        let mut inner = self.inner.write().await;
793        inner
794            .local_docs
795            .remove(id)
796            .ok_or_else(|| RouchError::NotFound(format!("_local/{}", id)))?;
797        Ok(())
798    }
799
800    async fn compact(&self) -> Result<()> {
801        let mut inner = self.inner.write().await;
802
803        for stored in inner.docs.values_mut() {
804            let leaves = collect_leaves(&stored.rev_tree);
805            let leaf_revs: std::collections::HashSet<String> =
806                leaves.iter().map(|l| l.rev_string()).collect();
807
808            // Remove data for non-leaf revisions
809            stored.rev_data.retain(|k, _| leaf_revs.contains(k));
810            stored.rev_deleted.retain(|k, _| leaf_revs.contains(k));
811        }
812
813        Ok(())
814    }
815
816    async fn destroy(&self) -> Result<()> {
817        let mut inner = self.inner.write().await;
818        inner.docs.clear();
819        inner.changes.clear();
820        inner.local_docs.clear();
821        inner.attachments.clear();
822        inner.update_seq = 0;
823        Ok(())
824    }
825
    /// Permanently remove specific revisions (`doc_id -> [revs]`) with no
    /// tombstones left behind, mirroring CouchDB `_purge`.
    ///
    /// Revisions are pruned from both the data maps and the rev tree; a
    /// document whose data becomes empty is dropped entirely, along with its
    /// change-log entry. `prune_leaf_from_tree` and `is_tree_empty` are
    /// helpers defined elsewhere in this crate.
    async fn purge(&self, req: HashMap<String, Vec<String>>) -> Result<PurgeResponse> {
        let mut inner = self.inner.write().await;
        let mut purged = HashMap::new();
        let mut docs_to_remove = Vec::new();

        for (doc_id, revs) in req {
            let mut purged_revs = Vec::new();
            if let Some(stored) = inner.docs.get_mut(&doc_id) {
                for rev_str in &revs {
                    // Only revs whose data we actually held count as purged.
                    if stored.rev_data.remove(rev_str).is_some() {
                        stored.rev_deleted.remove(rev_str);
                        purged_revs.push(rev_str.clone());

                        // Also prune the revision from the rev_tree so that
                        // winning_rev(), collect_conflicts(), and replication
                        // don't reference purged revisions.
                        if let Some((pos, hash)) = rev_str.split_once('-')
                            && let Ok(pos) = pos.parse::<u64>()
                        {
                            prune_leaf_from_tree(&mut stored.rev_tree, pos, hash);
                        }
                    }
                }
                // Remove empty rev_tree paths after pruning
                stored.rev_tree.retain(|p| !is_tree_empty(&p.tree));

                if stored.rev_data.is_empty() {
                    docs_to_remove.push((doc_id.clone(), stored.seq));
                }
            }
            if !purged_revs.is_empty() {
                purged.insert(doc_id, purged_revs);
            }
        }

        // Drop fully-purged documents and their change-log entries.
        // NOTE(review): only the entry at the doc's latest seq is removed;
        // older change entries for the same doc (if any remain) would still
        // reference it — confirm the changes map holds at most one entry per
        // doc.
        for (doc_id, seq) in docs_to_remove {
            inner.changes.remove(&seq);
            inner.docs.remove(&doc_id);
        }

        Ok(PurgeResponse {
            purge_seq: Some(inner.update_seq),
            purged,
        })
    }
871
872    async fn get_security(&self) -> Result<SecurityDocument> {
873        let inner = self.inner.read().await;
874        match inner.local_docs.get("_security") {
875            Some(val) => serde_json::from_value(val.clone())
876                .map_err(|e| RouchError::DatabaseError(e.to_string())),
877            None => Ok(SecurityDocument::default()),
878        }
879    }
880
881    async fn put_security(&self, doc: SecurityDocument) -> Result<()> {
882        let mut inner = self.inner.write().await;
883        let val = serde_json::to_value(&doc)?;
884        inner.local_docs.insert("_security".to_string(), val);
885        Ok(())
886    }
887}
888
889// ---------------------------------------------------------------------------
890// Document processing (new_edits = true)
891// ---------------------------------------------------------------------------
892
893fn process_doc_new_edits(inner: &mut Inner, doc: Document) -> DocResult {
894    let doc_id = if doc.id.is_empty() {
895        Uuid::new_v4().to_string()
896    } else {
897        doc.id.clone()
898    };
899
900    let existing = inner.docs.get(&doc_id);
901
902    // Check for conflicts: if the doc has a _rev, it must match the winning rev
903    if let Some(stored) = existing {
904        let winner = winning_rev(&stored.rev_tree);
905
906        match (&doc.rev, &winner) {
907            (Some(provided_rev), Some(current_winner)) => {
908                if provided_rev.to_string() != current_winner.to_string() {
909                    return DocResult {
910                        ok: false,
911                        id: doc_id,
912                        rev: None,
913                        error: Some("conflict".into()),
914                        reason: Some("Document update conflict".into()),
915                    };
916                }
917            }
918            (None, Some(_)) => {
919                // Trying to create a doc that already exists (and isn't deleted)
920                if !is_deleted(&stored.rev_tree) {
921                    return DocResult {
922                        ok: false,
923                        id: doc_id,
924                        rev: None,
925                        error: Some("conflict".into()),
926                        reason: Some("Document update conflict".into()),
927                    };
928                }
929                // If winner is deleted, allow creating a new doc at the same ID
930            }
931            _ => {}
932        }
933    } else if doc.rev.is_some() {
934        // Updating a doc that doesn't exist
935        return DocResult {
936            ok: false,
937            id: doc_id,
938            rev: None,
939            error: Some("not_found".into()),
940            reason: Some("missing".into()),
941        };
942    }
943
944    // Generate new revision
945    let new_pos = doc.rev.as_ref().map(|r| r.pos + 1).unwrap_or(1);
946    let prev_rev_str = doc.rev.as_ref().map(|r| r.to_string());
947    let new_hash = generate_rev_hash(&doc.data, doc.deleted, prev_rev_str.as_deref());
948    let new_rev_str = rev_string(new_pos, &new_hash);
949
950    // Build the revision path for merging
951    let mut rev_hashes = vec![new_hash.clone()];
952    if let Some(ref prev) = doc.rev {
953        rev_hashes.push(prev.hash.clone());
954    }
955
956    let new_path = build_path_from_revs(
957        new_pos,
958        &rev_hashes,
959        NodeOpts {
960            deleted: doc.deleted,
961        },
962        RevStatus::Available,
963    );
964
965    // Merge into existing tree or create new one
966    let existing_tree = existing.map(|s| s.rev_tree.clone()).unwrap_or_default();
967
968    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);
969
970    // Update sequence
971    inner.update_seq += 1;
972    let seq = inner.update_seq;
973
974    // Remove old change entry for this doc (each doc has only one entry in changes)
975    if let Some(existing) = inner.docs.get(&doc_id) {
976        inner.changes.remove(&existing.seq);
977    }
978
979    // Store or update the document
980    let stored = inner
981        .docs
982        .entry(doc_id.clone())
983        .or_insert_with(|| StoredDoc {
984            rev_tree: Vec::new(),
985            rev_data: HashMap::new(),
986            rev_deleted: HashMap::new(),
987            seq: 0,
988        });
989
990    stored.rev_tree = merged_tree;
991    stored.rev_data.insert(new_rev_str.clone(), doc.data);
992    stored.rev_deleted.insert(new_rev_str.clone(), doc.deleted);
993    stored.seq = seq;
994
995    // Record in changes
996    inner.changes.insert(seq, (doc_id.clone(), doc.deleted));
997
998    DocResult {
999        ok: true,
1000        id: doc_id,
1001        rev: Some(new_rev_str),
1002        error: None,
1003        reason: None,
1004    }
1005}
1006
1007// ---------------------------------------------------------------------------
1008// Document processing (new_edits = false, replication mode)
1009// ---------------------------------------------------------------------------
1010
/// Store a document with its caller-supplied revision (`new_edits = false`),
/// as used during replication. No conflict checking is performed: the
/// revision (and its `_revisions` ancestry, when present) is merged into the
/// existing rev tree, and conflicting branches are retained.
fn process_doc_replication(inner: &mut Inner, mut doc: Document) -> DocResult {
    let doc_id = doc.id.clone();
    // In replication mode the revision is mandatory: there is no previous
    // winner to derive one from, and the source already assigned it.
    let rev = match &doc.rev {
        Some(r) => r.clone(),
        None => {
            return DocResult {
                ok: false,
                id: doc_id,
                rev: None,
                error: Some("bad_request".into()),
                reason: Some("missing _rev".into()),
            };
        }
    };

    let rev_str = rev.to_string();

    // Build the revision path — use _revisions ancestry if available.
    // `_revisions` follows the CouchDB wire format: `start` is the position
    // of the newest rev and `ids` lists hashes newest-first.
    let new_path = if let Some(revisions) = doc.data.get("_revisions") {
        // Indexing a serde_json::Value with a missing key yields Null, so
        // malformed ancestry degrades to the single-rev fallbacks below.
        let start = revisions["start"].as_u64().unwrap_or(rev.pos);
        let ids: Vec<String> = revisions["ids"]
            .as_array()
            .map(|arr| {
                arr.iter()
                    .filter_map(|v| v.as_str().map(String::from))
                    .collect()
            })
            .unwrap_or_else(|| vec![rev.hash.clone()]);

        build_path_from_revs(
            start,
            &ids,
            NodeOpts {
                deleted: doc.deleted,
            },
            RevStatus::Available,
        )
    } else {
        // Fallback: single-node path (no ancestry available)
        RevPath {
            pos: rev.pos,
            tree: rouchdb_core::rev_tree::RevNode {
                hash: rev.hash.clone(),
                status: RevStatus::Available,
                opts: NodeOpts {
                    deleted: doc.deleted,
                },
                children: vec![],
            },
        }
    };

    // Strip _revisions from data before storing
    if let serde_json::Value::Object(ref mut map) = doc.data {
        map.remove("_revisions");
    }

    // Merge into existing tree
    let existing_tree = inner
        .docs
        .get(&doc_id)
        .map(|s| s.rev_tree.clone())
        .unwrap_or_default();

    // NOTE(review): the merge result is ignored, so re-replicating an
    // already-known revision still bumps update_seq and rewrites the changes
    // entry — confirm this duplicate handling is acceptable.
    let (merged_tree, _merge_result) = merge_tree(&existing_tree, &new_path, DEFAULT_REV_LIMIT);

    // Update sequence
    inner.update_seq += 1;
    let seq = inner.update_seq;

    // Remove old change entry (each doc keeps a single entry in the log)
    if let Some(existing) = inner.docs.get(&doc_id) {
        inner.changes.remove(&existing.seq);
    }

    // The changes feed reflects the post-merge winner's deleted status, not
    // this incoming revision's flag.
    let is_doc_deleted = is_deleted(&merged_tree);

    let stored = inner
        .docs
        .entry(doc_id.clone())
        .or_insert_with(|| StoredDoc {
            rev_tree: Vec::new(),
            rev_data: HashMap::new(),
            rev_deleted: HashMap::new(),
            seq: 0,
        });

    stored.rev_tree = merged_tree;
    stored.rev_data.insert(rev_str.clone(), doc.data);
    stored.rev_deleted.insert(rev_str.clone(), doc.deleted);
    stored.seq = seq;

    inner.changes.insert(seq, (doc_id.clone(), is_doc_deleted));

    DocResult {
        ok: true,
        id: doc_id,
        rev: Some(rev_str),
        error: None,
        reason: None,
    }
}
1113
1114// ---------------------------------------------------------------------------
1115// Rev-tree pruning helpers for purge
1116// ---------------------------------------------------------------------------
1117
1118use rouchdb_core::rev_tree::RevNode;
1119
1120/// Remove a specific leaf node from the rev tree. If the node at (pos, hash)
1121/// is a leaf (no children), it's removed from its parent's children list.
1122fn prune_leaf_from_tree(tree: &mut RevTree, target_pos: u64, target_hash: &str) {
1123    for path in tree.iter_mut() {
1124        prune_leaf_from_node(&mut path.tree, path.pos, target_pos, target_hash);
1125    }
1126}
1127
1128/// Recursively remove a matching leaf node from the subtree.
1129/// Returns true if the node at this level should be removed (it matched and was a leaf).
1130fn prune_leaf_from_node(node: &mut RevNode, current_pos: u64, target_pos: u64, target_hash: &str) {
1131    // Remove matching children that are leaves
1132    node.children.retain(|child| {
1133        let child_pos = current_pos + 1;
1134        !(child_pos == target_pos && child.hash == target_hash && child.children.is_empty())
1135    });
1136
1137    // Recurse into remaining children
1138    for child in node.children.iter_mut() {
1139        prune_leaf_from_node(child, current_pos + 1, target_pos, target_hash);
1140    }
1141}
1142
/// Check if a rev tree node is effectively empty (no children and no useful data).
/// Used to clean up orphaned root paths after leaf pruning.
/// NOTE(review): real revision nodes always carry a non-empty hash, so this
/// predicate only ever matches an empty-hash placeholder — confirm whether
/// pruning can actually produce such a root, otherwise this check is dead.
fn is_tree_empty(node: &RevNode) -> bool {
    node.children.is_empty() && node.hash.is_empty()
}
1148
1149// ---------------------------------------------------------------------------
1150// Tests
1151// ---------------------------------------------------------------------------
1152
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_core::document::{AllDocsOptions, BulkDocsOptions, ChangesOptions, GetOptions};

    async fn new_db() -> MemoryAdapter {
        MemoryAdapter::new("test")
    }

    /// Build a `Document` with no attachments; trims the repeated literal
    /// noise out of the individual tests.
    fn make_doc(
        id: &str,
        rev: Option<Revision>,
        deleted: bool,
        data: serde_json::Value,
    ) -> Document {
        Document {
            id: id.into(),
            rev,
            deleted,
            data,
            attachments: HashMap::new(),
        }
    }

    /// Write a single document with `new_edits = true` and return its result.
    async fn put_one(db: &MemoryAdapter, doc: Document) -> DocResult {
        db.bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
    }

    #[tokio::test]
    async fn info_empty_db() {
        let db = new_db().await;
        let info = db.info().await.unwrap();
        assert_eq!(info.db_name, "test");
        assert_eq!(info.doc_count, 0);
        assert_eq!(info.update_seq, Seq::Num(0));
    }

    #[tokio::test]
    async fn put_and_get_document() {
        let db = new_db().await;

        let res = put_one(
            &db,
            make_doc("doc1", None, false, serde_json::json!({"name": "Alice"})),
        )
        .await;
        assert!(res.ok);
        assert_eq!(res.id, "doc1");
        assert!(res.rev.is_some());

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.id, "doc1");
        assert_eq!(fetched.data["name"], "Alice");
        assert!(fetched.rev.is_some());
    }

    #[tokio::test]
    async fn update_document() {
        let db = new_db().await;

        // Create, keeping the generated revision for the follow-up edit.
        let rev1 = put_one(
            &db,
            make_doc("doc1", None, false, serde_json::json!({"name": "Alice"})),
        )
        .await
        .rev
        .unwrap();

        // Update against that revision.
        let rev_parsed: Revision = rev1.parse().unwrap();
        let res = put_one(
            &db,
            make_doc(
                "doc1",
                Some(rev_parsed),
                false,
                serde_json::json!({"name": "Bob"}),
            ),
        )
        .await;
        assert!(res.ok);

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.data["name"], "Bob");
    }

    #[tokio::test]
    async fn conflict_on_wrong_rev() {
        let db = new_db().await;

        put_one(&db, make_doc("doc1", None, false, serde_json::json!({"v": 1}))).await;

        // An update based on a revision that is not the winner must conflict.
        let res = put_one(
            &db,
            make_doc(
                "doc1",
                Some(Revision::new(1, "wronghash".into())),
                false,
                serde_json::json!({"v": 2}),
            ),
        )
        .await;
        assert!(!res.ok);
        assert_eq!(res.error.as_deref(), Some("conflict"));
    }

    #[tokio::test]
    async fn delete_document() {
        let db = new_db().await;

        let rev1: Revision = put_one(
            &db,
            make_doc("doc1", None, false, serde_json::json!({"name": "Alice"})),
        )
        .await
        .rev
        .unwrap()
        .parse()
        .unwrap();

        let res = put_one(&db, make_doc("doc1", Some(rev1), true, serde_json::json!({}))).await;
        assert!(res.ok);

        // A deleted document is no longer retrievable...
        assert!(db.get("doc1", GetOptions::default()).await.is_err());

        // ...and no longer counted.
        let info = db.info().await.unwrap();
        assert_eq!(info.doc_count, 0);
    }

    #[tokio::test]
    async fn all_docs() {
        let db = new_db().await;

        // Insert out of order to verify alphabetical sorting.
        for name in ["charlie", "alice", "bob"] {
            put_one(
                &db,
                make_doc(name, None, false, serde_json::json!({"name": name})),
            )
            .await;
        }

        let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
        assert_eq!(result.total_rows, 3);
        assert_eq!(result.rows[0].id, "alice");
        assert_eq!(result.rows[1].id, "bob");
        assert_eq!(result.rows[2].id, "charlie");
    }

    #[tokio::test]
    async fn all_docs_with_include_docs() {
        let db = new_db().await;

        put_one(
            &db,
            make_doc("doc1", None, false, serde_json::json!({"name": "Alice"})),
        )
        .await;

        let mut opts = AllDocsOptions::new();
        opts.include_docs = true;
        let result = db.all_docs(opts).await.unwrap();
        assert!(result.rows[0].doc.is_some());
        let doc = result.rows[0].doc.as_ref().unwrap();
        assert_eq!(doc["name"], "Alice");
        assert_eq!(doc["_id"], "doc1");
    }

    #[tokio::test]
    async fn changes_feed() {
        let db = new_db().await;

        for i in 0..3 {
            put_one(
                &db,
                make_doc(&format!("doc{}", i), None, false, serde_json::json!({"i": i})),
            )
            .await;
        }

        let changes = db.changes(ChangesOptions::default()).await.unwrap();
        assert_eq!(changes.results.len(), 3);
        assert_eq!(changes.last_seq, Seq::Num(3));

        // Changes since seq 2 should only include the last write.
        let changes = db
            .changes(ChangesOptions {
                since: Seq::Num(2),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(changes.results.len(), 1);
        assert_eq!(changes.results[0].id, "doc2");
    }

    #[tokio::test]
    async fn revs_diff() {
        let db = new_db().await;

        let existing_rev = put_one(&db, make_doc("doc1", None, false, serde_json::json!({"v": 1})))
            .await
            .rev
            .unwrap();

        let mut revs = HashMap::new();
        revs.insert(
            "doc1".into(),
            vec![existing_rev.clone(), "2-doesnotexist".into()],
        );
        revs.insert("doc2".into(), vec!["1-abc".into()]);

        let diff = db.revs_diff(revs).await.unwrap();

        // Known revisions are not reported missing; unknown ones are.
        let doc1_diff = diff.results.get("doc1").unwrap();
        assert!(!doc1_diff.missing.contains(&existing_rev));
        assert!(doc1_diff.missing.contains(&"2-doesnotexist".to_string()));

        // A doc the adapter has never seen is entirely missing.
        let doc2_diff = diff.results.get("doc2").unwrap();
        assert!(doc2_diff.missing.contains(&"1-abc".to_string()));
    }

    #[tokio::test]
    async fn local_docs() {
        let db = new_db().await;

        let doc = serde_json::json!({"checkpoint": 42});
        db.put_local("repl-123", doc.clone()).await.unwrap();

        let fetched = db.get_local("repl-123").await.unwrap();
        assert_eq!(fetched["checkpoint"], 42);

        db.remove_local("repl-123").await.unwrap();
        assert!(db.get_local("repl-123").await.is_err());
    }

    #[tokio::test]
    async fn replication_mode_bulk_docs() {
        let db = new_db().await;

        // new_edits = false: the provided revision is stored verbatim.
        let doc = make_doc(
            "doc1",
            Some(Revision::new(1, "abc123".into())),
            false,
            serde_json::json!({"name": "replicated"}),
        );

        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::replication())
            .await
            .unwrap();
        assert!(results[0].ok);

        let fetched = db.get("doc1", GetOptions::default()).await.unwrap();
        assert_eq!(fetched.data["name"], "replicated");
        assert_eq!(fetched.rev.unwrap().to_string(), "1-abc123");
    }

    #[tokio::test]
    async fn auto_generate_id() {
        let db = new_db().await;

        // An empty id is replaced with a generated UUID.
        let res = put_one(
            &db,
            make_doc("", None, false, serde_json::json!({"name": "no-id"})),
        )
        .await;
        assert!(res.ok);
        assert!(!res.id.is_empty());
    }

    #[tokio::test]
    async fn destroy_clears_everything() {
        let db = new_db().await;

        put_one(&db, make_doc("doc1", None, false, serde_json::json!({}))).await;
        db.put_local("x", serde_json::json!({})).await.unwrap();

        db.destroy().await.unwrap();

        let info = db.info().await.unwrap();
        assert_eq!(info.doc_count, 0);
        assert_eq!(info.update_seq, Seq::Num(0));
    }

    #[tokio::test]
    async fn bulk_get_documents() {
        let db = new_db().await;

        put_one(
            &db,
            make_doc("doc1", None, false, serde_json::json!({"name": "test"})),
        )
        .await;

        let result = db
            .bulk_get(vec![
                BulkGetItem {
                    id: "doc1".into(),
                    rev: None,
                },
                BulkGetItem {
                    id: "missing".into(),
                    rev: None,
                },
            ])
            .await
            .unwrap();

        assert_eq!(result.results.len(), 2);
        assert!(result.results[0].docs[0].ok.is_some());
        assert!(result.results[1].docs[0].error.is_some());
    }
}