Skip to main content

kora_doc/
engine.rs

1//! In-memory document engine providing collection CRUD, secondary indexes,
2//! and WHERE-clause query execution.
3//!
4//! [`DocEngine`] is the top-level entry point. It owns an [`IdRegistry`],
5//! a map of per-collection state (metadata, dictionary, packed documents,
6//! and indexes), and orchestrates the full document lifecycle:
7//!
8//! ## Write Path
9//!
10//! 1. The caller supplies a JSON `Value` to [`DocEngine::set`].
11//! 2. The engine resolves (or allocates) a `DocId` via the registry.
12//! 3. Unique-constraint indexes are checked **before** mutation.
13//! 4. If the document already exists, old index entries are removed.
14//! 5. The JSON is decomposed into a [`PackedDoc`] through the
15//!    [`Decomposer`](crate::decompose::Decomposer) pipeline.
16//! 6. New index entries are inserted for every configured field.
17//!
18//! ## Read Path
19//!
20//! - [`DocEngine::get`] retrieves and recomposes a single document, with
21//!   optional field-level projection.
22//! - [`DocEngine::find`] parses a WHERE expression via
23//!   [`parse_where`](crate::expr::parse_where), walks the AST to collect
24//!   candidate `DocId` sets (using indexes when available, falling back to a
25//!   full collection scan), applies pagination, and recomposes results.
26//!
27//! ## Index Maintenance
28//!
29//! [`DocEngine::create_index`] registers a secondary index and backfills
30//! every existing document. Four index types are supported: `Hash`, `Sorted`,
31//! `Array`, and `Unique`. Index entries are maintained automatically on
32//! `set`, `update`, and `del`.
33//!
34//! ## Mutation
35//!
36//! [`DocEngine::update`] applies a sequence of [`DocMutation`] operations
37//! (Set, Del, Incr, Push, Pull) to an existing document's JSON
38//! representation, then round-trips through `set` so indexes and packed
39//! storage stay consistent.
40
41use std::collections::HashMap;
42use std::time::{SystemTime, UNIX_EPOCH};
43
44use serde_json::Value;
45use thiserror::Error;
46
47use crate::collection::{Collection, CollectionConfig, CollectionError, CompressionProfile};
48use crate::decompose::{DecomposeError, Decomposer};
49use crate::dictionary::{ValueDictionary, ValueDictionaryConfig};
50use crate::expr::{parse_where, Expr, ExprValue};
51use crate::index::{
52    hash32, intersect_sorted, union_sorted, CollectionIndexes, IndexConfig, IndexError, IndexType,
53};
54use crate::packed::PackedDoc;
55use crate::recompose::{RecomposeError, Recomposer};
56use crate::registry::{CollectionId, DocId, FieldId, IdRegistry, RegistryError};
57
/// Result of a successful `set` operation.
///
/// Returned by [`DocEngine::set`]; the fields are also copied into
/// [`InsertResult`] by [`DocEngine::insert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SetResult {
    /// Internal document ID assigned in the collection.
    pub internal_id: DocId,
    /// True if this write inserted a new document key, i.e. no packed
    /// document was previously stored under `internal_id`.
    pub created: bool,
}
66
/// Result of a [`DocEngine::insert`] call with an auto-generated ID.
#[derive(Debug, Clone, PartialEq)]
pub struct InsertResult {
    /// The auto-generated external document ID (the string form of the
    /// registry's next document ID at the time of the call).
    pub id: String,
    /// Internal document ID assigned in the collection.
    pub internal_id: DocId,
    /// True if this write inserted a new document key (always true for insert).
    // NOTE(review): this value is forwarded unchanged from `set`, so "always
    // true" holds only if the generated ID never matches an existing
    // document's external ID — confirm against the registry.
    pub created: bool,
}
77
/// Mutation operation used by [`DocEngine::update`].
///
/// `update` applies the supplied mutations in slice order to the document's
/// JSON form and then writes the result back through `set`.
#[derive(Debug, Clone, PartialEq)]
pub enum DocMutation {
    /// Set a field path to a JSON value, creating missing intermediate objects.
    Set {
        /// Dotted field path (for example `address.city`).
        path: String,
        /// New JSON value for the path.
        value: Value,
    },
    /// Delete one field path when present.
    Del {
        /// Dotted field path (for example `tags`).
        path: String,
    },
    /// Increment an existing numeric field by `delta`.
    Incr {
        /// Dotted field path to increment.
        path: String,
        /// Increment amount.
        delta: f64,
    },
    /// Append one JSON value to an array field, creating the array when missing.
    Push {
        /// Dotted field path to an array.
        path: String,
        /// Value to append.
        value: Value,
    },
    /// Remove all array items that exactly match the supplied value.
    Pull {
        /// Dotted field path to an array.
        path: String,
        /// Value to remove.
        value: Value,
    },
}
115
/// Snapshot of collection metadata and current storage counters.
///
/// Produced by [`DocEngine::collection_info`]; the values are copied out at
/// call time and do not track later writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionInfo {
    /// Collection ID.
    pub id: CollectionId,
    /// Collection name.
    pub name: String,
    /// Creation timestamp (seconds since UNIX epoch).
    pub created_at: u64,
    /// Compression profile.
    pub compression: CompressionProfile,
    /// Number of documents currently stored in this engine, as reported by
    /// the collection's own document counter.
    pub doc_count: u64,
    /// Number of entries in the collection dictionary.
    pub dictionary_entries: usize,
}
132
/// Cardinality details for one collection field.
///
/// One entry is produced per field mapping known to the collection's
/// registry segment (see [`DocEngine::dictionary_info`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DictionaryFieldInfo {
    /// Field ID assigned by the registry.
    pub field_id: u16,
    /// Dotted field path.
    pub path: String,
    /// Estimated unique value count observed for the field, as reported by
    /// the collection dictionary.
    pub cardinality_estimate: usize,
}
143
/// Snapshot of collection dictionary statistics.
///
/// Produced by [`DocEngine::dictionary_info`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DictionaryInfo {
    /// Collection ID.
    pub collection_id: CollectionId,
    /// Collection name.
    pub collection_name: String,
    /// Number of unique dictionary values.
    pub dictionary_entries: usize,
    /// Per-field cardinality estimates, in the order the registry segment
    /// returns its field mappings.
    pub fields: Vec<DictionaryFieldInfo>,
}
156
/// Snapshot of collection storage footprint.
///
/// Produced by [`DocEngine::storage_info`]. For an empty collection every
/// byte counter is `0`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageInfo {
    /// Collection ID.
    pub collection_id: CollectionId,
    /// Collection name.
    pub collection_name: String,
    /// Number of documents stored.
    pub doc_count: usize,
    /// Total packed bytes across all stored documents.
    pub total_packed_bytes: usize,
    /// Smallest packed document size in bytes.
    pub min_doc_bytes: usize,
    /// Largest packed document size in bytes.
    pub max_doc_bytes: usize,
    /// Average packed document size in bytes (integer division, rounded down).
    pub avg_doc_bytes: usize,
}
175
/// Errors returned by `DocEngine`.
///
/// The `transparent` variants forward the message of the wrapped error and
/// are created through `?` via their `From` conversions.
#[derive(Debug, Error)]
pub enum DocError {
    /// Collection management error.
    #[error(transparent)]
    Collection(#[from] CollectionError),
    /// Registry operation failed.
    #[error(transparent)]
    Registry(#[from] RegistryError),
    /// JSON decomposition failed.
    #[error(transparent)]
    Decompose(#[from] DecomposeError),
    /// Packed document reconstruction failed.
    #[error(transparent)]
    Recompose(#[from] RecomposeError),
    /// Referenced collection does not exist. Carries the collection name
    /// that was looked up.
    #[error("unknown collection '{0}'")]
    UnknownCollection(String),
    /// Mutation payload or target path is invalid. Also returned by
    /// `update` for an empty mutation list and by `drop_index` when the
    /// field path is not known to the collection.
    #[error("invalid document mutation: {0}")]
    InvalidMutation(String),
    /// Index operation failed.
    #[error(transparent)]
    Index(#[from] IndexError),
    /// WHERE expression parse error. Carries the parser's message.
    #[error("invalid WHERE expression: {0}")]
    InvalidExpression(String),
}
204
/// Everything the engine keeps for one collection, stored in
/// `DocEngine::collections` under the collection's ID.
#[derive(Debug)]
struct CollectionState {
    /// Collection metadata (name, ID, creation time, compression, doc count).
    collection: Collection,
    /// Collection-local value dictionary used when packing and unpacking documents.
    dictionary: ValueDictionary,
    /// Packed documents keyed by their internal document ID.
    docs_by_internal_id: HashMap<DocId, PackedDoc>,
    /// Which fields are indexed, and with which index type.
    index_config: IndexConfig,
    /// The index data itself, maintained on `set`, `update`, and `del`.
    indexes: CollectionIndexes,
}
213
/// Document engine with collection-local dictionaries and packed docs.
#[derive(Debug)]
pub struct DocEngine {
    /// Maps collection names, field paths, and external document IDs to
    /// their internal IDs.
    registry: IdRegistry,
    /// Per-collection state keyed by collection ID.
    collections: HashMap<CollectionId, CollectionState>,
    /// Packed format version handed to the decomposer on every write.
    packed_version: u16,
}
221
222impl DocEngine {
223    /// Create a document engine with packed format version `1`.
224    #[must_use]
225    pub fn new() -> Self {
226        Self {
227            registry: IdRegistry::new(),
228            collections: HashMap::new(),
229            packed_version: 1,
230        }
231    }
232
233    /// Create a collection.
234    pub fn create_collection(
235        &mut self,
236        name: &str,
237        config: CollectionConfig,
238    ) -> Result<CollectionId, DocError> {
239        if self.registry.collection_id(name).is_some() {
240            return Err(DocError::Collection(CollectionError::AlreadyExists(
241                name.to_string(),
242            )));
243        }
244
245        let collection_id = self.registry.get_or_create_collection_id(name)?;
246        let state = CollectionState {
247            collection: Collection::new(name.to_string(), collection_id, config),
248            dictionary: ValueDictionary::new(ValueDictionaryConfig::default()),
249            docs_by_internal_id: HashMap::new(),
250            index_config: IndexConfig::new(),
251            indexes: CollectionIndexes::new(),
252        };
253        self.collections.insert(collection_id, state);
254
255        Ok(collection_id)
256    }
257
258    /// Drop a collection and all its documents.
259    pub fn drop_collection(&mut self, name: &str) -> bool {
260        if let Some(collection_id) = self.registry.remove_collection(name) {
261            self.collections.remove(&collection_id);
262            return true;
263        }
264        false
265    }
266
267    /// Return collection info when present.
268    #[must_use]
269    pub fn collection_info(&self, name: &str) -> Option<CollectionInfo> {
270        let collection_id = self.registry.collection_id(name)?;
271        let state = self.collections.get(&collection_id)?;
272        Some(CollectionInfo {
273            id: state.collection.id(),
274            name: state.collection.name().to_string(),
275            created_at: state.collection.created_at(),
276            compression: state.collection.compression(),
277            doc_count: state.collection.doc_count(),
278            dictionary_entries: state.dictionary.len(),
279        })
280    }
281
282    /// Return dictionary statistics for one collection.
283    pub fn dictionary_info(&self, name: &str) -> Result<DictionaryInfo, DocError> {
284        let collection_id = self.collection_id(name)?;
285        let state = self
286            .collections
287            .get(&collection_id)
288            .ok_or_else(|| DocError::UnknownCollection(name.to_string()))?;
289        let segment = self
290            .registry
291            .segment(collection_id)
292            .ok_or_else(|| DocError::UnknownCollection(name.to_string()))?;
293
294        let fields = segment
295            .field_mappings()
296            .into_iter()
297            .map(|(field_id, path)| DictionaryFieldInfo {
298                field_id,
299                cardinality_estimate: state.dictionary.cardinality_estimate(field_id),
300                path,
301            })
302            .collect();
303
304        Ok(DictionaryInfo {
305            collection_id,
306            collection_name: state.collection.name().to_string(),
307            dictionary_entries: state.dictionary.len(),
308            fields,
309        })
310    }
311
312    /// Return packed storage statistics for one collection.
313    pub fn storage_info(&self, name: &str) -> Result<StorageInfo, DocError> {
314        let collection_id = self.collection_id(name)?;
315        let state = self
316            .collections
317            .get(&collection_id)
318            .ok_or_else(|| DocError::UnknownCollection(name.to_string()))?;
319
320        let mut total_packed_bytes = 0usize;
321        let mut min_doc_bytes = usize::MAX;
322        let mut max_doc_bytes = 0usize;
323
324        for packed in state.docs_by_internal_id.values() {
325            let bytes = packed.byte_size();
326            total_packed_bytes += bytes;
327            min_doc_bytes = min_doc_bytes.min(bytes);
328            max_doc_bytes = max_doc_bytes.max(bytes);
329        }
330
331        let doc_count = state.docs_by_internal_id.len();
332        if doc_count == 0 {
333            min_doc_bytes = 0;
334        }
335        let avg_doc_bytes = if doc_count == 0 {
336            0
337        } else {
338            total_packed_bytes / doc_count
339        };
340
341        Ok(StorageInfo {
342            collection_id,
343            collection_name: state.collection.name().to_string(),
344            doc_count,
345            total_packed_bytes,
346            min_doc_bytes,
347            max_doc_bytes,
348            avg_doc_bytes,
349        })
350    }
351
352    /// Create a secondary index on a collection field.
353    ///
354    /// Backfills all existing documents. For `Unique` indexes, if a duplicate
355    /// value is detected during backfill the index is rolled back and an error
356    /// is returned.
357    pub fn create_index(
358        &mut self,
359        collection: &str,
360        field_path: &str,
361        index_type: IndexType,
362    ) -> Result<(), DocError> {
363        let collection_id = self.collection_id(collection)?;
364        let field_id = self
365            .registry
366            .get_or_create_field_id(collection_id, field_path)?;
367
368        let state = self
369            .collections
370            .get_mut(&collection_id)
371            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
372
373        state.index_config.add(field_id, index_type)?;
374
375        if let Err(err) = Self::backfill_index(
376            &self.registry,
377            &state.dictionary,
378            &state.docs_by_internal_id,
379            &mut state.indexes,
380            collection_id,
381            field_id,
382            field_path,
383            index_type,
384        ) {
385            state.index_config.remove(field_id).ok();
386            state.indexes.remove_field(field_id);
387            return Err(err);
388        }
389
390        Ok(())
391    }
392
393    /// Remove a secondary index from a collection field.
394    pub fn drop_index(&mut self, collection: &str, field_path: &str) -> Result<(), DocError> {
395        let collection_id = self.collection_id(collection)?;
396        let state = self
397            .collections
398            .get_mut(&collection_id)
399            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
400
401        let segment = self
402            .registry
403            .segment(collection_id)
404            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
405
406        let field_id = segment.field_id(field_path).ok_or_else(|| {
407            DocError::InvalidMutation(format!("no index found for field '{field_path}'"))
408        })?;
409
410        state.index_config.remove(field_id)?;
411        state.indexes.remove_field(field_id);
412
413        Ok(())
414    }
415
416    /// Return all configured indexes for a collection.
417    pub fn indexes(&self, collection: &str) -> Result<Vec<(String, IndexType)>, DocError> {
418        let collection_id = self.collection_id(collection)?;
419        let state = self
420            .collections
421            .get(&collection_id)
422            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
423
424        let segment = self
425            .registry
426            .segment(collection_id)
427            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
428
429        let mut result = Vec::new();
430        for (&field_id, &idx_type) in state.index_config.entries() {
431            if let Some(path) = segment.field_path(field_id) {
432                result.push((path.to_string(), idx_type));
433            }
434        }
435        result.sort_by(|(a, _), (b, _)| a.cmp(b));
436        Ok(result)
437    }
438
439    /// Insert or replace one JSON document.
440    pub fn set(
441        &mut self,
442        collection: &str,
443        external_doc_id: &str,
444        json: &Value,
445    ) -> Result<SetResult, DocError> {
446        let collection_id = self.collection_id(collection)?;
447        let internal_id = self
448            .registry
449            .get_or_create_doc_internal_id(collection_id, external_doc_id)?;
450
451        let state = self
452            .collections
453            .get_mut(&collection_id)
454            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
455
456        let is_update = state.docs_by_internal_id.contains_key(&internal_id);
457
458        Self::check_unique_constraints(
459            &self.registry,
460            &state.index_config,
461            &state.indexes,
462            &state.dictionary,
463            &state.docs_by_internal_id,
464            collection_id,
465            internal_id,
466            json,
467        )?;
468
469        if is_update {
470            if let Some(old_packed) = state.docs_by_internal_id.get(&internal_id) {
471                if let Ok(old_json) = Recomposer::recompose(
472                    old_packed,
473                    &self.registry,
474                    &state.dictionary,
475                    collection_id,
476                ) {
477                    Self::remove_index_entries(
478                        &self.registry,
479                        &state.index_config,
480                        &mut state.indexes,
481                        collection_id,
482                        internal_id,
483                        &old_json,
484                    );
485                }
486            }
487        }
488
489        let (registry, collections) = (&mut self.registry, &mut self.collections);
490        let state = collections
491            .get_mut(&collection_id)
492            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
493
494        let mut decomposer = Decomposer::new(
495            collection_id,
496            registry,
497            &mut state.dictionary,
498            self.packed_version,
499        );
500        let packed = decomposer.decompose(json, current_unix_seconds_u32())?;
501
502        let created = state
503            .docs_by_internal_id
504            .insert(internal_id, packed)
505            .is_none();
506        if created {
507            state.collection.increment_doc_count();
508        }
509
510        Self::add_index_entries(
511            &self.registry,
512            &state.index_config,
513            &mut state.indexes,
514            collection_id,
515            internal_id,
516            json,
517        );
518
519        Ok(SetResult {
520            internal_id,
521            created,
522        })
523    }
524
525    /// Insert a document with an auto-generated ID.
526    ///
527    /// Returns the generated ID and whether the document was newly created.
528    pub fn insert(&mut self, collection: &str, json: &Value) -> Result<InsertResult, DocError> {
529        let collection_id = self.collection_id(collection)?;
530        let next_id = self
531            .registry
532            .segment(collection_id)
533            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?
534            .next_doc_id();
535        let generated_id = format!("{}", next_id);
536        let set_result = self.set(collection, &generated_id, json)?;
537        Ok(InsertResult {
538            id: generated_id,
539            internal_id: set_result.internal_id,
540            created: set_result.created,
541        })
542    }
543
544    /// Get a full document or a projected subset of fields.
545    pub fn get(
546        &self,
547        collection: &str,
548        external_doc_id: &str,
549        projection: Option<&[&str]>,
550    ) -> Result<Option<Value>, DocError> {
551        let collection_id = self.collection_id(collection)?;
552        let Some(internal_id) = self
553            .registry
554            .segment(collection_id)
555            .and_then(|segment| segment.doc_internal_id(external_doc_id))
556        else {
557            return Ok(None);
558        };
559
560        let Some(state) = self.collections.get(&collection_id) else {
561            return Ok(None);
562        };
563        let Some(packed) = state.docs_by_internal_id.get(&internal_id) else {
564            return Ok(None);
565        };
566
567        match projection {
568            Some(paths) => {
569                let field_ids = self.resolve_field_ids(collection_id, paths);
570                let value = Recomposer::project(
571                    packed,
572                    &field_ids,
573                    &self.registry,
574                    &state.dictionary,
575                    collection_id,
576                )?;
577                Ok(Some(value))
578            }
579            None => {
580                let value = Recomposer::recompose(
581                    packed,
582                    &self.registry,
583                    &state.dictionary,
584                    collection_id,
585                )?;
586                Ok(Some(value))
587            }
588        }
589    }
590
591    /// Apply field-level mutations to an existing document.
592    ///
593    /// Returns `Ok(true)` when the document existed and was rewritten, `Ok(false)` when the
594    /// target document does not exist.
595    pub fn update(
596        &mut self,
597        collection: &str,
598        external_doc_id: &str,
599        mutations: &[DocMutation],
600    ) -> Result<bool, DocError> {
601        if mutations.is_empty() {
602            return Err(DocError::InvalidMutation(
603                "update requires at least one mutation".to_string(),
604            ));
605        }
606
607        let Some(mut doc) = self.get(collection, external_doc_id, None)? else {
608            return Ok(false);
609        };
610
611        for mutation in mutations {
612            match mutation {
613                DocMutation::Set { path, value } => {
614                    set_path(&mut doc, path, value.clone())?;
615                }
616                DocMutation::Del { path } => {
617                    del_path(&mut doc, path)?;
618                }
619                DocMutation::Incr { path, delta } => {
620                    incr_path(&mut doc, path, *delta)?;
621                }
622                DocMutation::Push { path, value } => {
623                    push_path(&mut doc, path, value.clone())?;
624                }
625                DocMutation::Pull { path, value } => {
626                    pull_path(&mut doc, path, value)?;
627                }
628            }
629        }
630
631        self.set(collection, external_doc_id, &doc)?;
632        Ok(true)
633    }
634
635    /// Delete a document by external ID.
636    pub fn del(&mut self, collection: &str, external_doc_id: &str) -> Result<bool, DocError> {
637        let collection_id = self.collection_id(collection)?;
638        let Some(internal_id) = self
639            .registry
640            .segment(collection_id)
641            .and_then(|segment| segment.doc_internal_id(external_doc_id))
642        else {
643            return Ok(false);
644        };
645
646        let Some(state) = self.collections.get_mut(&collection_id) else {
647            return Ok(false);
648        };
649
650        if let Some(packed) = state.docs_by_internal_id.get(&internal_id) {
651            if let Ok(old_json) =
652                Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)
653            {
654                Self::remove_index_entries(
655                    &self.registry,
656                    &state.index_config,
657                    &mut state.indexes,
658                    collection_id,
659                    internal_id,
660                    &old_json,
661                );
662            }
663        }
664
665        let removed = state.docs_by_internal_id.remove(&internal_id).is_some();
666        if removed {
667            state.collection.decrement_doc_count();
668        }
669        Ok(removed)
670    }
671
672    /// Check whether a document exists.
673    pub fn exists(&self, collection: &str, external_doc_id: &str) -> Result<bool, DocError> {
674        let collection_id = self.collection_id(collection)?;
675        let Some(internal_id) = self
676            .registry
677            .segment(collection_id)
678            .and_then(|segment| segment.doc_internal_id(external_doc_id))
679        else {
680            return Ok(false);
681        };
682
683        Ok(self
684            .collections
685            .get(&collection_id)
686            .is_some_and(|state| state.docs_by_internal_id.contains_key(&internal_id)))
687    }
688
689    fn collection_id(&self, name: &str) -> Result<CollectionId, DocError> {
690        self.registry
691            .collection_id(name)
692            .ok_or_else(|| DocError::UnknownCollection(name.to_string()))
693    }
694
695    fn resolve_field_ids(&self, collection_id: CollectionId, paths: &[&str]) -> Vec<u16> {
696        let Some(segment) = self.registry.segment(collection_id) else {
697            return Vec::new();
698        };
699        paths
700            .iter()
701            .filter_map(|path| segment.field_id(path))
702            .collect()
703    }
704
705    #[allow(clippy::too_many_arguments)]
706    fn backfill_index(
707        registry: &IdRegistry,
708        dictionary: &ValueDictionary,
709        docs: &HashMap<DocId, PackedDoc>,
710        indexes: &mut CollectionIndexes,
711        collection_id: CollectionId,
712        field_id: FieldId,
713        field_path: &str,
714        index_type: IndexType,
715    ) -> Result<(), DocError> {
716        for (&doc_id, packed) in docs {
717            let json = Recomposer::recompose(packed, registry, dictionary, collection_id)?;
718            if let Some(field_value) = resolve_json_path(&json, field_path) {
719                if index_type == IndexType::Unique {
720                    let Some(hashed) = value_to_hash(field_value) else {
721                        continue;
722                    };
723                    if let Some(existing) = find_unique_conflict(
724                        registry,
725                        dictionary,
726                        docs,
727                        collection_id,
728                        field_path,
729                        field_value,
730                        doc_id,
731                        indexes
732                            .unique(field_id)
733                            .map(|unique_idx| unique_idx.lookup(hashed))
734                            .unwrap_or(&[]),
735                    )? {
736                        return Err(DocError::Index(IndexError::UniqueViolation {
737                            hash: hashed,
738                            existing_doc_id: existing,
739                        }));
740                    }
741                }
742                add_single_field_entry(indexes, field_id, index_type, doc_id, field_value)?;
743            }
744        }
745        Ok(())
746    }
747
748    #[allow(clippy::too_many_arguments)]
749    fn check_unique_constraints(
750        registry: &IdRegistry,
751        index_config: &IndexConfig,
752        indexes: &CollectionIndexes,
753        dictionary: &ValueDictionary,
754        docs: &HashMap<DocId, PackedDoc>,
755        collection_id: CollectionId,
756        doc_id: DocId,
757        json: &Value,
758    ) -> Result<(), DocError> {
759        let Some(segment) = registry.segment(collection_id) else {
760            return Ok(());
761        };
762
763        for (&field_id, &idx_type) in index_config.entries() {
764            if idx_type != IndexType::Unique {
765                continue;
766            }
767            let Some(path) = segment.field_path(field_id) else {
768                continue;
769            };
770            let Some(field_value) = resolve_json_path(json, path) else {
771                continue;
772            };
773            let hashed = value_to_hash(field_value);
774            let Some(hashed) = hashed else {
775                continue;
776            };
777            if let Some(unique_idx) = indexes.unique(field_id) {
778                if let Some(existing) = find_unique_conflict(
779                    registry,
780                    dictionary,
781                    docs,
782                    collection_id,
783                    path,
784                    field_value,
785                    doc_id,
786                    unique_idx.lookup(hashed),
787                )? {
788                    return Err(DocError::Index(IndexError::UniqueViolation {
789                        hash: hashed,
790                        existing_doc_id: existing,
791                    }));
792                }
793            }
794        }
795        Ok(())
796    }
797
798    fn add_index_entries(
799        registry: &IdRegistry,
800        index_config: &IndexConfig,
801        indexes: &mut CollectionIndexes,
802        collection_id: CollectionId,
803        doc_id: DocId,
804        json: &Value,
805    ) {
806        let Some(segment) = registry.segment(collection_id) else {
807            return;
808        };
809
810        for (&field_id, &idx_type) in index_config.entries() {
811            let Some(path) = segment.field_path(field_id) else {
812                continue;
813            };
814            let Some(field_value) = resolve_json_path(json, path) else {
815                continue;
816            };
817            let _ = add_single_field_entry(indexes, field_id, idx_type, doc_id, field_value);
818        }
819    }
820
821    fn remove_index_entries(
822        registry: &IdRegistry,
823        index_config: &IndexConfig,
824        indexes: &mut CollectionIndexes,
825        collection_id: CollectionId,
826        doc_id: DocId,
827        json: &Value,
828    ) {
829        let Some(segment) = registry.segment(collection_id) else {
830            return;
831        };
832
833        for (&field_id, &idx_type) in index_config.entries() {
834            let Some(path) = segment.field_path(field_id) else {
835                continue;
836            };
837            let Some(field_value) = resolve_json_path(json, path) else {
838                continue;
839            };
840            remove_single_field_entry(indexes, field_id, idx_type, doc_id, field_value);
841        }
842    }
843
844    /// Execute a WHERE query and return matching documents.
845    #[allow(clippy::too_many_arguments)]
846    pub fn find(
847        &self,
848        collection: &str,
849        where_clause: &str,
850        projection: Option<&[&str]>,
851        limit: Option<usize>,
852        offset: usize,
853        order_by: Option<&str>,
854        order_desc: bool,
855    ) -> Result<Vec<Value>, DocError> {
856        let collection_id = self.collection_id(collection)?;
857        let state = self
858            .collections
859            .get(&collection_id)
860            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
861
862        let expr = parse_where(where_clause)
863            .map_err(|err| DocError::InvalidExpression(err.to_string()))?;
864
865        let doc_ids = self.execute_expr(collection_id, state, &expr)?;
866
867        let doc_ids = if let Some(sort_field) = order_by {
868            self.sort_doc_ids(collection_id, state, doc_ids, sort_field, order_desc)?
869        } else {
870            doc_ids
871        };
872
873        let end = match limit {
874            Some(lim) => (offset.saturating_add(lim)).min(doc_ids.len()),
875            None => doc_ids.len(),
876        };
877        let start = offset.min(doc_ids.len());
878        let page = &doc_ids[start..end];
879
880        let mut results = Vec::with_capacity(page.len());
881        for &doc_id in page {
882            let Some(packed) = state.docs_by_internal_id.get(&doc_id) else {
883                continue;
884            };
885            let value = match projection {
886                Some(paths) => {
887                    let field_ids = self.resolve_field_ids(collection_id, paths);
888                    Recomposer::project(
889                        packed,
890                        &field_ids,
891                        &self.registry,
892                        &state.dictionary,
893                        collection_id,
894                    )?
895                }
896                None => {
897                    Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)?
898                }
899            };
900            results.push(value);
901        }
902
903        Ok(results)
904    }
905
906    /// Count documents matching a WHERE clause.
907    pub fn count(&self, collection: &str, where_clause: &str) -> Result<u64, DocError> {
908        let collection_id = self.collection_id(collection)?;
909        let state = self
910            .collections
911            .get(&collection_id)
912            .ok_or_else(|| DocError::UnknownCollection(collection.to_string()))?;
913
914        let expr = parse_where(where_clause)
915            .map_err(|err| DocError::InvalidExpression(err.to_string()))?;
916
917        let doc_ids = self.execute_expr(collection_id, state, &expr)?;
918        Ok(doc_ids.len() as u64)
919    }
920
921    fn execute_expr(
922        &self,
923        collection_id: CollectionId,
924        state: &CollectionState,
925        expr: &Expr,
926    ) -> Result<Vec<DocId>, DocError> {
927        match expr {
928            Expr::And(left, right) => {
929                let left_ids = self.execute_expr(collection_id, state, left)?;
930                let right_ids = self.execute_expr(collection_id, state, right)?;
931                Ok(intersect_sorted(&left_ids, &right_ids))
932            }
933            Expr::Or(left, right) => {
934                let left_ids = self.execute_expr(collection_id, state, left)?;
935                let right_ids = self.execute_expr(collection_id, state, right)?;
936                Ok(union_sorted(&left_ids, &right_ids))
937            }
938            Expr::Not(_) | Expr::Exists(_) => self.fallback_scan(collection_id, state, expr),
939            _ => self.execute_leaf(collection_id, state, expr),
940        }
941    }
942
    /// Evaluate a single comparison node, using a secondary index when the
    /// node's field has one of a suitable type and scanning otherwise.
    ///
    /// The field path is taken from `expr`, mapped to a field id through the
    /// collection's registry segment, and the configured index type for that
    /// id is looked up. Each `(expression, index type)` combination handled
    /// below has a dedicated arm; anything else falls back to
    /// `fallback_scan`.
    fn execute_leaf(
        &self,
        collection_id: CollectionId,
        state: &CollectionState,
        expr: &Expr,
    ) -> Result<Vec<DocId>, DocError> {
        let field_path = expr_field(expr);
        let segment = self.registry.segment(collection_id);
        let field_id = segment.and_then(|seg| seg.field_id(field_path));
        let index_type = field_id.and_then(|fid| state.index_config.lookup(fid));

        match (expr, index_type, field_id) {
            // Equality on a hash index: look up by hash, then re-check each
            // candidate against the real document (presumably to rule out
            // hash collisions — confirm against the index implementation).
            (Expr::Eq(_, value), Some(IndexType::Hash), Some(fid)) => {
                // Values without a hash (e.g. null) cannot use the index.
                let Some(hashed) = expr_value_to_hash(value) else {
                    return self.fallback_scan(collection_id, state, expr);
                };
                let candidates = state
                    .indexes
                    .hash(fid)
                    .map_or_else(Vec::new, |idx| idx.lookup(hashed).to_vec());
                self.filter_candidates_by_expr(collection_id, state, expr, candidates)
            }

            // Equality on a unique index: same flow as the hash index.
            (Expr::Eq(_, value), Some(IndexType::Unique), Some(fid)) => {
                let Some(hashed) = expr_value_to_hash(value) else {
                    return self.fallback_scan(collection_id, state, expr);
                };
                let candidates = state
                    .indexes
                    .unique(fid)
                    .map_or_else(Vec::new, |idx| idx.lookup(hashed).to_vec());
                self.filter_candidates_by_expr(collection_id, state, expr, candidates)
            }

            // Numeric equality on a sorted index: a range query whose lower
            // and upper bound are both `n`.
            (Expr::Eq(_, ExprValue::Number(n)), Some(IndexType::Sorted), Some(fid)) => Ok(state
                .indexes
                .sorted(fid)
                .map_or_else(Vec::new, |idx| idx.range_query(*n, *n))),

            // `>=`: range from `n` up to the largest finite f64.
            (Expr::Gte(_, n), Some(IndexType::Sorted), Some(fid)) => Ok(state
                .indexes
                .sorted(fid)
                .map_or_else(Vec::new, |idx| idx.range_query(*n, f64::MAX))),

            // `<=`: range from the most negative finite f64 up to `n`.
            (Expr::Lte(_, n), Some(IndexType::Sorted), Some(fid)) => Ok(state
                .indexes
                .sorted(fid)
                .map_or_else(Vec::new, |idx| idx.range_query(f64::MIN, *n))),

            // `>`: same range as `>=`, then candidates are re-checked with a
            // strict comparison so documents sitting exactly on `n` drop out.
            (Expr::Gt(_, n), Some(IndexType::Sorted), Some(fid)) => {
                let candidates = state
                    .indexes
                    .sorted(fid)
                    .map_or_else(Vec::new, |idx| idx.range_query(*n, f64::MAX));
                self.filter_numeric_boundary(
                    collection_id,
                    state,
                    field_path,
                    candidates,
                    *n,
                    |v, boundary| v > boundary,
                )
            }

            // `<`: same range as `<=`, followed by the strict re-check.
            (Expr::Lt(_, n), Some(IndexType::Sorted), Some(fid)) => {
                let candidates = state
                    .indexes
                    .sorted(fid)
                    .map_or_else(Vec::new, |idx| idx.range_query(f64::MIN, *n));
                self.filter_numeric_boundary(
                    collection_id,
                    state,
                    field_path,
                    candidates,
                    *n,
                    |v, boundary| v < boundary,
                )
            }

            // CONTAINS on an array index: hash lookup plus re-check.
            (Expr::Contains(_, value), Some(IndexType::Array), Some(fid)) => {
                let Some(hashed) = expr_value_to_hash(value) else {
                    return self.fallback_scan(collection_id, state, expr);
                };
                let candidates = state
                    .indexes
                    .array(fid)
                    .map_or_else(Vec::new, |idx| idx.lookup(hashed).to_vec());
                self.filter_candidates_by_expr(collection_id, state, expr, candidates)
            }

            // IN on a hash index: gather candidates for every hashable
            // value, then sort and dedup before the re-check. Values without
            // a hash are skipped rather than triggering a scan.
            (Expr::In(_, values), Some(IndexType::Hash), Some(fid)) => {
                let mut all_candidates = Vec::new();
                for value in values {
                    if let Some(hashed) = expr_value_to_hash(value) {
                        if let Some(idx) = state.indexes.hash(fid) {
                            all_candidates.extend_from_slice(idx.lookup(hashed));
                        }
                    }
                }
                all_candidates.sort_unstable();
                all_candidates.dedup();
                self.filter_candidates_by_expr(collection_id, state, expr, all_candidates)
            }

            // IN on a unique index: same flow as the hash index.
            (Expr::In(_, values), Some(IndexType::Unique), Some(fid)) => {
                let mut all_candidates = Vec::new();
                for value in values {
                    if let Some(hashed) = expr_value_to_hash(value) {
                        if let Some(idx) = state.indexes.unique(fid) {
                            all_candidates.extend_from_slice(idx.lookup(hashed));
                        }
                    }
                }
                all_candidates.sort_unstable();
                all_candidates.dedup();
                self.filter_candidates_by_expr(collection_id, state, expr, all_candidates)
            }

            // No usable index for this expression/field combination.
            _ => self.fallback_scan(collection_id, state, expr),
        }
    }
1064
1065    fn filter_numeric_boundary(
1066        &self,
1067        collection_id: CollectionId,
1068        state: &CollectionState,
1069        field_path: &str,
1070        candidates: Vec<DocId>,
1071        boundary: f64,
1072        cmp: fn(f64, f64) -> bool,
1073    ) -> Result<Vec<DocId>, DocError> {
1074        let mut result = Vec::with_capacity(candidates.len());
1075        for doc_id in candidates {
1076            let Some(packed) = state.docs_by_internal_id.get(&doc_id) else {
1077                continue;
1078            };
1079            let json =
1080                Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)?;
1081            if let Some(field_val) = resolve_json_path(&json, field_path) {
1082                if let Some(num) = field_val.as_f64() {
1083                    if cmp(num, boundary) {
1084                        result.push(doc_id);
1085                    }
1086                }
1087            }
1088        }
1089        Ok(result)
1090    }
1091
1092    fn filter_candidates_by_expr(
1093        &self,
1094        collection_id: CollectionId,
1095        state: &CollectionState,
1096        expr: &Expr,
1097        candidates: Vec<DocId>,
1098    ) -> Result<Vec<DocId>, DocError> {
1099        let mut filtered = Vec::with_capacity(candidates.len());
1100        for doc_id in candidates {
1101            let Some(packed) = state.docs_by_internal_id.get(&doc_id) else {
1102                continue;
1103            };
1104            let json =
1105                Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)?;
1106            if eval_expr_on_json(&json, expr) {
1107                filtered.push(doc_id);
1108            }
1109        }
1110        Ok(filtered)
1111    }
1112
1113    fn fallback_scan(
1114        &self,
1115        collection_id: CollectionId,
1116        state: &CollectionState,
1117        expr: &Expr,
1118    ) -> Result<Vec<DocId>, DocError> {
1119        let mut result = Vec::new();
1120        let mut doc_ids: Vec<DocId> = state.docs_by_internal_id.keys().copied().collect();
1121        doc_ids.sort_unstable();
1122
1123        for doc_id in doc_ids {
1124            let Some(packed) = state.docs_by_internal_id.get(&doc_id) else {
1125                continue;
1126            };
1127            let json =
1128                Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)?;
1129            if eval_expr_on_json(&json, expr) {
1130                result.push(doc_id);
1131            }
1132        }
1133
1134        Ok(result)
1135    }
1136
1137    fn sort_doc_ids(
1138        &self,
1139        collection_id: CollectionId,
1140        state: &CollectionState,
1141        doc_ids: Vec<DocId>,
1142        sort_field: &str,
1143        descending: bool,
1144    ) -> Result<Vec<DocId>, DocError> {
1145        let mut keyed: Vec<(DocId, Option<Value>)> = Vec::with_capacity(doc_ids.len());
1146        for &doc_id in &doc_ids {
1147            let sort_val = state
1148                .docs_by_internal_id
1149                .get(&doc_id)
1150                .and_then(|packed| {
1151                    Recomposer::recompose(packed, &self.registry, &state.dictionary, collection_id)
1152                        .ok()
1153                })
1154                .and_then(|json| resolve_json_path(&json, sort_field).cloned());
1155            keyed.push((doc_id, sort_val));
1156        }
1157
1158        keyed.sort_by(|a, b| {
1159            let ordering = cmp_json_values(&a.1, &b.1);
1160            if descending {
1161                ordering.reverse()
1162            } else {
1163                ordering
1164            }
1165        });
1166
1167        Ok(keyed.into_iter().map(|(id, _)| id).collect())
1168    }
1169}
1170
1171impl Default for DocEngine {
1172    fn default() -> Self {
1173        Self::new()
1174    }
1175}
1176
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch and
/// saturates at `u32::MAX` when the second count no longer fits in 32 bits.
fn current_unix_seconds_u32() -> u32 {
    let elapsed_secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    match u32::try_from(elapsed_secs) {
        Ok(secs) => secs,
        Err(_) => u32::MAX,
    }
}
1183
1184fn resolve_json_path<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
1185    let mut current = root;
1186    for part in path.split('.') {
1187        current = current.as_object()?.get(part)?;
1188    }
1189    Some(current)
1190}
1191
1192fn value_to_hash(value: &Value) -> Option<u32> {
1193    match value {
1194        Value::String(s) => Some(hash32(s.as_bytes())),
1195        Value::Bool(true) => Some(hash32(b"true")),
1196        Value::Bool(false) => Some(hash32(b"false")),
1197        Value::Number(n) => Some(hash32(n.to_string().as_bytes())),
1198        _ => None,
1199    }
1200}
1201
1202fn value_to_score(value: &Value) -> Option<f64> {
1203    value.as_f64()
1204}
1205
1206#[allow(clippy::too_many_arguments)]
1207fn find_unique_conflict(
1208    registry: &IdRegistry,
1209    dictionary: &ValueDictionary,
1210    docs: &HashMap<DocId, PackedDoc>,
1211    collection_id: CollectionId,
1212    field_path: &str,
1213    field_value: &Value,
1214    current_doc_id: DocId,
1215    candidates: &[DocId],
1216) -> Result<Option<DocId>, DocError> {
1217    for &candidate_id in candidates {
1218        if candidate_id == current_doc_id {
1219            continue;
1220        }
1221        let Some(candidate_packed) = docs.get(&candidate_id) else {
1222            continue;
1223        };
1224        let candidate_json =
1225            Recomposer::recompose(candidate_packed, registry, dictionary, collection_id)?;
1226        let Some(candidate_value) = resolve_json_path(&candidate_json, field_path) else {
1227            continue;
1228        };
1229        if candidate_value == field_value {
1230            return Ok(Some(candidate_id));
1231        }
1232    }
1233    Ok(None)
1234}
1235
1236fn add_single_field_entry(
1237    indexes: &mut CollectionIndexes,
1238    field_id: FieldId,
1239    index_type: IndexType,
1240    doc_id: DocId,
1241    value: &Value,
1242) -> Result<(), DocError> {
1243    if value.is_null() {
1244        return Ok(());
1245    }
1246
1247    match index_type {
1248        IndexType::Hash => {
1249            if let Some(hashed) = value_to_hash(value) {
1250                indexes.get_or_create_hash(field_id).add(hashed, doc_id);
1251            }
1252        }
1253        IndexType::Sorted => {
1254            if let Some(score) = value_to_score(value) {
1255                indexes.get_or_create_sorted(field_id).add(score, doc_id);
1256            }
1257        }
1258        IndexType::Array => {
1259            if let Value::Array(items) = value {
1260                let array_idx = indexes.get_or_create_array(field_id);
1261                for item in items {
1262                    if let Some(hashed) = value_to_hash(item) {
1263                        array_idx.add(hashed, doc_id);
1264                    }
1265                }
1266            }
1267        }
1268        IndexType::Unique => {
1269            if let Some(hashed) = value_to_hash(value) {
1270                indexes.get_or_create_unique(field_id).add(hashed, doc_id);
1271            }
1272        }
1273    }
1274    Ok(())
1275}
1276
1277fn remove_single_field_entry(
1278    indexes: &mut CollectionIndexes,
1279    field_id: FieldId,
1280    index_type: IndexType,
1281    doc_id: DocId,
1282    value: &Value,
1283) {
1284    if value.is_null() {
1285        return;
1286    }
1287
1288    match index_type {
1289        IndexType::Hash => {
1290            if let Some(hashed) = value_to_hash(value) {
1291                indexes.get_or_create_hash(field_id).remove(hashed, doc_id);
1292            }
1293        }
1294        IndexType::Sorted => {
1295            if let Some(score) = value_to_score(value) {
1296                indexes.get_or_create_sorted(field_id).remove(score, doc_id);
1297            }
1298        }
1299        IndexType::Array => {
1300            if let Value::Array(items) = value {
1301                let array_idx = indexes.get_or_create_array(field_id);
1302                for item in items {
1303                    if let Some(hashed) = value_to_hash(item) {
1304                        array_idx.remove(hashed, doc_id);
1305                    }
1306                }
1307            }
1308        }
1309        IndexType::Unique => {
1310            if let Some(hashed) = value_to_hash(value) {
1311                indexes
1312                    .get_or_create_unique(field_id)
1313                    .remove(hashed, doc_id);
1314            }
1315        }
1316    }
1317}
1318
1319fn expr_field(expr: &Expr) -> &str {
1320    match expr {
1321        Expr::Eq(f, _)
1322        | Expr::Neq(f, _)
1323        | Expr::Gt(f, _)
1324        | Expr::Gte(f, _)
1325        | Expr::Lt(f, _)
1326        | Expr::Lte(f, _)
1327        | Expr::Contains(f, _)
1328        | Expr::In(f, _)
1329        | Expr::Exists(f) => f.as_str(),
1330        Expr::Not(inner) => expr_field(inner),
1331        Expr::And(_, _) | Expr::Or(_, _) => "",
1332    }
1333}
1334
1335fn expr_value_to_hash(value: &ExprValue) -> Option<u32> {
1336    match value {
1337        ExprValue::String(s) => Some(hash32(s.as_bytes())),
1338        ExprValue::Bool(true) => Some(hash32(b"true")),
1339        ExprValue::Bool(false) => Some(hash32(b"false")),
1340        ExprValue::Number(n) => Some(hash32(n.to_string().as_bytes())),
1341        ExprValue::Null => None,
1342    }
1343}
1344
1345fn eval_expr_on_json(doc: &Value, expr: &Expr) -> bool {
1346    match expr {
1347        Expr::Eq(path, value) => {
1348            let Some(field_val) = resolve_json_path(doc, path) else {
1349                return false;
1350            };
1351            json_matches_expr_value(field_val, value)
1352        }
1353        Expr::Neq(path, value) => {
1354            let Some(field_val) = resolve_json_path(doc, path) else {
1355                return true;
1356            };
1357            !json_matches_expr_value(field_val, value)
1358        }
1359        Expr::Gt(path, n) => resolve_json_path(doc, path)
1360            .and_then(|v| v.as_f64())
1361            .is_some_and(|v| v > *n),
1362        Expr::Gte(path, n) => resolve_json_path(doc, path)
1363            .and_then(|v| v.as_f64())
1364            .is_some_and(|v| v >= *n),
1365        Expr::Lt(path, n) => resolve_json_path(doc, path)
1366            .and_then(|v| v.as_f64())
1367            .is_some_and(|v| v < *n),
1368        Expr::Lte(path, n) => resolve_json_path(doc, path)
1369            .and_then(|v| v.as_f64())
1370            .is_some_and(|v| v <= *n),
1371        Expr::Contains(path, value) => {
1372            let Some(Value::Array(items)) = resolve_json_path(doc, path) else {
1373                return false;
1374            };
1375            items
1376                .iter()
1377                .any(|item| json_matches_expr_value(item, value))
1378        }
1379        Expr::In(path, values) => {
1380            let Some(field_val) = resolve_json_path(doc, path) else {
1381                return false;
1382            };
1383            values.iter().any(|v| json_matches_expr_value(field_val, v))
1384        }
1385        Expr::Exists(path) => resolve_json_path(doc, path).is_some(),
1386        Expr::Not(inner) => !eval_expr_on_json(doc, inner),
1387        Expr::And(left, right) => eval_expr_on_json(doc, left) && eval_expr_on_json(doc, right),
1388        Expr::Or(left, right) => eval_expr_on_json(doc, left) || eval_expr_on_json(doc, right),
1389    }
1390}
1391
1392fn json_matches_expr_value(json_val: &Value, expr_val: &ExprValue) -> bool {
1393    match (json_val, expr_val) {
1394        (Value::String(a), ExprValue::String(b)) => a == b,
1395        (Value::Number(a), ExprValue::Number(b)) => a.as_f64().is_some_and(|v| v == *b),
1396        (Value::Bool(a), ExprValue::Bool(b)) => a == b,
1397        (Value::Null, ExprValue::Null) => true,
1398        _ => false,
1399    }
1400}
1401
1402fn cmp_json_values(a: &Option<Value>, b: &Option<Value>) -> std::cmp::Ordering {
1403    use std::cmp::Ordering;
1404    match (a, b) {
1405        (None, None) => Ordering::Equal,
1406        (None, Some(_)) => Ordering::Greater,
1407        (Some(_), None) => Ordering::Less,
1408        (Some(va), Some(vb)) => cmp_json_value_inner(va, vb),
1409    }
1410}
1411
1412fn cmp_json_value_inner(a: &Value, b: &Value) -> std::cmp::Ordering {
1413    use std::cmp::Ordering;
1414    match (a, b) {
1415        (Value::Number(a), Value::Number(b)) => {
1416            let fa = a.as_f64().unwrap_or(0.0);
1417            let fb = b.as_f64().unwrap_or(0.0);
1418            fa.partial_cmp(&fb).unwrap_or(Ordering::Equal)
1419        }
1420        (Value::String(a), Value::String(b)) => a.cmp(b),
1421        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
1422        (Value::Null, Value::Null) => Ordering::Equal,
1423        _ => Ordering::Equal,
1424    }
1425}
1426
1427fn set_path(root: &mut Value, path: &str, value: Value) -> Result<(), DocError> {
1428    let parts = parse_path(path)?;
1429    let leaf = parts[parts.len() - 1];
1430    let Some(parent) = resolve_parent_object_mut(root, &parts, path, true)? else {
1431        return Err(DocError::InvalidMutation(format!(
1432            "SET path '{path}' is invalid"
1433        )));
1434    };
1435    parent.insert(leaf.to_string(), value);
1436    Ok(())
1437}
1438
1439fn del_path(root: &mut Value, path: &str) -> Result<(), DocError> {
1440    let parts = parse_path(path)?;
1441    let leaf = parts[parts.len() - 1];
1442    let Some(parent) = resolve_parent_object_mut(root, &parts, path, false)? else {
1443        return Ok(());
1444    };
1445    parent.remove(leaf);
1446    Ok(())
1447}
1448
1449fn incr_path(root: &mut Value, path: &str, delta: f64) -> Result<(), DocError> {
1450    if !delta.is_finite() {
1451        return Err(DocError::InvalidMutation(format!(
1452            "INCR delta for path '{path}' must be finite"
1453        )));
1454    }
1455
1456    let parts = parse_path(path)?;
1457    let Some(target) = resolve_existing_path_mut(root, &parts, path)? else {
1458        return Err(DocError::InvalidMutation(format!(
1459            "INCR path '{path}' does not exist"
1460        )));
1461    };
1462
1463    let Value::Number(number) = target else {
1464        return Err(DocError::InvalidMutation(format!(
1465            "INCR path '{path}' targets a non-numeric value"
1466        )));
1467    };
1468
1469    let Some(base) = number.as_f64() else {
1470        return Err(DocError::InvalidMutation(format!(
1471            "INCR path '{path}' contains an unsupported number representation"
1472        )));
1473    };
1474    let updated = base + delta;
1475    if !updated.is_finite() {
1476        return Err(DocError::InvalidMutation(format!(
1477            "INCR path '{path}' overflowed to a non-finite value"
1478        )));
1479    }
1480
1481    *target = if updated.fract() == 0.0 && updated >= i64::MIN as f64 && updated <= i64::MAX as f64
1482    {
1483        Value::Number((updated as i64).into())
1484    } else {
1485        let Some(number) = serde_json::Number::from_f64(updated) else {
1486            return Err(DocError::InvalidMutation(format!(
1487                "INCR path '{path}' produced an invalid float value"
1488            )));
1489        };
1490        Value::Number(number)
1491    };
1492
1493    Ok(())
1494}
1495
1496fn push_path(root: &mut Value, path: &str, value: Value) -> Result<(), DocError> {
1497    let parts = parse_path(path)?;
1498    if let Some(target) = resolve_existing_path_mut(root, &parts, path)? {
1499        let Value::Array(items) = target else {
1500            return Err(DocError::InvalidMutation(format!(
1501                "PUSH path '{path}' targets a non-array value"
1502            )));
1503        };
1504        items.push(value);
1505        return Ok(());
1506    }
1507
1508    set_path(root, path, Value::Array(vec![value]))
1509}
1510
1511fn pull_path(root: &mut Value, path: &str, value: &Value) -> Result<(), DocError> {
1512    let parts = parse_path(path)?;
1513    let Some(target) = resolve_existing_path_mut(root, &parts, path)? else {
1514        return Ok(());
1515    };
1516    let Value::Array(items) = target else {
1517        return Err(DocError::InvalidMutation(format!(
1518            "PULL path '{path}' targets a non-array value"
1519        )));
1520    };
1521    items.retain(|candidate| candidate != value);
1522    Ok(())
1523}
1524
1525fn parse_path(path: &str) -> Result<Vec<&str>, DocError> {
1526    if path.is_empty() {
1527        return Err(DocError::InvalidMutation(
1528            "path cannot be empty".to_string(),
1529        ));
1530    }
1531    let parts: Vec<&str> = path.split('.').collect();
1532    if parts.iter().any(|part| part.is_empty()) {
1533        return Err(DocError::InvalidMutation(format!(
1534            "path '{path}' contains an empty segment"
1535        )));
1536    }
1537    Ok(parts)
1538}
1539
/// Walk `parts` down to the object that should hold the final segment and
/// return a mutable handle to that object's map.
///
/// Only the segments before the last one are traversed; the leaf itself is
/// left for the caller to insert or remove. With `create_missing` set,
/// absent intermediate segments are created as empty objects; otherwise a
/// missing segment yields `Ok(None)`.
///
/// # Errors
///
/// Returns [`DocError::InvalidMutation`] when the root is not an object or
/// when any traversed segment holds a non-object value. `full_path` is only
/// used to build those error messages.
fn resolve_parent_object_mut<'a>(
    root: &'a mut Value,
    parts: &[&str],
    full_path: &str,
    create_missing: bool,
) -> Result<Option<&'a mut serde_json::Map<String, Value>>, DocError> {
    let mut current = root;
    if !current.is_object() {
        return Err(DocError::InvalidMutation(
            "document root must be a JSON object".to_string(),
        ));
    }

    // Every segment except the leaf must resolve to an object.
    for part in &parts[..parts.len() - 1] {
        let map = current.as_object_mut().ok_or_else(|| {
            DocError::InvalidMutation(format!(
                "path '{full_path}' traverses through a non-object segment"
            ))
        })?;

        if create_missing {
            // Insert an empty object when the segment is absent, then make
            // sure whatever is there (new or pre-existing) is an object.
            current = map
                .entry((*part).to_string())
                .or_insert_with(|| Value::Object(serde_json::Map::new()));
            if !current.is_object() {
                return Err(DocError::InvalidMutation(format!(
                    "path '{full_path}' traverses through a non-object segment"
                )));
            }
            continue;
        }

        // Read-only traversal: a missing segment means there is no parent.
        let Some(next) = map.get_mut(*part) else {
            return Ok(None);
        };
        if !next.is_object() {
            return Err(DocError::InvalidMutation(format!(
                "path '{full_path}' traverses through a non-object segment"
            )));
        }
        current = next;
    }

    let map = current.as_object_mut().ok_or_else(|| {
        DocError::InvalidMutation(format!(
            "path '{full_path}' traverses through a non-object segment"
        ))
    })?;
    Ok(Some(map))
}
1590
/// Walk every segment of `parts` and return a mutable reference to the
/// value at the end of the path.
///
/// Returns `Ok(None)` as soon as a segment is missing. Nothing is created.
///
/// # Errors
///
/// Returns [`DocError::InvalidMutation`] when a segment has to be looked up
/// inside a value that is not an object (including a non-object root).
/// `full_path` is only used to build that error message.
fn resolve_existing_path_mut<'a>(
    root: &'a mut Value,
    parts: &[&str],
    full_path: &str,
) -> Result<Option<&'a mut Value>, DocError> {
    let mut current = root;
    for part in parts {
        let map = current.as_object_mut().ok_or_else(|| {
            DocError::InvalidMutation(format!(
                "path '{full_path}' traverses through a non-object segment"
            ))
        })?;
        let Some(next) = map.get_mut(*part) else {
            return Ok(None);
        };
        current = next;
    }
    Ok(Some(current))
}
1610
1611#[cfg(test)]
1612mod tests {
1613    use serde_json::json;
1614
1615    use super::*;
1616
    /// End-to-end lifecycle of one document: create it, read it back in
    /// full, read a projection with a nested path, delete it, and confirm it
    /// is gone.
    #[test]
    fn set_get_projection_delete_flow() {
        let mut engine = DocEngine::new();
        engine
            .create_collection("users", CollectionConfig::default())
            .expect("collection create should work");

        let set = engine
            .set(
                "users",
                "doc:1",
                &json!({
                    "name": "Augustus",
                    "age": 30,
                    "active": true,
                    "address": {"city": "Accra", "zip": "00233"},
                    "tags": ["rust", "systems"]
                }),
            )
            .expect("set should work");
        assert!(set.created);
        assert!(engine.exists("users", "doc:1").expect("exists should work"));

        // A full read must round-trip the document unchanged.
        let full = engine
            .get("users", "doc:1", None)
            .expect("get should work")
            .expect("doc should exist");
        assert_eq!(
            full,
            json!({
                "name": "Augustus",
                "age": 30,
                "active": true,
                "address": {"city": "Accra", "zip": "00233"},
                "tags": ["rust", "systems"]
            })
        );

        // Projection keeps only the requested paths, preserving nesting.
        let projected = engine
            .get("users", "doc:1", Some(&["name", "address.city"]))
            .expect("projection should work")
            .expect("doc should exist");
        assert_eq!(
            projected,
            json!({"name": "Augustus", "address": {"city": "Accra"}})
        );

        assert!(engine.del("users", "doc:1").expect("delete should work"));
        assert!(!engine.exists("users", "doc:1").expect("exists should work"));
        assert_eq!(
            engine.get("users", "doc:1", None).expect("get should work"),
            None
        );
    }
1671
    /// Creating a collection under a name that is already taken fails with
    /// `CollectionError::AlreadyExists`.
    #[test]
    fn duplicate_collection_name_is_rejected() {
        let mut engine = DocEngine::new();
        engine
            .create_collection("users", CollectionConfig::default())
            .expect("create should work");
        let err = engine
            .create_collection("users", CollectionConfig::default())
            .expect_err("duplicate should fail");
        assert!(matches!(
            err,
            DocError::Collection(CollectionError::AlreadyExists(_))
        ));
    }
1686
    /// Reading an id that was never written yields `Ok(None)`, not an error.
    #[test]
    fn get_missing_document_returns_none() {
        let mut engine = DocEngine::new();
        engine
            .create_collection("users", CollectionConfig::default())
            .expect("create should work");
        let doc = engine
            .get("users", "doc:missing", None)
            .expect("get should work");
        assert_eq!(doc, None);
    }
1698
    /// `collection_info` reports the configured compression profile, the
    /// document count, and the number of dictionary entries.
    #[test]
    fn collection_info_reflects_state() {
        let mut engine = DocEngine::new();
        engine
            .create_collection(
                "users",
                CollectionConfig {
                    compression: CompressionProfile::Dictionary,
                },
            )
            .expect("create should work");
        engine
            .set("users", "doc:1", &json!({"city": "Accra"}))
            .expect("set should work");
        engine
            .set("users", "doc:2", &json!({"city": "Accra"}))
            .expect("set should work");

        let info = engine
            .collection_info("users")
            .expect("collection should exist");
        assert_eq!(info.compression, CompressionProfile::Dictionary);
        assert_eq!(info.doc_count, 2);
        // Both documents hold the same value, so one entry is expected.
        assert_eq!(info.dictionary_entries, 1);
    }
1724
    /// Operations on a collection that was never created fail with
    /// `DocError::UnknownCollection` carrying the requested name.
    #[test]
    fn unknown_collection_returns_error() {
        let engine = DocEngine::new();
        let err = engine
            .exists("users", "doc:1")
            .expect_err("unknown collection should fail");
        assert!(matches!(err, DocError::UnknownCollection(name) if name == "users"));
    }
1733
1734    #[test]
1735    fn dictionary_info_reports_field_cardinality() {
1736        let mut engine = DocEngine::new();
1737        engine
1738            .create_collection("users", CollectionConfig::default())
1739            .expect("create should work");
1740        engine
1741            .set(
1742                "users",
1743                "doc:1",
1744                &json!({"city": "Accra", "status": "active"}),
1745            )
1746            .expect("set should work");
1747        engine
1748            .set(
1749                "users",
1750                "doc:2",
1751                &json!({"city": "Accra", "status": "inactive"}),
1752            )
1753            .expect("set should work");
1754
1755        let info = engine
1756            .dictionary_info("users")
1757            .expect("dictionary info should work");
1758        assert_eq!(info.collection_name, "users");
1759        assert!(info.dictionary_entries >= 2);
1760
1761        let city = info
1762            .fields
1763            .iter()
1764            .find(|field| field.path == "city")
1765            .expect("city field should be present");
1766        assert_eq!(city.cardinality_estimate, 1);
1767
1768        let status = info
1769            .fields
1770            .iter()
1771            .find(|field| field.path == "status")
1772            .expect("status field should be present");
1773        assert_eq!(status.cardinality_estimate, 2);
1774    }
1775
1776    #[test]
1777    fn storage_info_reports_packed_sizes() {
1778        let mut engine = DocEngine::new();
1779        engine
1780            .create_collection("users", CollectionConfig::default())
1781            .expect("create should work");
1782        engine
1783            .set("users", "doc:1", &json!({"name": "A"}))
1784            .expect("set should work");
1785        engine
1786            .set(
1787                "users",
1788                "doc:2",
1789                &json!({"name": "Augustus", "city": "Accra"}),
1790            )
1791            .expect("set should work");
1792
1793        let info = engine
1794            .storage_info("users")
1795            .expect("storage info should work");
1796        assert_eq!(info.collection_name, "users");
1797        assert_eq!(info.doc_count, 2);
1798        assert!(info.total_packed_bytes > 0);
1799        assert!(info.max_doc_bytes >= info.min_doc_bytes);
1800        assert!(info.avg_doc_bytes >= info.min_doc_bytes);
1801        assert!(info.avg_doc_bytes <= info.max_doc_bytes);
1802    }
1803
1804    #[test]
1805    fn update_applies_mutations() {
1806        let mut engine = DocEngine::new();
1807        engine
1808            .create_collection("users", CollectionConfig::default())
1809            .expect("create should work");
1810        engine
1811            .set(
1812                "users",
1813                "doc:1",
1814                &json!({
1815                    "name": "Augustus",
1816                    "score": 10,
1817                    "active": true,
1818                    "address": {"city": "Accra"},
1819                    "tags": ["rust", "systems", "rust"]
1820                }),
1821            )
1822            .expect("set should work");
1823
1824        let updated = engine
1825            .update(
1826                "users",
1827                "doc:1",
1828                &[
1829                    DocMutation::Set {
1830                        path: "address.city".to_string(),
1831                        value: json!("London"),
1832                    },
1833                    DocMutation::Incr {
1834                        path: "score".to_string(),
1835                        delta: 2.5,
1836                    },
1837                    DocMutation::Push {
1838                        path: "tags".to_string(),
1839                        value: json!("cache"),
1840                    },
1841                    DocMutation::Pull {
1842                        path: "tags".to_string(),
1843                        value: json!("rust"),
1844                    },
1845                    DocMutation::Del {
1846                        path: "active".to_string(),
1847                    },
1848                ],
1849            )
1850            .expect("update should work");
1851        assert!(updated);
1852
1853        let doc = engine
1854            .get("users", "doc:1", None)
1855            .expect("get should work")
1856            .expect("doc should exist");
1857        assert_eq!(
1858            doc,
1859            json!({
1860                "name": "Augustus",
1861                "score": 12.5,
1862                "address": {"city": "London"},
1863                "tags": ["systems", "cache"]
1864            })
1865        );
1866    }
1867
1868    #[test]
1869    fn update_missing_document_returns_false() {
1870        let mut engine = DocEngine::new();
1871        engine
1872            .create_collection("users", CollectionConfig::default())
1873            .expect("create should work");
1874        let updated = engine
1875            .update(
1876                "users",
1877                "doc:missing",
1878                &[DocMutation::Set {
1879                    path: "name".to_string(),
1880                    value: json!("A"),
1881                }],
1882            )
1883            .expect("update should not fail");
1884        assert!(!updated);
1885    }
1886
1887    #[test]
1888    fn update_rejects_non_numeric_incr_target() {
1889        let mut engine = DocEngine::new();
1890        engine
1891            .create_collection("users", CollectionConfig::default())
1892            .expect("create should work");
1893        engine
1894            .set("users", "doc:1", &json!({"score": "high"}))
1895            .expect("set should work");
1896
1897        let err = engine
1898            .update(
1899                "users",
1900                "doc:1",
1901                &[DocMutation::Incr {
1902                    path: "score".to_string(),
1903                    delta: 1.0,
1904                }],
1905            )
1906            .expect_err("non-numeric increment must fail");
1907        assert!(matches!(err, DocError::InvalidMutation(_)));
1908    }
1909
1910    #[test]
1911    fn create_index_backfills_existing_docs() {
1912        let mut engine = DocEngine::new();
1913        engine
1914            .create_collection("users", CollectionConfig::default())
1915            .expect("create should work");
1916
1917        engine
1918            .set("users", "doc:1", &json!({"city": "Accra"}))
1919            .expect("set should work");
1920        engine
1921            .set("users", "doc:2", &json!({"city": "London"}))
1922            .expect("set should work");
1923        engine
1924            .set("users", "doc:3", &json!({"city": "Accra"}))
1925            .expect("set should work");
1926
1927        engine
1928            .create_index("users", "city", IndexType::Hash)
1929            .expect("create_index should work");
1930
1931        let collection_id = engine.collection_id("users").unwrap();
1932        let state = engine.collections.get(&collection_id).unwrap();
1933        let field_id = engine
1934            .registry
1935            .segment(collection_id)
1936            .unwrap()
1937            .field_id("city")
1938            .unwrap();
1939
1940        let hash_idx = state
1941            .indexes
1942            .hash(field_id)
1943            .expect("hash index should exist");
1944        let accra_hash = hash32(b"Accra");
1945        let london_hash = hash32(b"London");
1946        let accra_docs = hash_idx.lookup(accra_hash);
1947        let london_docs = hash_idx.lookup(london_hash);
1948
1949        assert_eq!(accra_docs.len(), 2);
1950        assert_eq!(london_docs.len(), 1);
1951    }
1952
1953    #[test]
1954    fn index_maintained_on_set() {
1955        let mut engine = DocEngine::new();
1956        engine
1957            .create_collection("users", CollectionConfig::default())
1958            .expect("create should work");
1959
1960        engine
1961            .create_index("users", "city", IndexType::Hash)
1962            .expect("create_index should work");
1963
1964        engine
1965            .set("users", "doc:1", &json!({"city": "Accra"}))
1966            .expect("set should work");
1967        engine
1968            .set("users", "doc:2", &json!({"city": "London"}))
1969            .expect("set should work");
1970
1971        let collection_id = engine.collection_id("users").unwrap();
1972        let state = engine.collections.get(&collection_id).unwrap();
1973        let field_id = engine
1974            .registry
1975            .segment(collection_id)
1976            .unwrap()
1977            .field_id("city")
1978            .unwrap();
1979
1980        let hash_idx = state
1981            .indexes
1982            .hash(field_id)
1983            .expect("hash index should exist");
1984        assert_eq!(hash_idx.lookup(hash32(b"Accra")).len(), 1);
1985        assert_eq!(hash_idx.lookup(hash32(b"London")).len(), 1);
1986    }
1987
1988    #[test]
1989    fn index_maintained_on_update() {
1990        let mut engine = DocEngine::new();
1991        engine
1992            .create_collection("users", CollectionConfig::default())
1993            .expect("create should work");
1994
1995        engine
1996            .set("users", "doc:1", &json!({"city": "Accra"}))
1997            .expect("set should work");
1998
1999        engine
2000            .create_index("users", "city", IndexType::Hash)
2001            .expect("create_index should work");
2002
2003        engine
2004            .update(
2005                "users",
2006                "doc:1",
2007                &[DocMutation::Set {
2008                    path: "city".to_string(),
2009                    value: json!("London"),
2010                }],
2011            )
2012            .expect("update should work");
2013
2014        let collection_id = engine.collection_id("users").unwrap();
2015        let state = engine.collections.get(&collection_id).unwrap();
2016        let field_id = engine
2017            .registry
2018            .segment(collection_id)
2019            .unwrap()
2020            .field_id("city")
2021            .unwrap();
2022
2023        let hash_idx = state
2024            .indexes
2025            .hash(field_id)
2026            .expect("hash index should exist");
2027        assert!(hash_idx.lookup(hash32(b"Accra")).is_empty());
2028        assert_eq!(hash_idx.lookup(hash32(b"London")).len(), 1);
2029    }
2030
2031    #[test]
2032    fn index_maintained_on_delete() {
2033        let mut engine = DocEngine::new();
2034        engine
2035            .create_collection("users", CollectionConfig::default())
2036            .expect("create should work");
2037
2038        engine
2039            .set("users", "doc:1", &json!({"city": "Accra"}))
2040            .expect("set should work");
2041
2042        engine
2043            .create_index("users", "city", IndexType::Hash)
2044            .expect("create_index should work");
2045
2046        let collection_id = engine.collection_id("users").unwrap();
2047        let field_id = engine
2048            .registry
2049            .segment(collection_id)
2050            .unwrap()
2051            .field_id("city")
2052            .unwrap();
2053
2054        {
2055            let state = engine.collections.get(&collection_id).unwrap();
2056            let hash_idx = state
2057                .indexes
2058                .hash(field_id)
2059                .expect("hash index should exist");
2060            assert_eq!(hash_idx.lookup(hash32(b"Accra")).len(), 1);
2061        }
2062
2063        engine.del("users", "doc:1").expect("del should work");
2064
2065        let state = engine.collections.get(&collection_id).unwrap();
2066        let hash_idx = state
2067            .indexes
2068            .hash(field_id)
2069            .expect("hash index should exist");
2070        assert!(hash_idx.lookup(hash32(b"Accra")).is_empty());
2071    }
2072
2073    #[test]
2074    fn unique_constraint_violation_on_set() {
2075        let mut engine = DocEngine::new();
2076        engine
2077            .create_collection("users", CollectionConfig::default())
2078            .expect("create should work");
2079
2080        engine
2081            .create_index("users", "email", IndexType::Unique)
2082            .expect("create_index should work");
2083
2084        engine
2085            .set("users", "doc:1", &json!({"email": "alice@example.com"}))
2086            .expect("first set should work");
2087
2088        let err = engine
2089            .set("users", "doc:2", &json!({"email": "alice@example.com"}))
2090            .expect_err("duplicate unique value must fail");
2091
2092        assert!(matches!(
2093            err,
2094            DocError::Index(IndexError::UniqueViolation { .. })
2095        ));
2096    }
2097
2098    #[test]
2099    fn unique_constraint_allows_hash_collision_with_distinct_values() {
2100        let mut engine = DocEngine::new();
2101        engine
2102            .create_collection("users", CollectionConfig::default())
2103            .expect("create should work");
2104
2105        engine
2106            .create_index("users", "email", IndexType::Unique)
2107            .expect("create_index should work");
2108
2109        let first = "BpEAYkE2SftJ";
2110        let second = "xSDGJoKxB";
2111        assert_eq!(hash32(first.as_bytes()), hash32(second.as_bytes()));
2112
2113        engine
2114            .set("users", "doc:1", &json!({"email": first}))
2115            .expect("first set should work");
2116        engine
2117            .set("users", "doc:2", &json!({"email": second}))
2118            .expect("hash collision with different value should be allowed");
2119    }
2120
2121    #[test]
2122    fn drop_index_clears_data() {
2123        let mut engine = DocEngine::new();
2124        engine
2125            .create_collection("users", CollectionConfig::default())
2126            .expect("create should work");
2127
2128        engine
2129            .create_index("users", "city", IndexType::Hash)
2130            .expect("create_index should work");
2131
2132        engine
2133            .set("users", "doc:1", &json!({"city": "Accra"}))
2134            .expect("set should work");
2135        engine
2136            .set("users", "doc:2", &json!({"city": "London"}))
2137            .expect("set should work");
2138
2139        engine
2140            .drop_index("users", "city")
2141            .expect("drop_index should work");
2142
2143        let indexes = engine.indexes("users").expect("indexes should work");
2144        assert!(indexes.is_empty());
2145
2146        let collection_id = engine.collection_id("users").unwrap();
2147        let field_id = engine
2148            .registry
2149            .segment(collection_id)
2150            .unwrap()
2151            .field_id("city")
2152            .unwrap();
2153        let state = engine.collections.get(&collection_id).unwrap();
2154        assert!(state.indexes.hash(field_id).is_none());
2155    }
2156
2157    #[test]
2158    fn sorted_index_range_query_works() {
2159        let mut engine = DocEngine::new();
2160        engine
2161            .create_collection("products", CollectionConfig::default())
2162            .expect("create should work");
2163
2164        engine
2165            .create_index("products", "price", IndexType::Sorted)
2166            .expect("create_index should work");
2167
2168        engine
2169            .set("products", "p1", &json!({"price": 10.0}))
2170            .expect("set should work");
2171        engine
2172            .set("products", "p2", &json!({"price": 25.0}))
2173            .expect("set should work");
2174        engine
2175            .set("products", "p3", &json!({"price": 50.0}))
2176            .expect("set should work");
2177        engine
2178            .set("products", "p4", &json!({"price": 5.0}))
2179            .expect("set should work");
2180
2181        let collection_id = engine.collection_id("products").unwrap();
2182        let field_id = engine
2183            .registry
2184            .segment(collection_id)
2185            .unwrap()
2186            .field_id("price")
2187            .unwrap();
2188        let state = engine.collections.get(&collection_id).unwrap();
2189        let sorted_idx = state
2190            .indexes
2191            .sorted(field_id)
2192            .expect("sorted index should exist");
2193
2194        let range_10_30 = sorted_idx.range_query(10.0, 30.0);
2195        assert_eq!(range_10_30.len(), 2);
2196
2197        let range_all = sorted_idx.range_query(0.0, 100.0);
2198        assert_eq!(range_all.len(), 4);
2199
2200        let range_high = sorted_idx.range_query(40.0, 100.0);
2201        assert_eq!(range_high.len(), 1);
2202    }
2203
2204    #[test]
2205    fn find_by_hash_index() {
2206        let mut engine = DocEngine::new();
2207        engine
2208            .create_collection("users", CollectionConfig::default())
2209            .expect("create should work");
2210        engine
2211            .create_index("users", "city", IndexType::Hash)
2212            .expect("index should work");
2213
2214        engine
2215            .set("users", "d1", &json!({"name": "Kwame", "city": "Accra"}))
2216            .expect("set");
2217        engine
2218            .set("users", "d2", &json!({"name": "Ama", "city": "Kumasi"}))
2219            .expect("set");
2220        engine
2221            .set("users", "d3", &json!({"name": "Kofi", "city": "Accra"}))
2222            .expect("set");
2223
2224        let results = engine
2225            .find("users", r#"city = "Accra""#, None, None, 0, None, false)
2226            .expect("find should work");
2227        assert_eq!(results.len(), 2);
2228        for doc in &results {
2229            assert_eq!(doc["city"], "Accra");
2230        }
2231    }
2232
2233    #[test]
2234    fn find_by_hash_index_filters_hash_collisions() {
2235        let mut engine = DocEngine::new();
2236        engine
2237            .create_collection("users", CollectionConfig::default())
2238            .expect("create should work");
2239        engine
2240            .create_index("users", "city", IndexType::Hash)
2241            .expect("index should work");
2242
2243        let first = "BpEAYkE2SftJ";
2244        let second = "xSDGJoKxB";
2245        assert_eq!(hash32(first.as_bytes()), hash32(second.as_bytes()));
2246
2247        engine
2248            .set("users", "d1", &json!({"name": "First", "city": first}))
2249            .expect("set");
2250        engine
2251            .set("users", "d2", &json!({"name": "Second", "city": second}))
2252            .expect("set");
2253
2254        let results = engine
2255            .find(
2256                "users",
2257                &format!("city = \"{}\"", first),
2258                None,
2259                None,
2260                0,
2261                None,
2262                false,
2263            )
2264            .expect("find should work");
2265        assert_eq!(results.len(), 1);
2266        assert_eq!(results[0]["city"], first);
2267    }
2268
2269    #[test]
2270    fn find_by_sorted_index_range() {
2271        let mut engine = DocEngine::new();
2272        engine
2273            .create_collection("users", CollectionConfig::default())
2274            .expect("create");
2275        engine
2276            .create_index("users", "age", IndexType::Sorted)
2277            .expect("index");
2278
2279        engine
2280            .set("users", "d1", &json!({"name": "A", "age": 20}))
2281            .expect("set");
2282        engine
2283            .set("users", "d2", &json!({"name": "B", "age": 25}))
2284            .expect("set");
2285        engine
2286            .set("users", "d3", &json!({"name": "C", "age": 30}))
2287            .expect("set");
2288        engine
2289            .set("users", "d4", &json!({"name": "D", "age": 35}))
2290            .expect("set");
2291        engine
2292            .set("users", "d5", &json!({"name": "E", "age": 40}))
2293            .expect("set");
2294
2295        let results = engine
2296            .find(
2297                "users",
2298                "age >= 25 AND age <= 35",
2299                None,
2300                None,
2301                0,
2302                None,
2303                false,
2304            )
2305            .expect("find should work");
2306        assert_eq!(results.len(), 3);
2307        for doc in &results {
2308            let age = doc["age"].as_f64().unwrap();
2309            assert!((25.0..=35.0).contains(&age));
2310        }
2311    }
2312
2313    #[test]
2314    fn find_by_array_index() {
2315        let mut engine = DocEngine::new();
2316        engine
2317            .create_collection("posts", CollectionConfig::default())
2318            .expect("create");
2319        engine
2320            .create_index("posts", "tags", IndexType::Array)
2321            .expect("index");
2322
2323        engine
2324            .set(
2325                "posts",
2326                "p1",
2327                &json!({"title": "A", "tags": ["rust", "systems"]}),
2328            )
2329            .expect("set");
2330        engine
2331            .set("posts", "p2", &json!({"title": "B", "tags": ["go", "web"]}))
2332            .expect("set");
2333        engine
2334            .set(
2335                "posts",
2336                "p3",
2337                &json!({"title": "C", "tags": ["rust", "wasm"]}),
2338            )
2339            .expect("set");
2340
2341        let results = engine
2342            .find(
2343                "posts",
2344                r#"tags CONTAINS "rust""#,
2345                None,
2346                None,
2347                0,
2348                None,
2349                false,
2350            )
2351            .expect("find should work");
2352        assert_eq!(results.len(), 2);
2353        for doc in &results {
2354            let tags = doc["tags"].as_array().unwrap();
2355            assert!(tags.contains(&json!("rust")));
2356        }
2357    }
2358
2359    #[test]
2360    fn find_by_array_index_filters_hash_collisions() {
2361        let mut engine = DocEngine::new();
2362        engine
2363            .create_collection("posts", CollectionConfig::default())
2364            .expect("create");
2365        engine
2366            .create_index("posts", "tags", IndexType::Array)
2367            .expect("index");
2368
2369        let first = "BpEAYkE2SftJ";
2370        let second = "xSDGJoKxB";
2371        assert_eq!(hash32(first.as_bytes()), hash32(second.as_bytes()));
2372
2373        engine
2374            .set("posts", "p1", &json!({"title": "A", "tags": [first]}))
2375            .expect("set");
2376        engine
2377            .set("posts", "p2", &json!({"title": "B", "tags": [second]}))
2378            .expect("set");
2379
2380        let results = engine
2381            .find(
2382                "posts",
2383                &format!("tags CONTAINS \"{}\"", first),
2384                None,
2385                None,
2386                0,
2387                None,
2388                false,
2389            )
2390            .expect("find should work");
2391        assert_eq!(results.len(), 1);
2392        assert_eq!(results[0]["tags"], json!([first]));
2393    }
2394
2395    #[test]
2396    fn find_compound_and() {
2397        let mut engine = DocEngine::new();
2398        engine
2399            .create_collection("users", CollectionConfig::default())
2400            .expect("create");
2401        engine
2402            .create_index("users", "city", IndexType::Hash)
2403            .expect("index");
2404        engine
2405            .create_index("users", "age", IndexType::Sorted)
2406            .expect("index");
2407
2408        engine
2409            .set("users", "d1", &json!({"city": "Accra", "age": 20}))
2410            .expect("set");
2411        engine
2412            .set("users", "d2", &json!({"city": "Accra", "age": 30}))
2413            .expect("set");
2414        engine
2415            .set("users", "d3", &json!({"city": "Lagos", "age": 30}))
2416            .expect("set");
2417        engine
2418            .set("users", "d4", &json!({"city": "Accra", "age": 40}))
2419            .expect("set");
2420
2421        let results = engine
2422            .find(
2423                "users",
2424                r#"city = "Accra" AND age >= 25"#,
2425                None,
2426                None,
2427                0,
2428                None,
2429                false,
2430            )
2431            .expect("find should work");
2432        assert_eq!(results.len(), 2);
2433        for doc in &results {
2434            assert_eq!(doc["city"], "Accra");
2435            assert!(doc["age"].as_f64().unwrap() >= 25.0);
2436        }
2437    }
2438
2439    #[test]
2440    fn find_compound_or() {
2441        let mut engine = DocEngine::new();
2442        engine
2443            .create_collection("users", CollectionConfig::default())
2444            .expect("create");
2445        engine
2446            .create_index("users", "city", IndexType::Hash)
2447            .expect("index");
2448
2449        engine
2450            .set("users", "d1", &json!({"city": "Accra"}))
2451            .expect("set");
2452        engine
2453            .set("users", "d2", &json!({"city": "Lagos"}))
2454            .expect("set");
2455        engine
2456            .set("users", "d3", &json!({"city": "Kumasi"}))
2457            .expect("set");
2458        engine
2459            .set("users", "d4", &json!({"city": "Lagos"}))
2460            .expect("set");
2461
2462        let results = engine
2463            .find(
2464                "users",
2465                r#"city = "Accra" OR city = "Lagos""#,
2466                None,
2467                None,
2468                0,
2469                None,
2470                false,
2471            )
2472            .expect("find should work");
2473        assert_eq!(results.len(), 3);
2474        for doc in &results {
2475            let city = doc["city"].as_str().unwrap();
2476            assert!(city == "Accra" || city == "Lagos");
2477        }
2478    }
2479
2480    #[test]
2481    fn find_with_projection() {
2482        let mut engine = DocEngine::new();
2483        engine
2484            .create_collection("users", CollectionConfig::default())
2485            .expect("create");
2486        engine
2487            .create_index("users", "city", IndexType::Hash)
2488            .expect("index");
2489
2490        engine
2491            .set(
2492                "users",
2493                "d1",
2494                &json!({"name": "Kwame", "city": "Accra", "age": 30}),
2495            )
2496            .expect("set");
2497        engine
2498            .set(
2499                "users",
2500                "d2",
2501                &json!({"name": "Ama", "city": "Accra", "age": 25}),
2502            )
2503            .expect("set");
2504
2505        let results = engine
2506            .find(
2507                "users",
2508                r#"city = "Accra""#,
2509                Some(&["name"]),
2510                None,
2511                0,
2512                None,
2513                false,
2514            )
2515            .expect("find should work");
2516        assert_eq!(results.len(), 2);
2517        for doc in &results {
2518            assert!(doc.get("name").is_some());
2519            assert!(doc.get("city").is_none());
2520            assert!(doc.get("age").is_none());
2521        }
2522    }
2523
2524    #[test]
2525    fn find_with_limit_offset() {
2526        let mut engine = DocEngine::new();
2527        engine
2528            .create_collection("users", CollectionConfig::default())
2529            .expect("create");
2530        engine
2531            .create_index("users", "active", IndexType::Hash)
2532            .expect("index");
2533
2534        for idx in 0..5 {
2535            engine
2536                .set(
2537                    "users",
2538                    &format!("d{idx}"),
2539                    &json!({"n": idx, "active": true}),
2540                )
2541                .expect("set");
2542        }
2543
2544        let results = engine
2545            .find("users", "active = true", None, Some(2), 1, None, false)
2546            .expect("find should work");
2547        assert_eq!(results.len(), 2);
2548    }
2549
2550    #[test]
2551    fn count_query() {
2552        let mut engine = DocEngine::new();
2553        engine
2554            .create_collection("users", CollectionConfig::default())
2555            .expect("create");
2556        engine
2557            .create_index("users", "city", IndexType::Hash)
2558            .expect("index");
2559
2560        engine
2561            .set("users", "d1", &json!({"city": "Accra"}))
2562            .expect("set");
2563        engine
2564            .set("users", "d2", &json!({"city": "Accra"}))
2565            .expect("set");
2566        engine
2567            .set("users", "d3", &json!({"city": "Lagos"}))
2568            .expect("set");
2569
2570        let count = engine
2571            .count("users", r#"city = "Accra""#)
2572            .expect("count should work");
2573        assert_eq!(count, 2);
2574    }
2575
2576    #[test]
2577    fn find_unindexed_falls_back_to_scan() {
2578        let mut engine = DocEngine::new();
2579        engine
2580            .create_collection("users", CollectionConfig::default())
2581            .expect("create");
2582
2583        engine
2584            .set("users", "d1", &json!({"name": "Kwame", "city": "Accra"}))
2585            .expect("set");
2586        engine
2587            .set("users", "d2", &json!({"name": "Ama", "city": "Kumasi"}))
2588            .expect("set");
2589        engine
2590            .set("users", "d3", &json!({"name": "Kofi", "city": "Accra"}))
2591            .expect("set");
2592
2593        let results = engine
2594            .find("users", r#"city = "Accra""#, None, None, 0, None, false)
2595            .expect("find should work");
2596        assert_eq!(results.len(), 2);
2597        for doc in &results {
2598            assert_eq!(doc["city"], "Accra");
2599        }
2600    }
2601
2602    #[test]
2603    fn find_empty_result() {
2604        let mut engine = DocEngine::new();
2605        engine
2606            .create_collection("users", CollectionConfig::default())
2607            .expect("create");
2608        engine
2609            .create_index("users", "city", IndexType::Hash)
2610            .expect("index");
2611
2612        engine
2613            .set("users", "d1", &json!({"city": "Accra"}))
2614            .expect("set");
2615
2616        let results = engine
2617            .find(
2618                "users",
2619                r#"city = "NonExistent""#,
2620                None,
2621                None,
2622                0,
2623                None,
2624                false,
2625            )
2626            .expect("find should work");
2627        assert!(results.is_empty());
2628    }
2629
2630    #[test]
2631    fn find_with_in_operator() {
2632        let mut engine = DocEngine::new();
2633        engine
2634            .create_collection("users", CollectionConfig::default())
2635            .expect("create");
2636        engine
2637            .set("users", "u1", &json!({"name": "Alice", "status": "active"}))
2638            .expect("set");
2639        engine
2640            .set("users", "u2", &json!({"name": "Bob", "status": "pending"}))
2641            .expect("set");
2642        engine
2643            .set(
2644                "users",
2645                "u3",
2646                &json!({"name": "Charlie", "status": "deleted"}),
2647            )
2648            .expect("set");
2649
2650        let results = engine
2651            .find(
2652                "users",
2653                r#"status IN ("active", "pending")"#,
2654                None,
2655                None,
2656                0,
2657                None,
2658                false,
2659            )
2660            .expect("find");
2661        assert_eq!(results.len(), 2);
2662    }
2663
2664    #[test]
2665    fn find_in_with_hash_index() {
2666        let mut engine = DocEngine::new();
2667        engine
2668            .create_collection("users", CollectionConfig::default())
2669            .expect("create");
2670        engine
2671            .create_index("users", "status", IndexType::Hash)
2672            .expect("index");
2673        engine
2674            .set("users", "u1", &json!({"name": "Alice", "status": "active"}))
2675            .expect("set");
2676        engine
2677            .set("users", "u2", &json!({"name": "Bob", "status": "pending"}))
2678            .expect("set");
2679        engine
2680            .set(
2681                "users",
2682                "u3",
2683                &json!({"name": "Charlie", "status": "deleted"}),
2684            )
2685            .expect("set");
2686
2687        let results = engine
2688            .find(
2689                "users",
2690                r#"status IN ("active", "pending")"#,
2691                None,
2692                None,
2693                0,
2694                None,
2695                false,
2696            )
2697            .expect("find");
2698        assert_eq!(results.len(), 2);
2699    }
2700
2701    #[test]
2702    fn find_with_exists() {
2703        let mut engine = DocEngine::new();
2704        engine
2705            .create_collection("users", CollectionConfig::default())
2706            .expect("create");
2707        engine
2708            .set(
2709                "users",
2710                "u1",
2711                &json!({"name": "Alice", "email": "alice@test.com"}),
2712            )
2713            .expect("set");
2714        engine
2715            .set("users", "u2", &json!({"name": "Bob"}))
2716            .expect("set");
2717
2718        let results = engine
2719            .find("users", "email EXISTS", None, None, 0, None, false)
2720            .expect("find");
2721        assert_eq!(results.len(), 1);
2722        assert_eq!(results[0]["name"], "Alice");
2723    }
2724
2725    #[test]
2726    fn find_with_not() {
2727        let mut engine = DocEngine::new();
2728        engine
2729            .create_collection("users", CollectionConfig::default())
2730            .expect("create");
2731        engine
2732            .set("users", "u1", &json!({"name": "Alice", "status": "active"}))
2733            .expect("set");
2734        engine
2735            .set("users", "u2", &json!({"name": "Bob", "status": "deleted"}))
2736            .expect("set");
2737        engine
2738            .set(
2739                "users",
2740                "u3",
2741                &json!({"name": "Charlie", "status": "active"}),
2742            )
2743            .expect("set");
2744
2745        let results = engine
2746            .find(
2747                "users",
2748                r#"NOT status = "deleted""#,
2749                None,
2750                None,
2751                0,
2752                None,
2753                false,
2754            )
2755            .expect("find");
2756        assert_eq!(results.len(), 2);
2757    }
2758
2759    #[test]
2760    fn find_with_parenthesized_grouping() {
2761        let mut engine = DocEngine::new();
2762        engine
2763            .create_collection("users", CollectionConfig::default())
2764            .expect("create");
2765        engine
2766            .set("users", "u1", &json!({"city": "Accra", "age": 30}))
2767            .expect("set");
2768        engine
2769            .set("users", "u2", &json!({"city": "Lagos", "age": 20}))
2770            .expect("set");
2771        engine
2772            .set("users", "u3", &json!({"city": "Nairobi", "age": 35}))
2773            .expect("set");
2774
2775        let results = engine
2776            .find(
2777                "users",
2778                r#"(city = "Accra" OR city = "Lagos") AND age > 18"#,
2779                None,
2780                None,
2781                0,
2782                None,
2783                false,
2784            )
2785            .expect("find");
2786        assert_eq!(results.len(), 2);
2787    }
2788
2789    #[test]
2790    fn find_order_by_ascending() {
2791        let mut engine = DocEngine::new();
2792        engine
2793            .create_collection("users", CollectionConfig::default())
2794            .expect("create");
2795        engine
2796            .set("users", "alice", &json!({"name": "Alice", "age": 30}))
2797            .expect("set");
2798        engine
2799            .set("users", "bob", &json!({"name": "Bob", "age": 20}))
2800            .expect("set");
2801        engine
2802            .set("users", "charlie", &json!({"name": "Charlie", "age": 25}))
2803            .expect("set");
2804
2805        let results = engine
2806            .find("users", "age > 0", None, None, 0, Some("age"), false)
2807            .expect("find");
2808        let ages: Vec<i64> = results.iter().map(|v| v["age"].as_i64().unwrap()).collect();
2809        assert_eq!(ages, vec![20, 25, 30]);
2810    }
2811
2812    #[test]
2813    fn find_order_by_descending() {
2814        let mut engine = DocEngine::new();
2815        engine
2816            .create_collection("users", CollectionConfig::default())
2817            .expect("create");
2818        engine
2819            .set("users", "alice", &json!({"name": "Alice", "age": 30}))
2820            .expect("set");
2821        engine
2822            .set("users", "bob", &json!({"name": "Bob", "age": 20}))
2823            .expect("set");
2824        engine
2825            .set("users", "charlie", &json!({"name": "Charlie", "age": 25}))
2826            .expect("set");
2827
2828        let results = engine
2829            .find("users", "age > 0", None, None, 0, Some("age"), true)
2830            .expect("find");
2831        let ages: Vec<i64> = results.iter().map(|v| v["age"].as_i64().unwrap()).collect();
2832        assert_eq!(ages, vec![30, 25, 20]);
2833    }
2834
2835    #[test]
2836    fn find_order_by_string_field() {
2837        let mut engine = DocEngine::new();
2838        engine
2839            .create_collection("users", CollectionConfig::default())
2840            .expect("create");
2841        engine
2842            .set("users", "a", &json!({"name": "Charlie"}))
2843            .expect("set");
2844        engine
2845            .set("users", "b", &json!({"name": "Alice"}))
2846            .expect("set");
2847        engine
2848            .set("users", "c", &json!({"name": "Bob"}))
2849            .expect("set");
2850
2851        let results = engine
2852            .find("users", "name EXISTS", None, None, 0, Some("name"), false)
2853            .expect("find");
2854        let names: Vec<&str> = results
2855            .iter()
2856            .map(|v| v["name"].as_str().unwrap())
2857            .collect();
2858        assert_eq!(names, vec!["Alice", "Bob", "Charlie"]);
2859    }
2860
2861    #[test]
2862    fn find_order_by_missing_field_sorts_to_end() {
2863        let mut engine = DocEngine::new();
2864        engine
2865            .create_collection("users", CollectionConfig::default())
2866            .expect("create");
2867        engine
2868            .set("users", "a", &json!({"name": "Alice", "age": 30}))
2869            .expect("set");
2870        engine
2871            .set("users", "b", &json!({"name": "Bob"}))
2872            .expect("set");
2873        engine
2874            .set("users", "c", &json!({"name": "Charlie", "age": 20}))
2875            .expect("set");
2876
2877        let results = engine
2878            .find("users", "name EXISTS", None, None, 0, Some("age"), false)
2879            .expect("find");
2880        assert_eq!(results.len(), 3);
2881        assert_eq!(results[0]["age"], 20);
2882        assert_eq!(results[1]["age"], 30);
2883        assert_eq!(results[2]["name"], "Bob");
2884    }
2885}