Skip to main content

obj/
query.rs

1//! `Query<T>` — the M8 query builder.
2//!
3//! `Query` is a thin builder over the M6 / M7 `Collection` API. It
4//! borrows a `&Db` for the duration of the build phase; `.fetch()`
5//! and `.count()` consume / inspect the builder and open a fresh
6//! `read_transaction` for the actual scan.
7//!
8//! ## Sources
9//!
10//! - [`Source::Full`] — full collection scan via
11//!   [`crate::Collection::all`]. Order is by primary `Id`.
12//! - [`Source::IndexRange`] — index-range scan via
13//!   [`crate::Collection::index_range`]. Order is by encoded index
14//!   key bytes.
15//!
16//! No cost-based planner: the caller picks the source. The M8 plan
17//! explicitly defers planning to a future milestone.
18//!
19//! ## Filters
20//!
21//! Filters are arbitrary `Fn(&T) -> bool + 'static` closures. They
22//! are evaluated on the decoded document, so they pay the per-doc
23//! decode cost; an `.index_range(...)` source keeps the work bounded
24//! by walking only the index slice.
25//!
26//! Multiple `.filter(...)` calls AND together — every predicate must
27//! return `true` for a doc to be emitted. The closures are stored as
28//! `Box<dyn Fn(&T) -> bool + 'static>`; `'static` lets the closure
29//! outlive the temporary builder.
30//!
31//! ## Power-of-ten posture
32//!
//! - **Rule 2.** The fetch loop is bounded by the source iterator's
//!   length. The source iterators (`Collection::all`,
//!   `Collection::index_range`) are themselves bounded by the
//!   `MAX_RANGE_NODES` budget (M4 #28). On the unsorted path the
//!   `.limit(N)` cap further shortens the loop; the sorted path must
//!   see every candidate and is instead bounded by the sort buffer
//!   (`MAX_SORT_BUFFER`).
//! - **Rule 4.** Each function in this module stays under the 60-line
//!   ceiling; `for_each_candidate` factors the per-source dispatch out
//!   of the fetch / count entry points.
40//! - **Rule 7.** Every `Result` and `Option` is handled. No `unwrap` /
41//!   `expect` in the production path.
//! - **Rule 9.** Filters (`Box<dyn Fn(&T) -> bool>`) and the sort-key
//!   extractor (`Box<dyn Fn(&T) -> Result<Vec<u8>>>`) are type-erased
//!   so the public builder can compose without leaking generic
//!   parameters into every call site. These boxed closures are the
//!   only `dyn` in this module; the source iteration itself is
//!   monomorphic.
46
47use std::ops::{Bound, RangeBounds};
48
49use obj_core::codec::Dynamic;
50use obj_core::{Error, Result};
51
52use crate::Db;
53use crate::Document;
54
/// Type-erased filter predicate: returns `true` to keep a document.
///
/// The `'static` bound means the closure owns everything it captures,
/// so it can be stored inside the `Query` builder without tying the
/// builder to a caller's stack frame.
type FilterFn<T> = Box<dyn for<'a> Fn(&'a T) -> bool + 'static>;
58
59/// Boxed sort-key extractor. The closure may fail at encode time —
60/// e.g. a `sort_by(|t| Dynamic::String(...))` whose string carries
61/// an embedded NUL byte that the order-preserving encoder rejects.
62/// `sort_by_bytes` callers wrap an infallible byte-producing closure
63/// in `Ok(_)`; `sort_by` callers run their `Dynamic` output through
64/// `encode_field` here and propagate the failure rather than
65/// collapsing it to an empty key (Rule 7). See [`Query::sort_by`].
66type SortKeyFn<T> = Box<dyn Fn(&T) -> Result<Vec<u8>> + 'static>;
67
/// Default ceiling on the in-memory sort buffer (M8 #66).
///
/// A sorted fetch holds at most this many filter-surviving documents
/// in RAM before sorting; a scan that yields more surfaces
/// [`Error::SortBufferExceeded`]. Override it per query with
/// [`Query::sort_buffer_limit`].
pub const MAX_SORT_BUFFER: usize = 100_000;
73
/// The candidate-document source a [`Query`] scans.
///
/// A new query starts as [`Source::Full`]; calling
/// [`Query::index_range`] replaces it with [`Source::IndexRange`].
#[derive(Clone, Debug)]
enum Source {
    /// Walk the whole primary tree via [`crate::Collection::all`]
    /// (primary `Id` order).
    Full,
    /// Walk one slice of a named index's B-tree via
    /// [`crate::Collection::index_range`] (index-key byte order).
    ///
    /// The bounds are kept already encoded: `Query::index_range` runs
    /// `encode_field` eagerly, so encoding errors surface when the
    /// builder is configured, and the scan only handles owned bytes.
    IndexRange {
        /// Name of the index to walk.
        name: String,
        /// Lower bound, as order-preserving encoded key bytes.
        start: Bound<Vec<u8>>,
        /// Upper bound, as order-preserving encoded key bytes.
        end: Bound<Vec<u8>>,
    },
}
97
98/// The M8 query builder.
99///
100/// Obtain via [`Db::query`]. Compose with [`Query::filter`],
101/// [`Query::limit`], and [`Query::index_range`]; terminate with
102/// [`Query::fetch`].
103///
104/// `Query` borrows `&'db Db` so multiple builders can coexist; the
105/// borrow ends when `.fetch()` returns. The actual scan runs inside
106/// a fresh `read_transaction` opened by `.fetch()` — the builder
107/// itself holds no locks.
108pub struct Query<'db, T: Document> {
109    /// Borrowed `Db` so the builder is cheap to construct and to drop
110    /// without committing to the scan up-front.
111    db: &'db Db,
112    /// Where to draw candidate documents from.
113    source: Source,
114    /// User-supplied predicates. Applied in declaration order; every
115    /// predicate must return `true` for a doc to be emitted.
116    filters: Vec<FilterFn<T>>,
117    /// Optional caller-supplied cap on the result count.
118    limit: Option<usize>,
119    /// Optional sort-key extractor. When set, the fetch collects up
120    /// to `sort_buffer_limit` filtered candidates into RAM, sorts by
121    /// the extractor's byte output, then applies `limit`. Last-call-
122    /// wins if `sort_by` is invoked multiple times.
123    sort_key: Option<SortKeyFn<T>>,
124    /// Per-query override for the sort buffer ceiling. Defaults to
125    /// [`MAX_SORT_BUFFER`]. Only consulted when `sort_key` is set.
126    sort_buffer_limit: Option<usize>,
127}
128
129impl<'db, T: Document> Query<'db, T>
130where
131    T: Send + 'static,
132{
133    /// Construct a fresh full-scan query. Crate-internal — public
134    /// callers go through [`Db::query`].
135    pub(crate) fn new(db: &'db Db) -> Self {
136        Self {
137            db,
138            source: Source::Full,
139            filters: Vec::new(),
140            limit: None,
141            sort_key: None,
142            sort_buffer_limit: None,
143        }
144    }
145
146    /// Append a filter predicate. Filters compose with AND — every
147    /// predicate must return `true` for a doc to be emitted.
148    ///
149    /// `'static` is required so the closure can outlive the
150    /// temporary builder; capture by value if you need to borrow a
151    /// stack-local value.
152    #[must_use]
153    pub fn filter<F>(mut self, predicate: F) -> Self
154    where
155        F: Fn(&T) -> bool + 'static,
156    {
157        self.filters.push(Box::new(predicate));
158        self
159    }
160
161    /// Cap the result set at `n` documents. Order is the source
162    /// order (primary `Id` for full-scan; index-key bytes for an
163    /// index range).
164    #[must_use]
165    pub fn limit(mut self, n: usize) -> Self {
166        self.limit = Some(n);
167        self
168    }
169
170    /// Switch the query source from full-scan to the named index's
171    /// range. Bounds are [`Dynamic`] values; the builder encodes
172    /// them through `obj_core::index::encode_field` at call time so
173    /// the actual range arithmetic sees byte-ordered keys.
174    ///
175    /// Order is by the index key bytes, not by primary `Id`. The
176    /// scan is bounded to the slice of the index B-tree the range
177    /// covers — no full-collection walk.
178    ///
179    /// # Examples
180    ///
181    /// Range query on an indexed `u64` field:
182    ///
183    /// ```
184    /// # fn main() -> obj::Result<()> {
185    /// use obj::Db;
186    /// use obj_core::codec::Dynamic;
187    /// use serde::{Deserialize, Serialize};
188    ///
189    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
190    /// #[obj(collection = "orders_index_range_doc")]
191    /// struct Order {
192    ///     #[obj(index)]
193    ///     placed_at: u64,
194    /// }
195    ///
196    /// let dir = tempfile::tempdir()?;
197    /// let db = Db::open(dir.path().join("range.obj"))?;
198    /// for i in 0..100u64 {
199    ///     let _ = db.insert(Order { placed_at: i * 1_000 })?;
200    /// }
201    /// let last_week = Dynamic::U64(30_000);
202    /// let now = Dynamic::U64(60_000);
203    /// let recent: Vec<Order> = db
204    ///     .query::<Order>()
205    ///     .index_range("placed_at", last_week..now)?
206    ///     .fetch()?;
207    /// assert_eq!(recent.len(), 30);
208    /// # Ok(())
209    /// # }
210    /// ```
211    ///
212    /// # Errors
213    ///
214    /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
215    ///   carries an embedded NUL byte (the order-preserving encoder
216    ///   rejects those — see `obj_core::index::encode_field`).
217    pub fn index_range<R>(mut self, name: &str, range: R) -> Result<Self>
218    where
219        R: RangeBounds<Dynamic>,
220    {
221        let start = encode_bound(range.start_bound())?;
222        let end = encode_bound(range.end_bound())?;
223        self.source = Source::IndexRange {
224            name: name.to_owned(),
225            start,
226            end,
227        };
228        Ok(self)
229    }
230
231    /// Sort the result by `key`'s output in ascending key order.
232    ///
233    /// `key` returns an [`obj_core::codec::Dynamic`] for each
234    /// document; the builder runs each value through
235    /// [`obj_core::index::encode_field`] (M7 #55) so the comparator
236    /// is a byte comparison whose ordering matches the value's
237    /// natural `Ord`. This reuses the same order-preserving encoder
238    /// the index layer uses, so a `.sort_by(|o| o.placed_at.into())`
239    /// produces the same ordering an `index_range("placed_at", ...)`
240    /// scan would visit.
241    ///
242    /// Last-call-wins: a second `.sort_by` (or `.sort_by_bytes`)
243    /// overwrites the first; the two extractors share the same
244    /// internal slot because they are mutually exclusive.
245    ///
246    /// The sort runs BEFORE [`Query::limit`] truncation, so
247    /// `.sort_by(...).limit(N)` returns the N smallest by the
248    /// extractor's key — the "top-N sorted" workload `design.md`
249    /// shows for the Orders example.
250    ///
251    /// # Errors at fetch time
252    ///
253    /// `sort_by` itself is infallible — it stores the closure. The
254    /// encoder runs during [`Query::fetch`]; a `Dynamic` whose
255    /// `encode_field` representation cannot be computed (e.g. a
256    /// `Dynamic::String` carrying an embedded `0x00` byte that the
257    /// order-preserving encoder rejects) surfaces as
258    /// [`Error::SortKeyEncode`] — power-of-ten Rule 7 (no silent
259    /// errors). Callers who want to bypass `encode_field` entirely
260    /// should use [`Query::sort_by_bytes`].
261    ///
262    /// # Sort-buffer bound
263    ///
264    /// The pre-sort buffer is capped at
265    /// [`Query::sort_buffer_limit`] (default
266    /// [`MAX_SORT_BUFFER`] = 100 000). A scan that produces more
267    /// candidates surfaces [`Error::SortBufferExceeded`]; the user
268    /// should narrow the source via `.filter` / `.index_range` /
269    /// `.limit`, or raise the cap with `.sort_buffer_limit(N)`.
270    #[must_use]
271    pub fn sort_by<F>(mut self, key: F) -> Self
272    where
273        F: Fn(&T) -> Dynamic + 'static,
274    {
275        // Encode each per-doc `Dynamic` through the M7 #55 order-
276        // preserving encoder; propagate the encoder's `Result` so
277        // fetch can surface it as `Error::SortKeyEncode`. Power-of-
278        // ten Rule 7: no silent errors.
279        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| {
280            let dynamic = key(doc);
281            obj_core::index::encode_field(&dynamic)
282                .map(obj_core::index::EncodedIndexKey::into_bytes)
283                .map_err(|e| Error::SortKeyEncode {
284                    source: Box::new(e),
285                })
286        });
287        self.sort_key = Some(encoded);
288        self
289    }
290
291    /// Sort the result by `key`'s raw byte output in ascending order.
292    ///
293    /// Companion to [`Query::sort_by`] that lets callers supply the
294    /// already-encoded sort bytes. The byte-order = sort-order
295    /// invariant is the caller's responsibility: two documents whose
296    /// key bytes compare `Less` MUST also be in the desired sort
297    /// order, otherwise the produced ordering is unspecified.
298    ///
299    /// Bypassing [`obj_core::index::encode_field`] means
300    /// [`Error::SortKeyEncode`] cannot fire — `sort_by_bytes` is the
301    /// right shape for callers who already have an order-preserving
302    /// byte form (e.g. a precomputed `i64::to_be_bytes` of a signed
303    /// counter that they have already biased to the unsigned range).
304    ///
305    /// Last-call-wins: `sort_by_bytes` overwrites a prior
306    /// [`Query::sort_by`] (and vice versa).
307    ///
308    /// # Sort-buffer bound
309    ///
310    /// Same bound as [`Query::sort_by`] — see that method's docs.
311    #[must_use]
312    pub fn sort_by_bytes<F>(mut self, key: F) -> Self
313    where
314        F: Fn(&T) -> Vec<u8> + 'static,
315    {
316        // Caller owns the byte-order = sort-order contract; we wrap
317        // their infallible output in `Ok(_)` so fetch_sorted's shared
318        // extractor signature still applies.
319        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| Ok(key(doc)));
320        self.sort_key = Some(encoded);
321        self
322    }
323
324    /// Override the per-query sort-buffer ceiling. Only consulted
325    /// when [`Query::sort_by`] is set. Defaults to
326    /// [`MAX_SORT_BUFFER`] (100 000).
327    ///
328    /// A scan that overshoots the cap surfaces
329    /// [`Error::SortBufferExceeded`]; narrow the candidate set
330    /// with [`Query::filter`] / [`Query::index_range`] /
331    /// [`Query::limit`], or raise the cap with this method.
332    ///
333    /// # Examples
334    ///
335    /// ```
336    /// # fn main() -> obj::Result<()> {
337    /// use obj::Db;
338    /// use obj_core::codec::Dynamic;
339    /// use serde::{Deserialize, Serialize};
340    ///
341    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
342    /// #[obj(collection = "ticks_sort_buffer_doc")]
343    /// struct Tick { value: u64 }
344    ///
345    /// let dir = tempfile::tempdir()?;
346    /// let db = Db::open(dir.path().join("sort.obj"))?;
347    /// for v in 0..10u64 {
348    ///     let _ = db.insert(Tick { value: v })?;
349    /// }
350    /// let top: Vec<Tick> = db
351    ///     .query::<Tick>()
352    ///     .sort_by(|t| Dynamic::U64(t.value))
353    ///     .sort_buffer_limit(1_000) // narrower than the default
354    ///     .limit(3)
355    ///     .fetch()?;
356    /// assert_eq!(top.len(), 3);
357    /// # Ok(())
358    /// # }
359    /// ```
360    #[must_use]
361    pub fn sort_buffer_limit(mut self, n: usize) -> Self {
362        self.sort_buffer_limit = Some(n);
363        self
364    }
365
366    /// Execute the query and materialise the matching documents.
367    ///
368    /// Opens a fresh `read_transaction`, drives the configured
369    /// source iterator, applies every filter in declaration order,
370    /// and truncates to `limit` (if set). Returns the documents in
371    /// source order.
372    ///
373    /// # Errors
374    ///
375    /// - As [`Db::read_transaction`].
376    /// - Any error from the underlying [`crate::Collection`] scan.
377    pub fn fetch(self) -> Result<Vec<T>> {
378        #[cfg(feature = "tracing")]
379        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
380        #[cfg(feature = "tracing")]
381        let _guard = span.enter();
382        #[cfg(feature = "tracing")]
383        span.record("kind", query_kind(&self.source));
384        self.db.read_transaction(|tx| {
385            let coll = tx.collection::<T>()?;
386            if self.sort_key.is_some() {
387                fetch_sorted(&coll, &self)
388            } else {
389                fetch_unsorted(&coll, &self)
390            }
391        })
392    }
393
394    /// Count the documents this query would return, ignoring
395    /// [`Query::sort_by`] (sort does not change the count) but
396    /// honouring [`Query::limit`] (returns `min(total, limit)`).
397    ///
398    /// Takes `&self` rather than consuming the builder so callers
399    /// can chain a follow-up `.fetch()` on the same predicate set.
400    ///
401    /// # Fast path (no filter set)
402    ///
403    /// When no filter is set, `count` walks the source B-tree
404    /// without decoding any document. The exact shape of the walk
405    /// depends on the source's index kind so the answer matches
406    /// what `fetch` would return:
407    ///
408    /// - **Full scan** — walks the primary B-tree counting entries
409    ///   (`Collection::count_all`). One entry per doc; the count is
410    ///   exact.
411    /// - **`Standard` / `Unique` / `Composite` `index_range`** — walks
412    ///   the named index's B-tree counting entries
413    ///   (`Collection::count_index_range`). One entry per doc;
414    ///   the count is exact.
415    /// - **`Each` `index_range`** — walks the named index's B-tree
416    ///   tracking distinct trailing-`id` suffixes via
417    ///   `Collection::count_distinct_ids_in_range` (M8 follow-up
418    ///   #72). A single doc may emit multiple entries under
419    ///   different element keys; the entry count would overshoot.
420    ///   The distinct set is bounded by
421    ///   [`crate::MAX_DISTINCT_IDS`] (100 000); exceeding it
422    ///   surfaces [`obj_core::Error::DistinctCountExceeded`].
423    ///
424    /// # Slow path (filter set)
425    ///
426    /// When at least one filter is set, the predicate has to see a
427    /// decoded `T`, so the slow path pays the per-doc decode cost
428    /// (same as `fetch`). Sort, if set, is ignored — the total
429    /// count is the same.
430    ///
431    /// # Examples
432    ///
433    /// Count without materialising documents:
434    ///
435    /// ```
436    /// # fn main() -> obj::Result<()> {
437    /// use obj::Db;
438    /// use serde::{Deserialize, Serialize};
439    ///
440    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
441    /// #[obj(collection = "counts_doc")]
442    /// struct Order { customer_id: u64 }
443    ///
444    /// let dir = tempfile::tempdir()?;
445    /// let db = Db::open(dir.path().join("count.obj"))?;
446    /// for i in 0..30u64 {
447    ///     let _ = db.insert(Order { customer_id: i % 3 })?;
448    /// }
449    /// let n: u64 = db.query::<Order>()
450    ///     .filter(|o| o.customer_id == 1)
451    ///     .count()?;
452    /// assert_eq!(n, 10);
453    /// # Ok(())
454    /// # }
455    /// ```
456    ///
457    /// # Errors
458    ///
459    /// - As [`Db::read_transaction`].
460    /// - Any error from the underlying [`crate::Collection`] scan.
461    /// - [`obj_core::Error::DistinctCountExceeded`] on the `Each`
462    ///   fast path.
463    pub fn count(&self) -> Result<u64> {
464        #[cfg(feature = "tracing")]
465        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
466        #[cfg(feature = "tracing")]
467        let _guard = span.enter();
468        #[cfg(feature = "tracing")]
469        span.record("kind", query_kind(&self.source));
470        self.db.read_transaction(|tx| {
471            let coll = tx.collection::<T>()?;
472            let total = if self.filters.is_empty() {
473                count_fast(&coll, &self.source)?
474            } else {
475                count_slow(&coll, self)?
476            };
477            Ok(apply_count_limit(total, self.limit))
478        })
479    }
480}
481
/// Label recorded in the `kind` field of the `query.execute` tracing
/// span (Phase 2B, issue #7).
///
/// An index-range source is reported as `"index"` because its work is
/// bounded by the named index's slice. A full primary-tree walk with
/// in-memory predicates is the "filter scan" path, reported as
/// `"filter"`.
#[cfg(feature = "tracing")]
fn query_kind(source: &Source) -> &'static str {
    match source {
        Source::IndexRange { .. } => "index",
        Source::Full => "filter",
    }
}
495
/// Clamp a raw count to the optional `.limit(N)` cap.
///
/// Returns `total` unchanged when no limit is set, otherwise
/// `min(total, N)`.
fn apply_count_limit(total: u64, limit: Option<usize>) -> u64 {
    limit.map_or(total, |cap| {
        // `usize` fits in `u64` on every supported target; saturating
        // on a failed conversion keeps the arithmetic total anyway.
        let cap = u64::try_from(cap).unwrap_or(u64::MAX);
        total.min(cap)
    })
}
507
508/// Drain the configured source with no `sort_by` set.
509///
510/// Power-of-ten Rule 2: the loop is bounded by the source iterator's
511/// length (the primary B-tree's `MAX_RANGE_NODES` budget). The
512/// `.limit(N)` cap further shortens the loop.
513fn fetch_unsorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
514where
515    T: Document + Send + 'static,
516{
517    if q.limit == Some(0) {
518        return Ok(Vec::new());
519    }
520    let mut out: Vec<T> = Vec::new();
521    for_each_candidate(coll, q, |doc| {
522        if !q.filters.iter().all(|f| f(&doc)) {
523            return Ok(true);
524        }
525        out.push(doc);
526        if let Some(n) = q.limit {
527            if out.len() >= n {
528                return Ok(false);
529            }
530        }
531        Ok(true)
532    })?;
533    Ok(out)
534}
535
536/// Drain the configured source with `sort_by` set.
537///
538/// Collects up to `sort_buffer_limit` filtered candidates into RAM,
539/// sorts ascending by the extractor's byte output, then truncates
540/// to `limit`. Power-of-ten Rule 3: the buffer is bounded — exceeding
541/// it surfaces [`Error::SortBufferExceeded`] rather than chewing
542/// arbitrary memory.
543fn fetch_sorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
544where
545    T: Document + Send + 'static,
546{
547    let cap = q.sort_buffer_limit.unwrap_or(MAX_SORT_BUFFER);
548    let sort_key = q
549        .sort_key
550        .as_ref()
551        .ok_or(Error::InvalidArgument("fetch_sorted without sort_key"))?;
552    let mut buf: Vec<(Vec<u8>, T)> = Vec::new();
553    for_each_candidate(coll, q, |doc| {
554        if !q.filters.iter().all(|f| f(&doc)) {
555            return Ok(true);
556        }
557        if buf.len() >= cap {
558            return Err(Error::SortBufferExceeded { limit: cap });
559        }
560        // Power-of-ten Rule 7: propagate the extractor's `Result`
561        // rather than collapsing an encode failure into a tied key.
562        let key_bytes = sort_key(&doc)?;
563        buf.push((key_bytes, doc));
564        Ok(true)
565    })?;
566    buf.sort_by(|a, b| a.0.cmp(&b.0));
567    let truncated_len = match q.limit {
568        Some(n) => buf.len().min(n),
569        None => buf.len(),
570    };
571    let mut out: Vec<T> = Vec::with_capacity(truncated_len);
572    for (_k, d) in buf.into_iter().take(truncated_len) {
573        out.push(d);
574    }
575    Ok(out)
576}
577
578/// Walk the configured source and call `f(doc)` for each decoded
579/// document. `f` returns `Ok(true)` to continue, `Ok(false)` to stop
580/// early (e.g. the limit is reached), or `Err(_)` to abort the scan.
581///
582/// Power-of-ten Rule 4: factor the source-dispatch + iteration
583/// boilerplate so the sorted / unsorted entry points stay readable.
584fn for_each_candidate<T, F>(
585    coll: &crate::Collection<'_, T>,
586    q: &Query<'_, T>,
587    mut f: F,
588) -> Result<()>
589where
590    T: Document + Send + 'static,
591    F: FnMut(T) -> Result<bool>,
592{
593    match &q.source {
594        Source::Full => {
595            let docs = coll.all()?;
596            for (_id, doc) in docs {
597                if !f(doc)? {
598                    return Ok(());
599                }
600            }
601        }
602        Source::IndexRange { name, start, end } => {
603            let iter = coll.index_range_encoded(name, clone_bound(start), clone_bound(end))?;
604            for step in iter {
605                let (_key, doc) = step?;
606                if !f(doc)? {
607                    return Ok(());
608                }
609            }
610        }
611    }
612    Ok(())
613}
614
615/// No-filter fast path for [`Query::count`].
616///
617/// Walks the underlying B-tree (primary or index) without decoding
618/// any document. For an `Each`-kind `index_range` source, dispatches
619/// to [`crate::Collection::count_distinct_ids_in_range`] so the count
620/// matches what `fetch` would return (which de-duplicates per doc).
621/// Other kinds use the cheaper entry-count
622/// [`crate::Collection::count_index_range`] path. See the rustdoc on
623/// `Query::count` for the full per-kind table.
624fn count_fast<T>(coll: &crate::Collection<'_, T>, source: &Source) -> Result<u64>
625where
626    T: Document + Send + 'static,
627{
628    match source {
629        Source::Full => coll.count_all(),
630        Source::IndexRange { name, start, end } => {
631            // Peek at the descriptor to choose the per-kind path. The
632            // peek goes through the same `active_index` validation as
633            // the eventual count call (which re-resolves the index by
634            // name), so an unknown / dropped name produces the same
635            // `Error::IndexNotFound` exactly once.
636            let kind = coll.index_kind(name)?;
637            if kind == obj_core::IndexKind::Each {
638                coll.count_distinct_ids_in_range_encoded(name, clone_bound(start), clone_bound(end))
639            } else {
640                coll.count_index_range_encoded(name, clone_bound(start), clone_bound(end))
641            }
642        }
643    }
644}
645
646/// Filter-applied slow path for [`Query::count`].
647///
648/// Must decode each candidate to evaluate the predicate. The decode
649/// cost is unavoidable per the M8 design: there is no inverted-index
650/// data structure that would let us count filter matches without
651/// touching the document.
652fn count_slow<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<u64>
653where
654    T: Document + Send + 'static,
655{
656    let mut n: u64 = 0;
657    for_each_candidate(coll, q, |doc| {
658        if q.filters.iter().all(|f| f(&doc)) {
659            n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
660                reason: "slow-path count exceeds u64",
661            })?;
662        }
663        Ok(true)
664    })?;
665    Ok(n)
666}
667
668/// Encode a `Bound<Dynamic>` into a `Bound<Vec<u8>>` using the
669/// order-preserving field encoder. Used by [`Query::index_range`].
670fn encode_bound(b: Bound<&Dynamic>) -> Result<Bound<Vec<u8>>> {
671    match b {
672        Bound::Included(v) => Ok(Bound::Included(
673            obj_core::index::encode_field(v)?.into_bytes(),
674        )),
675        Bound::Excluded(v) => Ok(Bound::Excluded(
676            obj_core::index::encode_field(v)?.into_bytes(),
677        )),
678        Bound::Unbounded => Ok(Bound::Unbounded),
679    }
680}
681
/// Clone a borrowed `Bound<Vec<u8>>` into an owned one.
///
/// The encoded-range `Collection` calls (`index_range_encoded`,
/// `count_index_range_encoded`, `count_distinct_ids_in_range_encoded`)
/// are handed owned bounds, while the builder's `Source` is only
/// borrowed during a scan. Used by `for_each_candidate` and
/// `count_fast`.
fn clone_bound(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    // `Bound<T>` implements `Clone` whenever `T: Clone`, so no
    // hand-written per-variant match is needed.
    b.clone()
}