1use serde_json::Value;
3
4use crate::codec::doc::{decode_document, encode_document, IdentityCodec, ValueCodec};
5use crate::database::Database;
6use crate::error::NookError;
7use crate::index::engine::{
8 delete_index_entry, index_value_exists_writing, lookup_eq, put_index_entry,
9};
10use crate::schema::ir::{CollectionIr, SchemaIr};
11use crate::schema::validate::validate_document;
12use crate::storage::WriteTx;
13
/// Handle to one schema-defined collection stored in a [`Database`].
///
/// Borrows the database and the compiled schema for `'a`; owns its name and
/// the codec used when documents are encoded and decoded.
pub struct Collection<'a> {
    /// Database through which every read and write of this collection runs.
    db: &'a Database,
    /// Schema description of this collection (id field, indexes, field types).
    ir: &'a CollectionIr,
    /// Collection name, passed to the storage layer on every access.
    name: String,
    /// Codec handed to `encode_document` / `decode_document`.
    codec: Box<dyn ValueCodec>,
}
30
31impl<'a> Collection<'a> {
32 pub fn new(db: &'a Database, schema: &'a SchemaIr, name: &str) -> Result<Self, NookError> {
39 let ir = schema.collection(name).ok_or_else(|| NookError::Schema {
40 msg: format!("unknown collection {name:?}"),
41 })?;
42 Ok(Self {
43 db,
44 ir,
45 name: name.to_string(),
46 codec: Box::new(IdentityCodec),
47 })
48 }
49
50 fn doc_id<'v>(&self, doc: &'v Value) -> Result<&'v str, NookError> {
57 doc.get(&self.ir.id_field)
58 .and_then(Value::as_str)
59 .ok_or_else(|| NookError::Schema {
60 msg: format!("missing id field {:?}", self.ir.id_field),
61 })
62 }
63
64 fn matches(doc: &Value, filter: &Value) -> bool {
69 let Some(fobj) = filter.as_object() else {
70 return true;
71 };
72 fobj.iter().all(|(field, cond)| {
73 let actual = doc.get(field);
74 match cond {
75 Value::Object(ops) if ops.keys().any(|k| k.starts_with('$')) => {
76 ops.iter().all(|(op, want)| match op.as_str() {
77 "$ne" => actual != Some(want),
78 "$exists" => want.as_bool().unwrap_or(true) == actual.is_some(),
79 "$in" => want
80 .as_array()
81 .is_some_and(|a| actual.is_some_and(|v| a.contains(v))),
82 "$nin" => want
83 .as_array()
84 .is_some_and(|a| actual.map_or(true, |v| !a.contains(v))),
85 "$gt" | "$gte" | "$lt" | "$lte" => Self::cmp(actual, op, want),
86 _ => false,
87 })
88 }
89 _ => actual == Some(cond),
90 }
91 })
92 }
93
94 fn cmp(actual: Option<&Value>, op: &str, want: &Value) -> bool {
98 let (Some(a), Some(b)) = (actual.and_then(Value::as_f64), want.as_f64()) else {
99 return false;
100 };
101 match op {
102 "$gt" => a > b,
103 "$gte" => a >= b,
104 "$lt" => a < b,
105 _ => a <= b,
106 }
107 }
108
109 fn all_docs(&self) -> Result<Vec<Value>, NookError> {
111 self.db.read(|tx| {
112 let entries = tx.list_collection(&self.name)?;
113 entries
114 .iter()
115 .map(|(_, v)| decode_document(self.ir, v, self.codec.as_ref()))
116 .collect()
117 })
118 }
119
120 fn candidates(&self, filter: &Value) -> Result<Vec<Value>, NookError> {
126 let Some(obj) = filter.as_object() else {
127 return self.all_docs();
128 };
129 if let Some(Value::String(idv)) = obj.get(&self.ir.id_field) {
131 return self.db.read(|tx| {
132 Ok(tx
133 .get(&self.name, idv.as_bytes())?
134 .map(|b| decode_document(self.ir, &b, self.codec.as_ref()))
135 .transpose()?
136 .into_iter()
137 .collect())
138 });
139 }
140 for idx in &self.ir.indexes {
142 if let Some(v) = obj.get(&idx.field) {
143 if !v.is_object() {
144 let ids = self
145 .db
146 .read(|tx| lookup_eq(tx, &self.name, &idx.field, v))?;
147 return self.db.read(|tx| {
148 ids.iter()
149 .filter_map(|id| tx.get(&self.name, id).transpose())
150 .map(|r| {
151 r.and_then(|b| decode_document(self.ir, &b, self.codec.as_ref()))
152 })
153 .collect()
154 });
155 }
156 }
157 }
158 self.all_docs()
160 }
161
162 pub fn find(&self, filter: &Value) -> Result<Vec<Value>, NookError> {
174 self.find_with(filter, &crate::query::QueryOptions::default())
175 }
176
177 pub fn find_with(
188 &self,
189 filter: &Value,
190 opts: &crate::query::QueryOptions,
191 ) -> Result<Vec<Value>, NookError> {
192 let matched: Vec<Value> = self
193 .candidates(filter)?
194 .into_iter()
195 .filter(|d| Self::matches(d, filter))
196 .collect();
197 opts.apply(matched, &self.ir.id_field, |f| {
198 self.ir.field(f).map(|fi| &fi.ty)
199 })
200 }
201
202 pub fn find_one(&self, filter: &Value) -> Result<Option<Value>, NookError> {
208 self.find_one_with(filter, &crate::query::QueryOptions::default())
209 }
210
211 pub fn find_one_with(
219 &self,
220 filter: &Value,
221 opts: &crate::query::QueryOptions,
222 ) -> Result<Option<Value>, NookError> {
223 let mut one = opts.clone();
224 one.limit = Some(1);
225 Ok(self.find_with(filter, &one)?.into_iter().next())
226 }
227
228 pub fn count(&self, filter: &Value) -> Result<usize, NookError> {
234 self.count_with(filter, &crate::query::QueryOptions::default())
235 }
236
237 pub fn count_with(
247 &self,
248 filter: &Value,
249 opts: &crate::query::QueryOptions,
250 ) -> Result<usize, NookError> {
251 opts.validate_sort_fields(|f| self.ir.field(f).map(|fi| &fi.ty))?;
252 let total = self
253 .candidates(filter)?
254 .into_iter()
255 .filter(|d| Self::matches(d, filter))
256 .count();
257 let after_offset = total.saturating_sub(opts.offset);
258 Ok(opts.limit.map_or(after_offset, |l| after_offset.min(l)))
259 }
260
261 pub fn delete(&self, filter: &Value) -> Result<usize, NookError> {
268 let victims = self.find(filter)?;
274 if victims.is_empty() {
275 return Ok(0);
276 }
277 self.db
278 .write(|tx| self.delete_victims_in_tx(tx, &victims))?;
279 Ok(victims.len())
280 }
281
282 fn delete_victims_in_tx(
294 &self,
295 tx: &mut WriteTx<'_>,
296 victims: &[Value],
297 ) -> Result<(), NookError> {
298 for d in victims {
299 let id = self.doc_id(d)?.as_bytes().to_vec();
300 for idx in &self.ir.indexes {
301 let v = d.get(&idx.field).cloned().unwrap_or(Value::Null);
302 delete_index_entry(tx, &self.name, &idx.field, &v, &id)?;
303 }
304 tx.delete(&self.name, &id)?;
305 }
306 Ok(())
307 }
308
309 pub fn delete_in_tx(&self, tx: &mut WriteTx<'_>, filter: &Value) -> Result<usize, NookError> {
328 let victims = self.find(filter)?;
329 if victims.is_empty() {
330 return Ok(0);
331 }
332 self.delete_victims_in_tx(tx, &victims)?;
333 Ok(victims.len())
334 }
335
336 pub fn insert(&self, doc: &Value) -> Result<(), NookError> {
352 validate_document(self.ir, doc)?;
353 let id_str = self.doc_id(doc)?;
354 if id_str.contains('\0') {
361 return Err(NookError::InvalidArg {
362 msg: format!(
363 "id field {:?} must not contain a NUL byte",
364 self.ir.id_field
365 ),
366 });
367 }
368 let id = id_str.as_bytes().to_vec();
369 let bytes = encode_document(self.ir, doc, self.codec.as_ref())?;
370 self.db
371 .write(|tx| self.insert_validated_in_tx(tx, doc, &id, &bytes))
372 }
373
374 fn insert_validated_in_tx(
379 &self,
380 tx: &mut WriteTx<'_>,
381 doc: &Value,
382 id: &[u8],
383 bytes: &[u8],
384 ) -> Result<(), NookError> {
385 for idx in &self.ir.indexes {
388 if idx.unique {
389 let v = doc.get(&idx.field).cloned().unwrap_or(Value::Null);
390 if index_value_exists_writing(tx, &self.name, &idx.field, &v)? {
393 return Err(NookError::Conflict {
394 msg: format!("{}.{} duplicate", self.name, idx.field),
395 });
396 }
397 }
398 }
399 tx.put(&self.name, id, bytes)?;
400 for idx in &self.ir.indexes {
401 let v = doc.get(&idx.field).cloned().unwrap_or(Value::Null);
402 put_index_entry(tx, &self.name, &idx.field, &v, id)?;
403 }
404 Ok(())
405 }
406
407 pub fn insert_in_tx(&self, tx: &mut WriteTx<'_>, doc: &Value) -> Result<(), NookError> {
419 validate_document(self.ir, doc)?;
420 let id_str = self.doc_id(doc)?;
421 if id_str.contains('\0') {
422 return Err(NookError::InvalidArg {
423 msg: format!(
424 "id field {:?} must not contain a NUL byte",
425 self.ir.id_field
426 ),
427 });
428 }
429 let id = id_str.as_bytes().to_vec();
430 let bytes = encode_document(self.ir, doc, self.codec.as_ref())?;
431 self.insert_validated_in_tx(tx, doc, &id, &bytes)
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::error::NookErrorKind;
    use crate::query::QueryOptions;
    use crate::schema::ir::SchemaIr;
    use serde_json::json;

    /// Schema of the `u` collection with a unique `email` index and a
    /// non-unique `role` index.
    const USERS_SCHEMA: &str = r#"{"u":{"idField":"id","fields":[
        {"name":"id","type":"id"},{"name":"email","type":"string"},
        {"name":"role","type":"enum","variants":["admin","user"]}],
        "indexes":[{"field":"email","unique":true},{"field":"role","unique":false}]}}"#;

    /// Schema of the `u` collection with a single numeric field and no indexes.
    const NUMERIC_SCHEMA: &str = r#"{"u":{"idField":"id","fields":[
        {"name":"id","type":"id"},{"name":"n","type":"number"}]}}"#;

    /// Opens a fresh database in a temporary directory and compiles
    /// `schema_json`. The directory guard is returned so it outlives the test.
    fn open_with_schema(schema_json: &'static str) -> (tempfile::TempDir, Database, SchemaIr) {
        let dir = tempfile::tempdir().unwrap();
        let db = Database::open(dir.path().join("t.db")).unwrap();
        let ir = SchemaIr::compile(schema_json).unwrap();
        (dir, db, ir)
    }

    /// Fixture for the user-shaped collection.
    fn setup() -> (tempfile::TempDir, Database, SchemaIr) {
        open_with_schema(USERS_SCHEMA)
    }

    /// Fixture for the numeric collection.
    fn setup_numeric() -> (tempfile::TempDir, Database, SchemaIr) {
        open_with_schema(NUMERIC_SCHEMA)
    }

    /// Builds a user document.
    fn user(id: &str, email: &str, role: &str) -> Value {
        json!({"id": id, "email": email, "role": role})
    }

    /// Inserts one `{id, n}` document per row, panicking on any failure.
    fn insert_numbers(c: &Collection<'_>, rows: &[(&str, i32)]) {
        for &(id, n) in rows {
            c.insert(&json!({"id": id, "n": n})).unwrap();
        }
    }

    #[test]
    fn insert_validates_stores_and_indexes() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();
        users.insert(&user("1", "a@b", "admin")).unwrap();

        let raw = db.read(|tx| tx.get("u", b"1")).unwrap().unwrap();
        let parsed: Result<serde_json::Value, _> = serde_json::from_slice(&raw);
        assert!(parsed.is_ok());
    }

    #[test]
    fn insert_rejects_invalid_document_with_schema_error() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();

        let err = users.insert(&user("1", "a@b", "ghost")).unwrap_err();
        assert_eq!(err.kind(), NookErrorKind::Schema);
    }

    #[test]
    fn unique_index_violation_is_conflict_and_rolls_back() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();
        users.insert(&user("1", "a@b", "admin")).unwrap();

        let err = users.insert(&user("2", "a@b", "user")).unwrap_err();
        assert_eq!(err.kind(), NookErrorKind::Conflict);

        let leftover = db.read(|tx| tx.get("u", b"2")).unwrap();
        assert!(leftover.is_none(), "rolled back");
    }

    #[test]
    fn insert_rejects_nul_in_id_with_invalid_arg_and_persists_nothing() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();

        let err = users
            .insert(&user("a\u{0}b", "a@b", "admin"))
            .unwrap_err();
        assert_eq!(err.kind(), NookErrorKind::InvalidArg);

        let leftover = db.read(|tx| tx.get("u", b"a\x00b")).unwrap();
        assert!(
            leftover.is_none(),
            "rejected NUL-id insert must not persist an entries row"
        );
    }

    #[test]
    fn find_by_id_index_and_scan_paths() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();
        for (id, email, role) in [
            ("1", "a@b", "admin"),
            ("2", "c@d", "user"),
            ("3", "e@f", "admin"),
        ] {
            users.insert(&user(id, email, role)).unwrap();
        }

        // Id lookup.
        let second = users.find_one(&json!({"id":"2"})).unwrap().unwrap();
        assert_eq!(second["email"], json!("c@d"));

        // Index lookup.
        let mut admins = users.find(&json!({"role":"admin"})).unwrap();
        admins.sort_by_key(|d| d["id"].as_str().unwrap().to_string());
        assert_eq!(admins.len(), 2);

        // Operator condition, served by a scan.
        let non_admins = users.find(&json!({"role":{"$ne":"admin"}})).unwrap();
        assert_eq!(non_admins.len(), 1);

        assert_eq!(users.count(&json!({"role":"admin"})).unwrap(), 2);
        assert_eq!(users.count(&json!({})).unwrap(), 3);
    }

    proptest::proptest! {
        #[test]
        fn index_lookup_matches_full_scan(ids in proptest::collection::vec(0u32..50, 0..20)) {
            let (_dir, db, ir) = setup();
            let users = Collection::new(&db, &ir, "u").unwrap();
            for (i, n) in ids.iter().enumerate() {
                let role = match n % 2 {
                    0 => "admin",
                    _ => "user",
                };
                // The insert result is deliberately ignored.
                let _ = users.insert(&user(&i.to_string(), &format!("e{i}@x"), role));
            }

            let indexed = users.find(&json!({"role":"admin"})).unwrap().len();
            let scanned = users
                .find(&json!({}))
                .unwrap()
                .iter()
                .filter(|d| d["role"] == "admin")
                .count();
            proptest::prop_assert_eq!(indexed, scanned);
        }
    }

    #[test]
    fn delete_removes_docs_and_their_index_entries() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();
        users.insert(&user("1", "a@b", "admin")).unwrap();
        users.insert(&user("2", "c@d", "admin")).unwrap();

        let deleted = users.delete(&json!({"id":"1"})).unwrap();
        assert_eq!(deleted, 1);
        assert!(users.find_one(&json!({"id":"1"})).unwrap().is_none());

        let admins = users.find(&json!({"role":"admin"})).unwrap();
        assert_eq!(admins.len(), 1);
        assert_eq!(admins[0]["id"], json!("2"));

        // The deleted document's email can be inserted again.
        users.insert(&user("9", "a@b", "user")).unwrap();
    }

    #[test]
    fn delete_by_filter_removes_all_matching_victims_atomically() {
        let (_dir, db, ir) = setup();
        let users = Collection::new(&db, &ir, "u").unwrap();
        for (id, email, role) in [
            ("1", "a@b", "admin"),
            ("2", "c@d", "admin"),
            ("3", "e@f", "user"),
        ] {
            users.insert(&user(id, email, role)).unwrap();
        }

        let deleted = users.delete(&json!({"role":"admin"})).unwrap();
        assert_eq!(deleted, 2);
        assert!(users.find_one(&json!({"id":"1"})).unwrap().is_none());
        assert!(users.find_one(&json!({"id":"2"})).unwrap().is_none());
        assert!(users.find_one(&json!({"id":"3"})).unwrap().is_some());
        assert_eq!(users.count(&json!({})).unwrap(), 1);

        // Both deleted emails can be inserted again.
        users.insert(&user("10", "a@b", "user")).unwrap();
        users.insert(&user("11", "c@d", "user")).unwrap();
    }

    proptest::proptest! {
        #[test]
        fn delete_keeps_index_consistent_with_live_docs(
            roles in proptest::collection::vec(0u8..2, 1..12),
            keep in proptest::collection::vec(proptest::bool::ANY, 1..12),
        ) {
            let (_dir, db, ir) = setup();
            let users = Collection::new(&db, &ir, "u").unwrap();
            // Only the first `n` entries of each vector are used.
            let n = roles.len().min(keep.len());
            let role_of = |i: usize| if roles[i] == 0 { "admin" } else { "user" };

            for i in 0..n {
                users
                    .insert(&user(&i.to_string(), &format!("e{i}@x"), role_of(i)))
                    .unwrap();
            }
            for i in (0..n).filter(|&i| !keep[i]) {
                let removed = users.delete(&json!({"id": i.to_string()})).unwrap();
                proptest::prop_assert_eq!(removed, 1);
            }

            for role in ["admin", "user"] {
                let indexed = users.find(&json!({"role": role})).unwrap().len();
                let scanned = users
                    .find(&json!({}))
                    .unwrap()
                    .iter()
                    .filter(|d| d["role"] == role)
                    .count();
                proptest::prop_assert_eq!(indexed, scanned);

                let live = (0..n).filter(|&i| keep[i] && role_of(i) == role).count();
                proptest::prop_assert_eq!(indexed, live);
            }

            // Emails of deleted documents can be inserted again under new ids.
            for i in (0..n).filter(|&i| !keep[i]) {
                users
                    .insert(&user(&format!("r{i}"), &format!("e{i}@x"), "user"))
                    .unwrap();
            }
        }
    }

    #[test]
    fn find_with_sorts_limits_offsets() {
        let (_dir, db, ir) = setup_numeric();
        let numbers = Collection::new(&db, &ir, "u").unwrap();
        insert_numbers(&numbers, &[("a", 4), ("b", 1), ("c", 3), ("d", 2)]);

        let opts =
            QueryOptions::parse(Some(r#"{"sort":[["n","asc"]],"offset":1,"limit":2}"#)).unwrap();
        let page = numbers.find_with(&json!({}), &opts).unwrap();

        let ns: Vec<i64> = page.iter().map(|d| d["n"].as_i64().unwrap()).collect();
        assert_eq!(ns, vec![2, 3]);
    }

    #[test]
    fn count_with_applies_offset_and_limit_cap() {
        let (_dir, db, ir) = setup_numeric();
        let numbers = Collection::new(&db, &ir, "u").unwrap();
        insert_numbers(&numbers, &[("a", 1), ("b", 2), ("c", 3), ("d", 4)]);

        let everything = json!({});
        assert_eq!(
            numbers
                .count_with(&everything, &QueryOptions::default())
                .unwrap(),
            4
        );

        let cases = [
            (r#"{"limit":2}"#, 2),
            (r#"{"offset":9}"#, 0),
            (r#"{"offset":1,"limit":2}"#, 2),
            (r#"{"limit":0}"#, 0),
        ];
        for (raw, expected) in cases {
            let opts = QueryOptions::parse(Some(raw)).unwrap();
            assert_eq!(numbers.count_with(&everything, &opts).unwrap(), expected);
        }
    }

    #[test]
    fn count_with_rejects_invalid_sort_field() {
        let (_dir, db, ir) = setup_numeric();
        let numbers = Collection::new(&db, &ir, "u").unwrap();
        insert_numbers(&numbers, &[("a", 1)]);

        let opts = QueryOptions::parse(Some(r#"{"sort":[["bogus","asc"]]}"#)).unwrap();
        let err = numbers.count_with(&json!({}), &opts).unwrap_err();
        assert!(matches!(err, NookError::Schema { .. }));
    }
}