obj/query.rs
1//! `Query<T>` — the M8 query builder.
2//!
3//! `Query` is a thin builder over the M6 / M7 `Collection` API. It
4//! borrows a `&Db` for the duration of the build phase; `.fetch()`
5//! and `.count()` consume / inspect the builder and open a fresh
6//! `read_transaction` for the actual scan.
7//!
8//! ## Sources
9//!
10//! - [`Source::Full`] — full collection scan via
11//! [`crate::Collection::all`]. Order is by primary `Id`.
12//! - [`Source::IndexRange`] — index-range scan via
13//! [`crate::Collection::index_range`]. Order is by encoded index
14//! key bytes.
15//!
16//! No cost-based planner: the caller picks the source. The M8 plan
17//! explicitly defers planning to a future milestone.
18//!
19//! ## Filters
20//!
21//! Filters are arbitrary `Fn(&T) -> bool + 'static` closures. They
22//! are evaluated on the decoded document, so they pay the per-doc
23//! decode cost; an `.index_range(...)` source keeps the work bounded
24//! by walking only the index slice.
25//!
26//! Multiple `.filter(...)` calls AND together — every predicate must
27//! return `true` for a doc to be emitted. The closures are stored as
28//! `Box<dyn Fn(&T) -> bool + 'static>`; `'static` lets the closure
29//! outlive the temporary builder.
30//!
31//! ## Power-of-ten posture
32//!
33//! - **Rule 2.** The fetch loop is bounded by the source iterator's
34//! length. The source iterators (`Collection::all`,
35//! `Collection::index_range`) are themselves bounded by the
36//! `MAX_RANGE_NODES` budget (M4 #28). The `.limit(N)` cap further
37//! shortens the loop when the caller wants a top-N view.
//! - **Rule 4.** Each function in this module stays under the 60-line
//!   ceiling; source dispatch is factored into `for_each_candidate`, which
//!   the sorted / unsorted fetch paths and the filtered count share.
40//! - **Rule 7.** Every `Result` and `Option` is handled. No `unwrap` /
41//! `expect` in the production path.
//! - **Rule 9.** Filters and sort-key extractors are boxed closures
//!   (`Box<dyn Fn(&T) -> bool>` and `Box<dyn Fn(&T) -> Result<Vec<u8>>>`)
//!   — type-erased so the public builder can compose without leaking
//!   generic parameters into every call site. These are the only `dyn`s in
//!   the query layer; the source iteration itself is monomorphic.
46
47use std::ops::{Bound, RangeBounds};
48
49use obj_core::codec::Dynamic;
50use obj_core::{Error, Result};
51
52use crate::Db;
53use crate::Document;
54
/// Boxed filter predicate: returns `true` to keep a decoded document.
///
/// `'static` so the closure can outlive the `Query` builder; it cannot
/// borrow stack locals, so callers capture them by value (`move`).
type FilterFn<T> = Box<dyn Fn(&T) -> bool + 'static>;
58
/// Boxed sort-key extractor: maps a document to the bytes it is ordered
/// by (documents are sorted by ascending byte comparison of this output).
///
/// The closure may fail at encode time — e.g. a
/// `sort_by(|t| Dynamic::String(...))` whose string carries an embedded
/// NUL byte that the order-preserving encoder rejects.
/// `sort_by_bytes` wraps the caller's infallible byte-producing closure
/// in `Ok(_)`. `sort_by` wraps the caller's `Dynamic`-producing closure
/// so each output runs through `encode_field`, and propagates a failure
/// as `Error::SortKeyEncode` rather than collapsing it to an empty key
/// (Rule 7). See [`Query::sort_by`].
type SortKeyFn<T> = Box<dyn Fn(&T) -> Result<Vec<u8>> + 'static>;
67
/// Default cap on the in-memory sort buffer. When a sort is set, the
/// query layer reads at most this many filtered documents into RAM
/// before sorting; a scan that produces more candidates surfaces
/// [`Error::SortBufferExceeded`]. Override per query with
/// [`Query::sort_buffer_limit`]. M8 #66.
pub const MAX_SORT_BUFFER: usize = 100_000;
73
/// Where a [`Query`] reads its candidate documents from.
///
/// Set through the [`Query`] builder methods: the source is a full scan
/// by default and switches to `IndexRange` on [`Query::index_range`]
/// (a later call replaces an earlier one).
#[derive(Debug, Clone)]
enum Source {
    /// Full primary-tree scan via [`crate::Collection::all`]; candidates
    /// arrive in primary `Id` order.
    Full,
    /// Walk the named index's B-tree slice (via the collection's
    /// `index_range_encoded` / `count_*_encoded` methods). Bounds are
    /// stored as already-encoded byte ranges — [`Query::index_range`]
    /// runs `encode_field` while the builder is configured, so encoding
    /// errors surface there and the borrow-checker does not need to track
    /// `Dynamic` across the read txn.
    IndexRange {
        /// Index name. Not validated by the builder; it is resolved
        /// against the collection only when the query runs
        /// (`fetch` / `count`).
        name: String,
        /// Encoded lower bound.
        start: Bound<Vec<u8>>,
        /// Encoded upper bound.
        end: Bound<Vec<u8>>,
    },
}
97
/// The M8 query builder.
///
/// Obtain via [`Db::query`]. Compose with [`Query::filter`],
/// [`Query::limit`], [`Query::index_range`], [`Query::sort_by`] /
/// [`Query::sort_by_bytes`] and [`Query::sort_buffer_limit`]; terminate
/// with [`Query::fetch`] (consumes the builder) or [`Query::count`]
/// (borrows it).
///
/// `Query` borrows `&'db Db` so multiple builders can coexist; the
/// borrow lasts as long as the builder does (`.fetch()` consumes it).
/// Each `.fetch()` / `.count()` runs inside a fresh `read_transaction`
/// — the builder itself holds no locks.
pub struct Query<'db, T: Document> {
    /// Borrowed `Db` so the builder is cheap to construct and to drop
    /// without committing to the scan up-front.
    db: &'db Db,
    /// Where to draw candidate documents from.
    source: Source,
    /// User-supplied predicates. Applied in declaration order; every
    /// predicate must return `true` for a doc to be emitted (evaluation
    /// stops at the first `false`).
    filters: Vec<FilterFn<T>>,
    /// Optional caller-supplied cap on the result count. When
    /// `sort_key` is set the cap is applied after sorting.
    limit: Option<usize>,
    /// Optional sort-key extractor, set by `sort_by` or `sort_by_bytes`.
    /// When set, the fetch collects up to `sort_buffer_limit` filtered
    /// candidates into RAM, sorts by the extractor's byte output, then
    /// applies `limit`. Last-call-wins if either method is invoked
    /// multiple times.
    sort_key: Option<SortKeyFn<T>>,
    /// Per-query override for the sort buffer ceiling. Defaults to
    /// [`MAX_SORT_BUFFER`]. Only consulted when `sort_key` is set.
    sort_buffer_limit: Option<usize>,
}
128
impl<'db, T: Document> Query<'db, T>
where
    T: Send + 'static,
{
    /// Construct a fresh full-scan query: no filters, no limit, no sort,
    /// default sort-buffer cap. Crate-internal — public callers go
    /// through [`Db::query`].
    pub(crate) fn new(db: &'db Db) -> Self {
        Self {
            db,
            source: Source::Full,
            filters: Vec::new(),
            limit: None,
            sort_key: None,
            sort_buffer_limit: None,
        }
    }

    /// Append a filter predicate. Filters compose with AND — every
    /// predicate must return `true` for a doc to be emitted. They run in
    /// declaration order and evaluation stops at the first `false`.
    ///
    /// `'static` is required so the closure can outlive the temporary
    /// builder: it cannot borrow stack-local values, so capture them by
    /// value with a `move` closure instead.
    #[must_use]
    pub fn filter<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&T) -> bool + 'static,
    {
        self.filters.push(Box::new(predicate));
        self
    }

    /// Cap the result set at `n` documents. A later call overwrites an
    /// earlier one.
    ///
    /// Without a sort, the first `n` matches in source order are kept
    /// (primary `Id` for full-scan; index-key bytes for an index range)
    /// and the scan stops once `n` matches have been found. With
    /// [`Query::sort_by`] / [`Query::sort_by_bytes`] the limit is applied
    /// after the sort, so the `n` smallest keys are kept.
    #[must_use]
    pub fn limit(mut self, n: usize) -> Self {
        self.limit = Some(n);
        self
    }

    /// Switch the query source from full-scan to the named index's
    /// range. Bounds are [`Dynamic`] values; the builder encodes
    /// them through `obj_core::index::encode_field` at call time so
    /// the actual range arithmetic sees byte-ordered keys. A later call
    /// replaces the previous source.
    ///
    /// Order is by the index key bytes, not by primary `Id`. The
    /// scan is bounded to the slice of the index B-tree the range
    /// covers — no full-collection walk.
    ///
    /// The index `name` is not checked here; it is resolved against the
    /// collection when the query runs (`fetch` / `count`), so an unknown
    /// name surfaces as an error from those calls.
    ///
    /// # Examples
    ///
    /// Range query on an indexed `u64` field:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use obj_core::codec::Dynamic;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "orders_index_range_doc")]
    /// struct Order {
    ///     #[obj(index)]
    ///     placed_at: u64,
    /// }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("range.obj"))?;
    /// for i in 0..100u64 {
    ///     let _ = db.insert(Order { placed_at: i * 1_000 })?;
    /// }
    /// let last_week = Dynamic::U64(30_000);
    /// let now = Dynamic::U64(60_000);
    /// let recent: Vec<Order> = db
    ///     .query::<Order>()
    ///     .index_range("placed_at", last_week..now)?
    ///     .fetch()?;
    /// assert_eq!(recent.len(), 30);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
    ///   carries an embedded NUL byte (the order-preserving encoder
    ///   rejects those — see `obj_core::index::encode_field`).
    pub fn index_range<R>(mut self, name: &str, range: R) -> Result<Self>
    where
        R: RangeBounds<Dynamic>,
    {
        let start = encode_bound(range.start_bound())?;
        let end = encode_bound(range.end_bound())?;
        self.source = Source::IndexRange {
            name: name.to_owned(),
            start,
            end,
        };
        Ok(self)
    }

    /// Sort the result by `key`'s output in ascending key order.
    ///
    /// `key` returns an [`obj_core::codec::Dynamic`] for each
    /// document; the builder runs each value through
    /// [`obj_core::index::encode_field`] (M7 #55) so the comparator
    /// is a byte comparison whose ordering matches the value's
    /// natural `Ord`. This reuses the same order-preserving encoder
    /// the index layer uses, so a `.sort_by(|o| o.placed_at.into())`
    /// produces the same ordering an `index_range("placed_at", ...)`
    /// scan would visit.
    ///
    /// Last-call-wins: a second `.sort_by` (or `.sort_by_bytes`)
    /// overwrites the first; the two extractors share the same
    /// internal slot because they are mutually exclusive.
    ///
    /// The sort runs BEFORE [`Query::limit`] truncation, so
    /// `.sort_by(...).limit(N)` returns the N smallest by the
    /// extractor's key — the "top-N sorted" workload `design.md`
    /// shows for the Orders example.
    ///
    /// # Errors at fetch time
    ///
    /// `sort_by` itself is infallible — it stores the closure. The
    /// encoder runs during [`Query::fetch`]; a `Dynamic` whose
    /// `encode_field` representation cannot be computed (e.g. a
    /// `Dynamic::String` carrying an embedded `0x00` byte that the
    /// order-preserving encoder rejects) surfaces as
    /// [`Error::SortKeyEncode`] — power-of-ten Rule 7 (no silent
    /// errors). Callers who want to bypass `encode_field` entirely
    /// should use [`Query::sort_by_bytes`].
    ///
    /// # Sort-buffer bound
    ///
    /// The pre-sort buffer is capped at
    /// [`Query::sort_buffer_limit`] (default
    /// [`MAX_SORT_BUFFER`] = 100 000). A scan that produces more
    /// filtered candidates surfaces [`Error::SortBufferExceeded`]; the
    /// user should narrow the source via `.filter` / `.index_range`,
    /// or raise the cap with `.sort_buffer_limit(N)`. Note that
    /// `.limit(N)` is applied after the sort, so it does not reduce
    /// how many candidates are buffered.
    #[must_use]
    pub fn sort_by<F>(mut self, key: F) -> Self
    where
        F: Fn(&T) -> Dynamic + 'static,
    {
        // Encode each per-doc `Dynamic` through the M7 #55 order-
        // preserving encoder; propagate the encoder's `Result` so
        // fetch can surface it as `Error::SortKeyEncode`. Power-of-
        // ten Rule 7: no silent errors.
        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| {
            let dynamic = key(doc);
            obj_core::index::encode_field(&dynamic)
                .map(obj_core::index::EncodedIndexKey::into_bytes)
                .map_err(|e| Error::SortKeyEncode {
                    source: Box::new(e),
                })
        });
        self.sort_key = Some(encoded);
        self
    }

    /// Sort the result by `key`'s raw byte output in ascending order.
    ///
    /// Companion to [`Query::sort_by`] that lets callers supply the
    /// already-encoded sort bytes. The byte-order = sort-order
    /// invariant is the caller's responsibility: two documents whose
    /// key bytes compare `Less` MUST also be in the desired sort
    /// order, otherwise the produced ordering is unspecified.
    ///
    /// Bypassing [`obj_core::index::encode_field`] means
    /// [`Error::SortKeyEncode`] cannot fire — `sort_by_bytes` is the
    /// right shape for callers who already have an order-preserving
    /// byte form (e.g. a precomputed `i64::to_be_bytes` of a signed
    /// counter that they have already biased to the unsigned range).
    ///
    /// Last-call-wins: `sort_by_bytes` overwrites a prior
    /// [`Query::sort_by`] (and vice versa).
    ///
    /// # Sort-buffer bound
    ///
    /// Same bound as [`Query::sort_by`] — see that method's docs.
    #[must_use]
    pub fn sort_by_bytes<F>(mut self, key: F) -> Self
    where
        F: Fn(&T) -> Vec<u8> + 'static,
    {
        // Caller owns the byte-order = sort-order contract; we wrap
        // their infallible output in `Ok(_)` so fetch_sorted's shared
        // extractor signature still applies.
        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| Ok(key(doc)));
        self.sort_key = Some(encoded);
        self
    }

    /// Override the per-query sort-buffer ceiling. Only consulted
    /// when a sort is set via [`Query::sort_by`] or
    /// [`Query::sort_by_bytes`]. Defaults to [`MAX_SORT_BUFFER`]
    /// (100 000).
    ///
    /// A scan that overshoots the cap surfaces
    /// [`Error::SortBufferExceeded`]; narrow the candidate set
    /// with [`Query::filter`] / [`Query::index_range`], or raise the
    /// cap with this method. ([`Query::limit`] is applied after the
    /// sort and does not shrink the buffer.)
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use obj_core::codec::Dynamic;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "ticks_sort_buffer_doc")]
    /// struct Tick { value: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("sort.obj"))?;
    /// for v in 0..10u64 {
    ///     let _ = db.insert(Tick { value: v })?;
    /// }
    /// let top: Vec<Tick> = db
    ///     .query::<Tick>()
    ///     .sort_by(|t| Dynamic::U64(t.value))
    ///     .sort_buffer_limit(1_000) // narrower than the default
    ///     .limit(3)
    ///     .fetch()?;
    /// assert_eq!(top.len(), 3);
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn sort_buffer_limit(mut self, n: usize) -> Self {
        self.sort_buffer_limit = Some(n);
        self
    }

    /// Execute the query and materialise the matching documents.
    ///
    /// Opens a fresh `read_transaction`, drives the configured
    /// source iterator, applies every filter in declaration order,
    /// and truncates to `limit` (if set). Without a sort the documents
    /// come back in source order; with [`Query::sort_by`] /
    /// [`Query::sort_by_bytes`] they come back in ascending sort-key
    /// order, truncated after sorting.
    ///
    /// # Errors
    ///
    /// - As [`Db::read_transaction`].
    /// - Any error from the underlying [`crate::Collection`] scan.
    /// - [`Error::SortBufferExceeded`] when a sort is set and the
    ///   filtered candidates overflow the sort-buffer cap.
    /// - [`Error::SortKeyEncode`] when a [`Query::sort_by`] key cannot
    ///   be encoded.
    pub fn fetch(self) -> Result<Vec<T>> {
        #[cfg(feature = "tracing")]
        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
        #[cfg(feature = "tracing")]
        let _guard = span.enter();
        #[cfg(feature = "tracing")]
        span.record("kind", query_kind(&self.source));
        self.db.read_transaction(|tx| {
            let coll = tx.collection::<T>()?;
            // The two paths differ in buffering: sorted must see every
            // candidate before truncating, unsorted can stop early.
            if self.sort_key.is_some() {
                fetch_sorted(&coll, &self)
            } else {
                fetch_unsorted(&coll, &self)
            }
        })
    }

    /// Count the documents this query would return, ignoring
    /// [`Query::sort_by`] (sort does not change the count) but
    /// honouring [`Query::limit`] (returns `min(total, limit)`).
    ///
    /// Takes `&self` rather than consuming the builder so callers
    /// can chain a follow-up `.fetch()` on the same predicate set.
    ///
    /// # Fast path (no filter set)
    ///
    /// When no filter is set, `count` walks the source B-tree
    /// without decoding any document. The exact shape of the walk
    /// depends on the source's index kind so the answer matches
    /// what `fetch` would return:
    ///
    /// - **Full scan** — walks the primary B-tree counting entries
    ///   (`Collection::count_all`). One entry per doc; the count is
    ///   exact.
    /// - **`Standard` / `Unique` / `Composite` `index_range`** — walks
    ///   the named index's B-tree counting entries
    ///   (`Collection::count_index_range`). One entry per doc;
    ///   the count is exact.
    /// - **`Each` `index_range`** — walks the named index's B-tree
    ///   tracking distinct trailing-`id` suffixes via
    ///   `Collection::count_distinct_ids_in_range` (M8 follow-up
    ///   #72). A single doc may emit multiple entries under
    ///   different element keys; the entry count would overshoot.
    ///   The distinct set is bounded by
    ///   [`crate::MAX_DISTINCT_IDS`] (100 000); exceeding it
    ///   surfaces [`obj_core::Error::DistinctCountExceeded`].
    ///
    /// # Slow path (filter set)
    ///
    /// When at least one filter is set, the predicate has to see a
    /// decoded `T`, so the slow path pays the per-doc decode cost
    /// (same as `fetch`). Sort, if set, is ignored — the total
    /// count is the same.
    ///
    /// # Examples
    ///
    /// Count without materialising documents:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "counts_doc")]
    /// struct Order { customer_id: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("count.obj"))?;
    /// for i in 0..30u64 {
    ///     let _ = db.insert(Order { customer_id: i % 3 })?;
    /// }
    /// let n: u64 = db.query::<Order>()
    ///     .filter(|o| o.customer_id == 1)
    ///     .count()?;
    /// assert_eq!(n, 10);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - As [`Db::read_transaction`].
    /// - Any error from the underlying [`crate::Collection`] scan.
    /// - [`obj_core::Error::DistinctCountExceeded`] on the `Each`
    ///   fast path.
    pub fn count(&self) -> Result<u64> {
        #[cfg(feature = "tracing")]
        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
        #[cfg(feature = "tracing")]
        let _guard = span.enter();
        #[cfg(feature = "tracing")]
        span.record("kind", query_kind(&self.source));
        self.db.read_transaction(|tx| {
            let coll = tx.collection::<T>()?;
            // Predicates need decoded documents; without any, the
            // B-tree entries alone answer the question.
            let total = if self.filters.is_empty() {
                count_fast(&coll, &self.source)?
            } else {
                count_slow(&coll, self)?
            };
            Ok(apply_count_limit(total, self.limit))
        })
    }
}
481
/// Label recorded in the `kind` field of the `query.execute` tracing span
/// (Phase 2B, issue #7).
///
/// - `Source::IndexRange` → `"index"`: the work is bounded by the named
///   index's slice.
/// - `Source::Full` → `"filter"`: a full primary-tree walk plus in-memory
///   predicates is the "filter scan" path.
#[cfg(feature = "tracing")]
fn query_kind(source: &Source) -> &'static str {
    // Deliberately exhaustive (no `_` arm) so a new `Source` variant has
    // to pick its own label.
    match *source {
        Source::IndexRange { .. } => "index",
        Source::Full => "filter",
    }
}
495
/// Clamp a raw count to the optional `.limit(N)` cap: `min(total, N)`
/// when a limit is set, `total` unchanged otherwise.
fn apply_count_limit(total: u64, limit: Option<usize>) -> u64 {
    // `u64::try_from(usize)` cannot fail where `usize` is at most 64
    // bits; saturating to `u64::MAX` keeps the function total anyway.
    limit.map_or(total, |cap| total.min(u64::try_from(cap).unwrap_or(u64::MAX)))
}
507
508/// Drain the configured source with no `sort_by` set.
509///
510/// Power-of-ten Rule 2: the loop is bounded by the source iterator's
511/// length (the primary B-tree's `MAX_RANGE_NODES` budget). The
512/// `.limit(N)` cap further shortens the loop.
513fn fetch_unsorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
514where
515 T: Document + Send + 'static,
516{
517 if q.limit == Some(0) {
518 return Ok(Vec::new());
519 }
520 let mut out: Vec<T> = Vec::new();
521 for_each_candidate(coll, q, |doc| {
522 if !q.filters.iter().all(|f| f(&doc)) {
523 return Ok(true);
524 }
525 out.push(doc);
526 if let Some(n) = q.limit {
527 if out.len() >= n {
528 return Ok(false);
529 }
530 }
531 Ok(true)
532 })?;
533 Ok(out)
534}
535
536/// Drain the configured source with `sort_by` set.
537///
538/// Collects up to `sort_buffer_limit` filtered candidates into RAM,
539/// sorts ascending by the extractor's byte output, then truncates
540/// to `limit`. Power-of-ten Rule 3: the buffer is bounded — exceeding
541/// it surfaces [`Error::SortBufferExceeded`] rather than chewing
542/// arbitrary memory.
543fn fetch_sorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
544where
545 T: Document + Send + 'static,
546{
547 let cap = q.sort_buffer_limit.unwrap_or(MAX_SORT_BUFFER);
548 let sort_key = q
549 .sort_key
550 .as_ref()
551 .ok_or(Error::InvalidArgument("fetch_sorted without sort_key"))?;
552 let mut buf: Vec<(Vec<u8>, T)> = Vec::new();
553 for_each_candidate(coll, q, |doc| {
554 if !q.filters.iter().all(|f| f(&doc)) {
555 return Ok(true);
556 }
557 if buf.len() >= cap {
558 return Err(Error::SortBufferExceeded { limit: cap });
559 }
560 // Power-of-ten Rule 7: propagate the extractor's `Result`
561 // rather than collapsing an encode failure into a tied key.
562 let key_bytes = sort_key(&doc)?;
563 buf.push((key_bytes, doc));
564 Ok(true)
565 })?;
566 buf.sort_by(|a, b| a.0.cmp(&b.0));
567 let truncated_len = match q.limit {
568 Some(n) => buf.len().min(n),
569 None => buf.len(),
570 };
571 let mut out: Vec<T> = Vec::with_capacity(truncated_len);
572 for (_k, d) in buf.into_iter().take(truncated_len) {
573 out.push(d);
574 }
575 Ok(out)
576}
577
578/// Walk the configured source and call `f(doc)` for each decoded
579/// document. `f` returns `Ok(true)` to continue, `Ok(false)` to stop
580/// early (e.g. the limit is reached), or `Err(_)` to abort the scan.
581///
582/// Power-of-ten Rule 4: factor the source-dispatch + iteration
583/// boilerplate so the sorted / unsorted entry points stay readable.
584fn for_each_candidate<T, F>(
585 coll: &crate::Collection<'_, T>,
586 q: &Query<'_, T>,
587 mut f: F,
588) -> Result<()>
589where
590 T: Document + Send + 'static,
591 F: FnMut(T) -> Result<bool>,
592{
593 match &q.source {
594 Source::Full => {
595 let docs = coll.all()?;
596 for (_id, doc) in docs {
597 if !f(doc)? {
598 return Ok(());
599 }
600 }
601 }
602 Source::IndexRange { name, start, end } => {
603 let iter = coll.index_range_encoded(name, clone_bound(start), clone_bound(end))?;
604 for step in iter {
605 let (_key, doc) = step?;
606 if !f(doc)? {
607 return Ok(());
608 }
609 }
610 }
611 }
612 Ok(())
613}
614
615/// No-filter fast path for [`Query::count`].
616///
617/// Walks the underlying B-tree (primary or index) without decoding
618/// any document. For an `Each`-kind `index_range` source, dispatches
619/// to [`crate::Collection::count_distinct_ids_in_range`] so the count
620/// matches what `fetch` would return (which de-duplicates per doc).
621/// Other kinds use the cheaper entry-count
622/// [`crate::Collection::count_index_range`] path. See the rustdoc on
623/// `Query::count` for the full per-kind table.
624fn count_fast<T>(coll: &crate::Collection<'_, T>, source: &Source) -> Result<u64>
625where
626 T: Document + Send + 'static,
627{
628 match source {
629 Source::Full => coll.count_all(),
630 Source::IndexRange { name, start, end } => {
631 // Peek at the descriptor to choose the per-kind path. The
632 // peek goes through the same `active_index` validation as
633 // the eventual count call (which re-resolves the index by
634 // name), so an unknown / dropped name produces the same
635 // `Error::IndexNotFound` exactly once.
636 let kind = coll.index_kind(name)?;
637 if kind == obj_core::IndexKind::Each {
638 coll.count_distinct_ids_in_range_encoded(name, clone_bound(start), clone_bound(end))
639 } else {
640 coll.count_index_range_encoded(name, clone_bound(start), clone_bound(end))
641 }
642 }
643 }
644}
645
646/// Filter-applied slow path for [`Query::count`].
647///
648/// Must decode each candidate to evaluate the predicate. The decode
649/// cost is unavoidable per the M8 design: there is no inverted-index
650/// data structure that would let us count filter matches without
651/// touching the document.
652fn count_slow<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<u64>
653where
654 T: Document + Send + 'static,
655{
656 let mut n: u64 = 0;
657 for_each_candidate(coll, q, |doc| {
658 if q.filters.iter().all(|f| f(&doc)) {
659 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
660 reason: "slow-path count exceeds u64",
661 })?;
662 }
663 Ok(true)
664 })?;
665 Ok(n)
666}
667
668/// Encode a `Bound<Dynamic>` into a `Bound<Vec<u8>>` using the
669/// order-preserving field encoder. Used by [`Query::index_range`].
670fn encode_bound(b: Bound<&Dynamic>) -> Result<Bound<Vec<u8>>> {
671 match b {
672 Bound::Included(v) => Ok(Bound::Included(
673 obj_core::index::encode_field(v)?.into_bytes(),
674 )),
675 Bound::Excluded(v) => Ok(Bound::Excluded(
676 obj_core::index::encode_field(v)?.into_bytes(),
677 )),
678 Bound::Unbounded => Ok(Bound::Unbounded),
679 }
680}
681
/// Clone a borrowed `Bound<Vec<u8>>` into an owned `Bound<Vec<u8>>`.
///
/// The encoded-bound `Collection` methods this module calls
/// (`index_range_encoded`, `count_index_range_encoded`,
/// `count_distinct_ids_in_range_encoded`) receive their bounds by value,
/// while the query's `Source::IndexRange` keeps the encoded bounds so the
/// same builder can be counted and fetched — hence the per-call copy.
fn clone_bound(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    // `Bound<T>` is `Clone` whenever `T` is; this copies the payload of
    // `Included` / `Excluded` exactly like a per-variant match would.
    b.clone()
}