//! `nookdb_core` — `collection.rs`: typed CRUD/query over one collection,
//! composing schema validation, the document codec, the secondary-index
//! engine, and storage.
use serde_json::Value;

use crate::codec::doc::{decode_document, encode_document, IdentityCodec, ValueCodec};
use crate::database::Database;
use crate::error::NookError;
use crate::index::engine::{
    delete_index_entry, index_value_exists_writing, lookup_eq, put_index_entry,
};
use crate::schema::ir::{CollectionIr, SchemaIr};
use crate::schema::validate::validate_document;
use crate::storage::WriteTx;

14/// A typed handle to one collection.
15///
16/// Validates, stores, and maintains secondary indexes for documents,
17/// composing the schema validator, the document codec seam, and the
18/// index engine over a single [`Database`]. The Rust core is the sole
19/// validation authority (PRD §3); the TS surface applies id/defaults
20/// *before* a document reaches `insert`.
21pub struct Collection<'a> {
22    db: &'a Database,
23    ir: &'a CollectionIr,
24    name: String,
25    /// Identity in M2; the storage value-codec seam point (§6a). Boxed
26    /// so an external crate can later inject an alternate codec without
27    /// changing this type's shape.
28    codec: Box<dyn ValueCodec>,
29}
30
31impl<'a> Collection<'a> {
32    /// Binds a collection name to its compiled schema IR.
33    ///
34    /// # Errors
35    ///
36    /// Returns `NookError::Schema` if `name` is not a collection in
37    /// `schema`.
38    pub fn new(db: &'a Database, schema: &'a SchemaIr, name: &str) -> Result<Self, NookError> {
39        let ir = schema.collection(name).ok_or_else(|| NookError::Schema {
40            msg: format!("unknown collection {name:?}"),
41        })?;
42        Ok(Self {
43            db,
44            ir,
45            name: name.to_string(),
46            codec: Box::new(IdentityCodec),
47        })
48    }
49
50    /// Extracts the id-field value as a string slice.
51    ///
52    /// # Errors
53    ///
54    /// Returns `NookError::Schema` if the id field is absent or not a
55    /// string.
56    fn doc_id<'v>(&self, doc: &'v Value) -> Result<&'v str, NookError> {
57        doc.get(&self.ir.id_field)
58            .and_then(Value::as_str)
59            .ok_or_else(|| NookError::Schema {
60                msg: format!("missing id field {:?}", self.ir.id_field),
61            })
62    }
63
64    /// Returns `true` when `doc` satisfies every field constraint in `filter`.
65    ///
66    /// Supported operators (M2): bare equality, `$ne`, `$in`, `$nin`,
67    /// `$gt`, `$gte`, `$lt`, `$lte`, `$exists`. An empty filter matches all docs.
68    fn matches(doc: &Value, filter: &Value) -> bool {
69        let Some(fobj) = filter.as_object() else {
70            return true;
71        };
72        fobj.iter().all(|(field, cond)| {
73            let actual = doc.get(field);
74            match cond {
75                Value::Object(ops) if ops.keys().any(|k| k.starts_with('$')) => {
76                    ops.iter().all(|(op, want)| match op.as_str() {
77                        "$ne" => actual != Some(want),
78                        "$exists" => want.as_bool().unwrap_or(true) == actual.is_some(),
79                        "$in" => want
80                            .as_array()
81                            .is_some_and(|a| actual.is_some_and(|v| a.contains(v))),
82                        "$nin" => want
83                            .as_array()
84                            .is_some_and(|a| actual.map_or(true, |v| !a.contains(v))),
85                        "$gt" | "$gte" | "$lt" | "$lte" => Self::cmp(actual, op, want),
86                        _ => false,
87                    })
88                }
89                _ => actual == Some(cond),
90            }
91        })
92    }
93
94    /// Numeric comparison for `$gt`, `$gte`, `$lt`, `$lte`.
95    ///
96    /// Returns `false` if either side is not an `f64`-representable number.
97    fn cmp(actual: Option<&Value>, op: &str, want: &Value) -> bool {
98        let (Some(a), Some(b)) = (actual.and_then(Value::as_f64), want.as_f64()) else {
99            return false;
100        };
101        match op {
102            "$gt" => a > b,
103            "$gte" => a >= b,
104            "$lt" => a < b,
105            _ => a <= b,
106        }
107    }
108
109    /// Fetches and decodes every document in this collection (full scan).
110    fn all_docs(&self) -> Result<Vec<Value>, NookError> {
111        self.db.read(|tx| {
112            let entries = tx.list_collection(&self.name)?;
113            entries
114                .iter()
115                .map(|(_, v)| decode_document(self.ir, v, self.codec.as_ref()))
116                .collect()
117        })
118    }
119
120    /// Returns a candidate set for `filter`, using the id-field fast-path or
121    /// an indexed-field equality fast-path when available, otherwise a full scan.
122    ///
123    /// `find` ALWAYS re-filters the candidate set through `matches`, so fast
124    /// paths can only narrow (never produce wrong answers).
125    fn candidates(&self, filter: &Value) -> Result<Vec<Value>, NookError> {
126        let Some(obj) = filter.as_object() else {
127            return self.all_docs();
128        };
129        // Primary-id equality → single `tx.get`.
130        if let Some(Value::String(idv)) = obj.get(&self.ir.id_field) {
131            return self.db.read(|tx| {
132                Ok(tx
133                    .get(&self.name, idv.as_bytes())?
134                    .map(|b| decode_document(self.ir, &b, self.codec.as_ref()))
135                    .transpose()?
136                    .into_iter()
137                    .collect())
138            });
139        }
140        // Indexed-field equality (non-operator value) → index lookup then fetch.
141        for idx in &self.ir.indexes {
142            if let Some(v) = obj.get(&idx.field) {
143                if !v.is_object() {
144                    let ids = self
145                        .db
146                        .read(|tx| lookup_eq(tx, &self.name, &idx.field, v))?;
147                    return self.db.read(|tx| {
148                        ids.iter()
149                            .filter_map(|id| tx.get(&self.name, id).transpose())
150                            .map(|r| {
151                                r.and_then(|b| decode_document(self.ir, &b, self.codec.as_ref()))
152                            })
153                            .collect()
154                    });
155                }
156            }
157        }
158        // No fast path available — full scan.
159        self.all_docs()
160    }
161
162    /// Returns every document in the collection that matches `filter`,
163    /// in storage order. Equivalent to `find_with(filter,
164    /// &QueryOptions::default())`.
165    ///
166    /// Operator support (M2): bare equality, `$ne`, `$in`, `$nin`,
167    /// `$gt`, `$gte`, `$lt`, `$lte`, `$exists`. An empty filter (`{}`)
168    /// returns all documents.
169    ///
170    /// # Errors
171    ///
172    /// Storage/corruption errors.
173    pub fn find(&self, filter: &Value) -> Result<Vec<Value>, NookError> {
174        self.find_with(filter, &crate::query::QueryOptions::default())
175    }
176
177    /// Returns documents matching `filter`, then applies `opts`
178    /// (schema-typed sort → offset → limit). null/missing sort last;
179    /// ties break by id.
180    ///
181    /// Operator support for `filter` matches [`find`](Self::find).
182    ///
183    /// # Errors
184    ///
185    /// Storage/corruption errors; `NookError::Schema` if a sort field is
186    /// unknown or non-orderable.
187    pub fn find_with(
188        &self,
189        filter: &Value,
190        opts: &crate::query::QueryOptions,
191    ) -> Result<Vec<Value>, NookError> {
192        let matched: Vec<Value> = self
193            .candidates(filter)?
194            .into_iter()
195            .filter(|d| Self::matches(d, filter))
196            .collect();
197        opts.apply(matched, &self.ir.id_field, |f| {
198            self.ir.field(f).map(|fi| &fi.ty)
199        })
200    }
201
202    /// Returns the first document matching `filter`, or `None`.
203    ///
204    /// # Errors
205    ///
206    /// Storage/corruption errors.
207    pub fn find_one(&self, filter: &Value) -> Result<Option<Value>, NookError> {
208        self.find_one_with(filter, &crate::query::QueryOptions::default())
209    }
210
211    /// `find_one` honoring `opts`: sorts, then returns the first row.
212    /// `limit` is forced to 1 internally.
213    ///
214    /// # Errors
215    ///
216    /// Storage/corruption errors; sort-field errors as
217    /// [`find_with`](Self::find_with).
218    pub fn find_one_with(
219        &self,
220        filter: &Value,
221        opts: &crate::query::QueryOptions,
222    ) -> Result<Option<Value>, NookError> {
223        let mut one = opts.clone();
224        one.limit = Some(1);
225        Ok(self.find_with(filter, &one)?.into_iter().next())
226    }
227
228    /// Returns the number of documents matching `filter`.
229    ///
230    /// # Errors
231    ///
232    /// Storage/corruption errors.
233    pub fn count(&self, filter: &Value) -> Result<usize, NookError> {
234        self.count_with(filter, &crate::query::QueryOptions::default())
235    }
236
237    /// `count` honoring `opts`: `sort` does not change a count so it is not
238    /// applied, but an INVALID `sort` field is still rejected so `count` and
239    /// `find` accept/reject identical options identically; `offset`/`limit`
240    /// cap the returned count ("are there at most N?").
241    ///
242    /// # Errors
243    ///
244    /// Storage/corruption errors; `NookError::Schema` if a sort field is
245    /// unknown or non-orderable.
246    pub fn count_with(
247        &self,
248        filter: &Value,
249        opts: &crate::query::QueryOptions,
250    ) -> Result<usize, NookError> {
251        opts.validate_sort_fields(|f| self.ir.field(f).map(|fi| &fi.ty))?;
252        let total = self
253            .candidates(filter)?
254            .into_iter()
255            .filter(|d| Self::matches(d, filter))
256            .count();
257        let after_offset = total.saturating_sub(opts.offset);
258        Ok(opts.limit.map_or(after_offset, |l| after_offset.min(l)))
259    }
260
261    /// Deletes every document matching `filter`, removing each document and
262    /// all of its index entries atomically.
263    ///
264    /// # Errors
265    ///
266    /// Storage/corruption errors.
267    pub fn delete(&self, filter: &Value) -> Result<usize, NookError> {
268        // NOTE(M4): victims are resolved in a READ txn, then removed in a
269        // SEPARATE write txn using the find-snapshot's values. Safe under M2's
270        // single-process / single-threaded model; a concurrent writer (M4
271        // multi-process) could change a row between find and write, mis-deleting
272        // an index key. Owner: M4 (or a future single-txn find+delete).
273        let victims = self.find(filter)?;
274        if victims.is_empty() {
275            return Ok(0);
276        }
277        self.db
278            .write(|tx| self.delete_victims_in_tx(tx, &victims))?;
279        Ok(victims.len())
280    }
281
282    /// Removes a pre-resolved set of `victims` (documents) and their index
283    /// entries from the in-flight write transaction `tx`. Intended to be
284    /// reused by both the public `delete` (which resolves victims via a
285    /// separate read snapshot) and the buffered transactional path in the
286    /// NAPI binding (which resolves victims via the latest committed
287    /// snapshot at buffer time, see M5c §3.1).
288    ///
289    /// # Errors
290    ///
291    /// Storage/corruption errors, or `NookError::Schema` if any victim
292    /// document lacks the id field.
293    fn delete_victims_in_tx(
294        &self,
295        tx: &mut WriteTx<'_>,
296        victims: &[Value],
297    ) -> Result<(), NookError> {
298        for d in victims {
299            let id = self.doc_id(d)?.as_bytes().to_vec();
300            for idx in &self.ir.indexes {
301                let v = d.get(&idx.field).cloned().unwrap_or(Value::Null);
302                delete_index_entry(tx, &self.name, &idx.field, &v, &id)?;
303            }
304            tx.delete(&self.name, &id)?;
305        }
306        Ok(())
307    }
308
309    /// Variant of [`Self::delete`] that runs the write phase inside the
310    /// caller-supplied [`WriteTx`], so multiple delete ops can share one
311    /// transaction (see M5c `db.transaction(cb)` and the NAPI
312    /// `tx_delete_many` primitive).
313    ///
314    /// Victims are still resolved against the latest committed snapshot
315    /// via [`Self::find`] BEFORE entering the caller's write txn, then
316    /// removed atomically. This mirrors the M2 split-txn shape: read +
317    /// write are separate observations even when delete is composed with
318    /// other ops in one outer txn (read-after-write inside the same
319    /// transaction is M6 retrofit work).
320    ///
321    /// Returns the number of documents removed.
322    ///
323    /// # Errors
324    ///
325    /// Storage/corruption errors, or `NookError::Schema` if any victim
326    /// document lacks the id field.
327    pub fn delete_in_tx(&self, tx: &mut WriteTx<'_>, filter: &Value) -> Result<usize, NookError> {
328        let victims = self.find(filter)?;
329        if victims.is_empty() {
330            return Ok(0);
331        }
332        self.delete_victims_in_tx(tx, &victims)?;
333        Ok(victims.len())
334    }
335
336    /// Validates `doc`, stores it, and maintains every secondary index
337    /// atomically. Unique indexes are pre-checked against the in-flight
338    /// write transaction so two colliding inserts (even within one txn)
339    /// conflict and roll back.
340    ///
341    /// # Errors
342    ///
343    /// - `NookError::Schema` — `doc` fails schema validation, or the id
344    ///   field is missing/not a string.
345    /// - `NookError::InvalidArg` — the id contains a `\0` byte (the
346    ///   `\0`-delimited index key requires a NUL-free `doc_id`; see
347    ///   [`crate::index::engine`]).
348    /// - `NookError::Conflict` — a unique-index value already exists;
349    ///   the whole transaction is rolled back.
350    /// - storage errors propagated from the write transaction.
351    pub fn insert(&self, doc: &Value) -> Result<(), NookError> {
352        validate_document(self.ir, doc)?;
353        let id_str = self.doc_id(doc)?;
354        // Uphold the index engine's NUL-free `doc_id` invariant at the
355        // boundary. The `\0`-delimited, non-length-prefixed index key
356        // (and `lookup_eq`'s range-exactness) depend on it; in release
357        // builds the engine's `debug_assert!` is absent, so a `\0` id
358        // would otherwise silently collide. Mirrors the existing
359        // collection-name NUL rejection in `crate::codec::encode_key`.
360        if id_str.contains('\0') {
361            return Err(NookError::InvalidArg {
362                msg: format!(
363                    "id field {:?} must not contain a NUL byte",
364                    self.ir.id_field
365                ),
366            });
367        }
368        let id = id_str.as_bytes().to_vec();
369        let bytes = encode_document(self.ir, doc, self.codec.as_ref())?;
370        self.db
371            .write(|tx| self.insert_validated_in_tx(tx, doc, &id, &bytes))
372    }
373
374    /// Performs the in-txn write portion of [`Self::insert`] against the
375    /// caller-supplied [`WriteTx`]. Validation, id extraction, and
376    /// document encoding have already run; this method only does the
377    /// unique pre-check + `tx.put` + index maintenance against `tx`.
378    fn insert_validated_in_tx(
379        &self,
380        tx: &mut WriteTx<'_>,
381        doc: &Value,
382        id: &[u8],
383        bytes: &[u8],
384    ) -> Result<(), NookError> {
385        // NOTE(M3+): reading the in-flight write txn here also guarantees
386        // that two unique-colliding inserts within ONE `db.write` conflict.
387        for idx in &self.ir.indexes {
388            if idx.unique {
389                let v = doc.get(&idx.field).cloned().unwrap_or(Value::Null);
390                // Observe THIS write txn's view (not a read
391                // snapshot) so two inserts in one txn still conflict.
392                if index_value_exists_writing(tx, &self.name, &idx.field, &v)? {
393                    return Err(NookError::Conflict {
394                        msg: format!("{}.{} duplicate", self.name, idx.field),
395                    });
396                }
397            }
398        }
399        tx.put(&self.name, id, bytes)?;
400        for idx in &self.ir.indexes {
401            let v = doc.get(&idx.field).cloned().unwrap_or(Value::Null);
402            put_index_entry(tx, &self.name, &idx.field, &v, id)?;
403        }
404        Ok(())
405    }
406
407    /// Variant of [`Self::insert`] that runs the validation + index
408    /// maintenance + storage write inside the caller-supplied
409    /// [`WriteTx`], so multiple insert ops can share one transaction
410    /// (see M5c `db.transaction(cb)` and the NAPI `tx_insert`
411    /// primitive). Same semantics as `insert`, including the
412    /// in-transaction unique pre-check that catches two colliding
413    /// inserts buffered into one outer txn.
414    ///
415    /// # Errors
416    ///
417    /// Same as [`Self::insert`].
418    pub fn insert_in_tx(&self, tx: &mut WriteTx<'_>, doc: &Value) -> Result<(), NookError> {
419        validate_document(self.ir, doc)?;
420        let id_str = self.doc_id(doc)?;
421        if id_str.contains('\0') {
422            return Err(NookError::InvalidArg {
423                msg: format!(
424                    "id field {:?} must not contain a NUL byte",
425                    self.ir.id_field
426                ),
427            });
428        }
429        let id = id_str.as_bytes().to_vec();
430        let bytes = encode_document(self.ir, doc, self.codec.as_ref())?;
431        self.insert_validated_in_tx(tx, doc, &id, &bytes)
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::schema::ir::SchemaIr;
    use serde_json::json;

    /// Opens a temp DB with a collection `"u"` that has a unique `email`
    /// index and a non-unique `role` index.
    fn setup() -> (tempfile::TempDir, Database, SchemaIr) {
        let d = tempfile::tempdir().unwrap();
        let db = Database::open(d.path().join("t.db")).unwrap();
        let ir = SchemaIr::compile(
            r#"{"u":{"idField":"id","fields":[
          {"name":"id","type":"id"},{"name":"email","type":"string"},
          {"name":"role","type":"enum","variants":["admin","user"]}],
          "indexes":[{"field":"email","unique":true},{"field":"role","unique":false}]}}"#,
        )
        .unwrap();
        (d, db, ir)
    }

    #[test]
    fn insert_validates_stores_and_indexes() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&json!({"id":"1","email":"a@b","role":"admin"}))
            .unwrap();
        let stored = db.read(|tx| tx.get("u", b"1")).unwrap().unwrap();
        assert!(serde_json::from_slice::<serde_json::Value>(&stored).is_ok());
    }

    #[test]
    fn insert_rejects_invalid_document_with_schema_error() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        let e = c
            .insert(&json!({"id":"1","email":"a@b","role":"ghost"}))
            .unwrap_err();
        assert_eq!(e.kind(), crate::error::NookErrorKind::Schema);
    }

    #[test]
    fn unique_index_violation_is_conflict_and_rolls_back() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&json!({"id":"1","email":"a@b","role":"admin"}))
            .unwrap();
        let e = c
            .insert(&json!({"id":"2","email":"a@b","role":"user"}))
            .unwrap_err();
        assert_eq!(e.kind(), crate::error::NookErrorKind::Conflict);
        assert!(
            db.read(|tx| tx.get("u", b"2")).unwrap().is_none(),
            "rolled back"
        );
    }

    #[test]
    fn insert_rejects_nul_in_id_with_invalid_arg_and_persists_nothing() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        let e = c
            .insert(&json!({"id": "a\u{0}b", "email": "a@b", "role": "admin"}))
            .unwrap_err();
        assert_eq!(e.kind(), crate::error::NookErrorKind::InvalidArg);
        // The NUL-free doc_id invariant (engine::key debug_assert is compiled
        // out in release) is enforced ONLY by this insert-boundary guard, so
        // a rejected insert must persist nothing.
        assert!(
            db.read(|tx| tx.get("u", b"a\x00b")).unwrap().is_none(),
            "rejected NUL-id insert must not persist an entries row"
        );
    }

    #[test]
    fn find_by_id_index_and_scan_paths() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&json!({"id":"1","email":"a@b","role":"admin"}))
            .unwrap();
        c.insert(&json!({"id":"2","email":"c@d","role":"user"}))
            .unwrap();
        c.insert(&json!({"id":"3","email":"e@f","role":"admin"}))
            .unwrap();

        let one = c.find_one(&json!({"id":"2"})).unwrap().unwrap();
        assert_eq!(one["email"], json!("c@d"));
        let mut admins = c.find(&json!({"role":"admin"})).unwrap();
        admins.sort_by_key(|d| d["id"].as_str().unwrap().to_string());
        assert_eq!(admins.len(), 2);
        let ne = c.find(&json!({"role":{"$ne":"admin"}})).unwrap();
        assert_eq!(ne.len(), 1);
        assert_eq!(c.count(&json!({"role":"admin"})).unwrap(), 2);
        assert_eq!(c.count(&json!({})).unwrap(), 3);
    }

    proptest::proptest! {
        #[test]
        fn index_lookup_matches_full_scan(ids in proptest::collection::vec(0u32..50, 0..20)) {
            let (_d, db, ir) = setup();
            let c = Collection::new(&db, &ir, "u").unwrap();
            for (i, n) in ids.iter().enumerate() {
                let role = if n % 2 == 0 { "admin" } else { "user" };
                let _ = c.insert(&json!({"id":format!("{i}"),
                    "email":format!("e{i}@x"),"role":role}));
            }
            let via_index = c.find(&json!({"role":"admin"})).unwrap().len();
            let via_scan = c.find(&json!({})).unwrap().into_iter()
                .filter(|d| d["role"] == json!("admin")).count();
            proptest::prop_assert_eq!(via_index, via_scan);
        }
    }

    #[test]
    fn delete_removes_docs_and_their_index_entries() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&json!({"id":"1","email":"a@b","role":"admin"}))
            .unwrap();
        c.insert(&json!({"id":"2","email":"c@d","role":"admin"}))
            .unwrap();
        let n = c.delete(&json!({"id":"1"})).unwrap();
        assert_eq!(n, 1);
        assert!(c.find_one(&json!({"id":"1"})).unwrap().is_none());
        let admins = c.find(&json!({"role":"admin"})).unwrap();
        assert_eq!(admins.len(), 1);
        assert_eq!(admins[0]["id"], json!("2"));
        c.insert(&json!({"id":"9","email":"a@b","role":"user"}))
            .unwrap();
    }

    #[test]
    fn delete_by_filter_removes_all_matching_victims_atomically() {
        let (_d, db, ir) = setup();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&json!({"id":"1","email":"a@b","role":"admin"}))
            .unwrap();
        c.insert(&json!({"id":"2","email":"c@d","role":"admin"}))
            .unwrap();
        c.insert(&json!({"id":"3","email":"e@f","role":"user"}))
            .unwrap();
        let n = c.delete(&json!({"role":"admin"})).unwrap();
        assert_eq!(n, 2);
        assert!(c.find_one(&json!({"id":"1"})).unwrap().is_none());
        assert!(c.find_one(&json!({"id":"2"})).unwrap().is_none());
        // non-matching doc untouched
        assert!(c.find_one(&json!({"id":"3"})).unwrap().is_some());
        assert_eq!(c.count(&json!({})).unwrap(), 1);
        // both deleted docs' unique emails are freed (index entries removed
        // for every victim, not just the first)
        c.insert(&json!({"id":"10","email":"a@b","role":"user"}))
            .unwrap();
        c.insert(&json!({"id":"11","email":"c@d","role":"user"}))
            .unwrap();
    }

    proptest::proptest! {
        #[test]
        fn delete_keeps_index_consistent_with_live_docs(
            // up to 12 docs; `keep[i]` decides whether doc i survives the delete pass
            roles in proptest::collection::vec(0u8..2, 1..12),
            keep  in proptest::collection::vec(proptest::bool::ANY, 1..12),
        ) {
            let (_d, db, ir) = setup();
            let c = Collection::new(&db, &ir, "u").unwrap();
            let n = roles.len().min(keep.len());
            // insert n docs with distinct ids + distinct unique emails
            for (i, &role_idx) in roles.iter().enumerate().take(n) {
                let role = if role_idx == 0 { "admin" } else { "user" };
                c.insert(&json!({"id": format!("{i}"),
                    "email": format!("e{i}@x"), "role": role})).unwrap();
            }
            // delete the docs where !keep[i]
            for (i, &kept) in keep.iter().enumerate().take(n) {
                if !kept {
                    let removed = c.delete(&json!({"id": format!("{i}")})).unwrap();
                    proptest::prop_assert_eq!(removed, 1);
                }
            }
            // invariant 1: index-path find == scan-path filter for each role (no stale
            // index entry surfaces a deleted doc; no live doc is missed)
            for role in ["admin", "user"] {
                let via_index = c.find(&json!({"role": role})).unwrap().len();
                let via_scan = c.find(&json!({})).unwrap().into_iter()
                    .filter(|d| d["role"] == json!(role)).count();
                proptest::prop_assert_eq!(via_index, via_scan);
                // and equals the actual live count
                let live = roles.iter().zip(keep.iter()).take(n)
                    .filter(|(&r, &k)| k && (if r == 0 { "admin" } else { "user" }) == role)
                    .count();
                proptest::prop_assert_eq!(via_index, live);
            }
            // Invariant 2 is the GENUINE lock on unique-index cleanup (the Task-8
            // carry-forward this proptest exists to enforce): re-inserting a deleted
            // doc's unique email succeeds ONLY if delete physically removed the
            // `index_entries` key (insert's unique pre-check reads it independently
            // of the doc row). Invariant 1 above is a find/scan-equivalence + live-
            // count check and is VACUOUS for detecting a stale entry in isolation
            // (find re-fetches by id; a deleted doc's gone row hides a stale
            // non-unique index entry). Do NOT weaken/remove invariant 2.
            for (i, &kept) in keep.iter().enumerate().take(n) {
                if !kept {
                    c.insert(&json!({"id": format!("r{i}"),
                        "email": format!("e{i}@x"), "role": "user"})).unwrap();
                }
            }
        }
    }

    /// Opens a temp DB with a collection `"u"` that has a numeric field `n`,
    /// for exercising sort/limit/offset query options.
    fn setup_numeric() -> (tempfile::TempDir, Database, SchemaIr) {
        let d = tempfile::tempdir().unwrap();
        let db = Database::open(d.path().join("t.db")).unwrap();
        let ir = SchemaIr::compile(
            r#"{"u":{"idField":"id","fields":[
          {"name":"id","type":"id"},{"name":"n","type":"number"}]}}"#,
        )
        .unwrap();
        (d, db, ir)
    }

    #[test]
    fn find_with_sorts_limits_offsets() {
        let (_d, db, ir) = setup_numeric();
        let c = Collection::new(&db, &ir, "u").unwrap();
        for (id, n) in [("a", 4), ("b", 1), ("c", 3), ("d", 2)] {
            c.insert(&serde_json::json!({"id": id, "n": n})).unwrap();
        }
        let opts = crate::query::QueryOptions::parse(Some(
            r#"{"sort":[["n","asc"]],"offset":1,"limit":2}"#,
        ))
        .unwrap();
        let got = c.find_with(&serde_json::json!({}), &opts).unwrap();
        let ns: Vec<_> = got.iter().map(|d| d["n"].as_i64().unwrap()).collect();
        assert_eq!(ns, vec![2, 3]);
    }

    #[test]
    fn count_with_applies_offset_and_limit_cap() {
        let (_d, db, ir) = setup_numeric();
        let c = Collection::new(&db, &ir, "u").unwrap();
        for (id, n) in [("a", 1), ("b", 2), ("c", 3), ("d", 4)] {
            c.insert(&serde_json::json!({"id": id, "n": n})).unwrap();
        }
        let parse = |s: &str| crate::query::QueryOptions::parse(Some(s)).unwrap();
        let f = serde_json::json!({});
        assert_eq!(
            c.count_with(&f, &crate::query::QueryOptions::default())
                .unwrap(),
            4
        );
        assert_eq!(c.count_with(&f, &parse(r#"{"limit":2}"#)).unwrap(), 2);
        assert_eq!(c.count_with(&f, &parse(r#"{"offset":9}"#)).unwrap(), 0);
        assert_eq!(
            c.count_with(&f, &parse(r#"{"offset":1,"limit":2}"#))
                .unwrap(),
            2
        );
        assert_eq!(c.count_with(&f, &parse(r#"{"limit":0}"#)).unwrap(), 0);
    }

    #[test]
    fn count_with_rejects_invalid_sort_field() {
        // count ignores sort for the tally, but must still reject an invalid
        // sort spec so count and find accept/reject identical options alike.
        let (_d, db, ir) = setup_numeric();
        let c = Collection::new(&db, &ir, "u").unwrap();
        c.insert(&serde_json::json!({"id": "a", "n": 1})).unwrap();
        let opts =
            crate::query::QueryOptions::parse(Some(r#"{"sort":[["bogus","asc"]]}"#)).unwrap();
        let err = c.count_with(&serde_json::json!({}), &opts).unwrap_err();
        assert!(matches!(err, NookError::Schema { .. }));
    }

    /// Pins the range, membership, and existence operators of `matches`
    /// (full-scan path: this schema has no secondary indexes).
    #[test]
    fn find_supports_comparison_membership_and_exists_operators() {
        let (_d, db, ir) = setup_numeric();
        let c = Collection::new(&db, &ir, "u").unwrap();
        for (id, n) in [("a", 1), ("b", 2), ("c", 3), ("d", 4)] {
            c.insert(&json!({"id": id, "n": n})).unwrap();
        }
        // Sorted ids of the documents matching `filter`.
        let ids = |filter: serde_json::Value| {
            let mut got: Vec<String> = c
                .find(&filter)
                .unwrap()
                .iter()
                .map(|d| d["id"].as_str().unwrap().to_string())
                .collect();
            got.sort();
            got
        };
        assert_eq!(ids(json!({"n": {"$gt": 2}})), ["c", "d"]);
        assert_eq!(ids(json!({"n": {"$gte": 2, "$lt": 4}})), ["b", "c"]);
        assert_eq!(ids(json!({"n": {"$lte": 1}})), ["a"]);
        assert_eq!(ids(json!({"n": {"$in": [1, 4]}})), ["a", "d"]);
        assert_eq!(ids(json!({"n": {"$nin": [1, 4]}})), ["b", "c"]);
        assert_eq!(ids(json!({"n": {"$exists": true}})).len(), 4);
        assert!(ids(json!({"n": {"$exists": false}})).is_empty());
        // An unrecognised operator never matches.
        assert!(ids(json!({"n": {"$bogus": 1}})).is_empty());
    }
}