Skip to main content

obj/
query.rs

1//! `Query<T>` — the M8 query builder.
2//!
3//! `Query` is a thin builder over the M6 / M7 `Collection` API. It
4//! borrows a `&Db` for the duration of the build phase; `.fetch()`
5//! and `.count()` consume / inspect the builder and open a fresh
6//! `read_transaction` for the actual scan.
7//!
8//! ## Sources
9//!
10//! - [`Source::Full`] — full collection scan via
11//!   [`crate::Collection::all`]. Order is by primary `Id`.
12//! - [`Source::IndexRange`] — index-range scan via
13//!   [`crate::Collection::index_range`]. Order is by encoded index
14//!   key bytes.
15//!
16//! No cost-based planner: the caller picks the source. The M8 plan
17//! explicitly defers planning to a future milestone.
18//!
19//! ## Filters
20//!
21//! Filters are arbitrary `Fn(&T) -> bool + 'static` closures. They
22//! are evaluated on the decoded document, so they pay the per-doc
23//! decode cost; an `.index_range(...)` source keeps the work bounded
24//! by walking only the index slice.
25//!
26//! Multiple `.filter(...)` calls AND together — every predicate must
27//! return `true` for a doc to be emitted. The closures are stored as
28//! `Box<dyn Fn(&T) -> bool + 'static>`; `'static` lets the closure
29//! outlive the temporary builder.
30//!
31//! ## Power-of-ten posture
32//!
33//! - **Rule 2.** The fetch loop is bounded by the source iterator's
34//!   length. The source iterators (`Collection::all`,
35//!   `Collection::index_range`) are themselves bounded by the
36//!   `MAX_RANGE_NODES` budget (M4 #28). The `.limit(N)` cap further
37//!   shortens the loop when the caller wants a top-N view.
38//! - **Rule 4.** Each function in this module stays under the 60-line
39//!   ceiling; the dispatch in `fetch_inner` factors per-source helpers.
40//! - **Rule 7.** Every `Result` and `Option` is handled. No `unwrap` /
41//!   `expect` in the production path.
42//! - **Rule 9.** Filters use `Box<dyn Fn(&T) -> bool>` — type-erased
43//!   so the public builder can compose without leaking generic
44//!   parameters into every call site. This is the one `dyn` in the
45//!   query layer; the source iteration itself is monomorphic.
46
47use std::ops::{Bound, RangeBounds};
48
49use obj_core::codec::Dynamic;
50use obj_core::{Error, Result};
51
52use crate::Db;
53use crate::Document;
54
55/// Boxed filter predicate. `'static` so the closure can outlive the
56/// `Query` builder.
57type FilterFn<T> = Box<dyn Fn(&T) -> bool + 'static>;
58
59/// Boxed sort-key extractor. The closure may fail at encode time —
60/// e.g. a `sort_by(|t| Dynamic::String(...))` whose string carries
61/// an embedded NUL byte that the order-preserving encoder rejects.
62/// `sort_by_bytes` callers wrap an infallible byte-producing closure
63/// in `Ok(_)`; `sort_by` callers run their `Dynamic` output through
64/// `encode_field` here and propagate the failure rather than
65/// collapsing it to an empty key (Rule 7). See [`Query::sort_by`].
66type SortKeyFn<T> = Box<dyn Fn(&T) -> Result<Vec<u8>> + 'static>;
67
68/// Default cap on the in-memory sort buffer. The query layer reads
69/// at most this many surviving documents into RAM before sorting; a
70/// scan that produces more candidates surfaces
71/// [`Error::SortBufferExceeded`]. M8 #66.
72pub const MAX_SORT_BUFFER: usize = 100_000;
73
74/// Where a [`Query`] reads its candidate documents from.
75///
76/// Construct via the [`Query`] builder methods; the source is full-
77/// scan by default and switches to `IndexRange` on
78/// [`Query::index_range`].
79#[derive(Debug, Clone)]
80enum Source {
81    /// Full primary-tree scan via [`crate::Collection::all`].
82    Full,
83    /// Walk the named index's B-tree slice via
84    /// [`crate::Collection::index_range`]. Bounds are stored as
85    /// already-encoded byte ranges — the builder runs
86    /// `encode_field` at the boundary so the borrow-checker does not
87    /// need to track `Dynamic` across the read txn.
88    IndexRange {
89        /// Index name.
90        name: String,
91        /// Encoded lower bound.
92        start: Bound<Vec<u8>>,
93        /// Encoded upper bound.
94        end: Bound<Vec<u8>>,
95    },
96}
97
98/// The M8 query builder.
99///
100/// Obtain via [`Db::query`]. Compose with [`Query::filter`],
101/// [`Query::limit`], and [`Query::index_range`]; terminate with
102/// [`Query::fetch`].
103///
104/// `Query` borrows `&'db Db` so multiple builders can coexist; the
105/// borrow ends when `.fetch()` returns. The actual scan runs inside
106/// a fresh `read_transaction` opened by `.fetch()` — the builder
107/// itself holds no locks.
108pub struct Query<'db, T: Document> {
109    /// Borrowed `Db` so the builder is cheap to construct and to drop
110    /// without committing to the scan up-front.
111    db: &'db Db,
112    /// Where to draw candidate documents from.
113    source: Source,
114    /// User-supplied predicates. Applied in declaration order; every
115    /// predicate must return `true` for a doc to be emitted.
116    filters: Vec<FilterFn<T>>,
117    /// Optional caller-supplied cap on the result count.
118    limit: Option<usize>,
119    /// Optional sort-key extractor. When set, the fetch collects up
120    /// to `sort_buffer_limit` filtered candidates into RAM, sorts by
121    /// the extractor's byte output, then applies `limit`. Last-call-
122    /// wins if `sort_by` is invoked multiple times.
123    sort_key: Option<SortKeyFn<T>>,
124    /// Per-query override for the sort buffer ceiling. Defaults to
125    /// [`MAX_SORT_BUFFER`]. Only consulted when `sort_key` is set.
126    sort_buffer_limit: Option<usize>,
127}
128
129impl<'db, T: Document> Query<'db, T>
130where
131    T: Send + 'static,
132{
133    /// Construct a fresh full-scan query. Crate-internal — public
134    /// callers go through [`Db::query`].
135    pub(crate) fn new(db: &'db Db) -> Self {
136        Self {
137            db,
138            source: Source::Full,
139            filters: Vec::new(),
140            limit: None,
141            sort_key: None,
142            sort_buffer_limit: None,
143        }
144    }
145
146    /// Append a filter predicate. Filters compose with AND — every
147    /// predicate must return `true` for a doc to be emitted.
148    ///
149    /// `'static` is required so the closure can outlive the
150    /// temporary builder; capture by value if you need to borrow a
151    /// stack-local value.
152    #[must_use]
153    pub fn filter<F>(mut self, predicate: F) -> Self
154    where
155        F: Fn(&T) -> bool + 'static,
156    {
157        self.filters.push(Box::new(predicate));
158        self
159    }
160
161    /// Cap the result set at `n` documents. Order is the source
162    /// order (primary `Id` for full-scan; index-key bytes for an
163    /// index range).
164    #[must_use]
165    pub fn limit(mut self, n: usize) -> Self {
166        self.limit = Some(n);
167        self
168    }
169
170    /// Switch the query source from full-scan to the named index's
171    /// range. Bounds are [`Dynamic`] values; the builder encodes
172    /// them through `obj_core::index::encode_field` at call time so
173    /// the actual range arithmetic sees byte-ordered keys.
174    ///
175    /// Order is by the index key bytes, not by primary `Id`. The
176    /// scan is bounded to the slice of the index B-tree the range
177    /// covers — no full-collection walk.
178    ///
179    /// # Examples
180    ///
181    /// Range query on an indexed `u64` field:
182    ///
183    /// ```
184    /// # fn main() -> obj::Result<()> {
185    /// use obj::Db;
186    /// use obj_core::codec::Dynamic;
187    /// use serde::{Deserialize, Serialize};
188    ///
189    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
190    /// #[obj(collection = "orders_index_range_doc")]
191    /// struct Order {
192    ///     #[obj(index)]
193    ///     placed_at: u64,
194    /// }
195    ///
196    /// let dir = tempfile::tempdir()?;
197    /// let db = Db::open(dir.path().join("range.obj"))?;
198    /// for i in 0..100u64 {
199    ///     let _ = db.insert(Order { placed_at: i * 1_000 })?;
200    /// }
201    /// let last_week = Dynamic::U64(30_000);
202    /// let now = Dynamic::U64(60_000);
203    /// let recent: Vec<Order> = db
204    ///     .query::<Order>()
205    ///     .index_range("placed_at", last_week..now)?
206    ///     .fetch()?;
207    /// assert_eq!(recent.len(), 30);
208    /// # Ok(())
209    /// # }
210    /// ```
211    ///
212    /// # Errors
213    ///
214    /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
215    ///   carries an embedded NUL byte (the order-preserving encoder
216    ///   rejects those — see `obj_core::index::encode_field`).
217    pub fn index_range<R>(mut self, name: &str, range: R) -> Result<Self>
218    where
219        R: RangeBounds<Dynamic>,
220    {
221        let start = encode_bound(range.start_bound())?;
222        let end = encode_bound(range.end_bound())?;
223        self.source = Source::IndexRange {
224            name: name.to_owned(),
225            start,
226            end,
227        };
228        Ok(self)
229    }
230
231    /// Sort the result by `key`'s output in ascending key order.
232    ///
233    /// `key` returns an [`obj_core::codec::Dynamic`] for each
234    /// document; the builder runs each value through
235    /// [`obj_core::index::encode_field`] (M7 #55) so the comparator
236    /// is a byte comparison whose ordering matches the value's
237    /// natural `Ord`. This reuses the same order-preserving encoder
238    /// the index layer uses, so a `.sort_by(|o| o.placed_at.into())`
239    /// produces the same ordering an `index_range("placed_at", ...)`
240    /// scan would visit.
241    ///
242    /// Last-call-wins: a second `.sort_by` (or `.sort_by_bytes`)
243    /// overwrites the first; the two extractors share the same
244    /// internal slot because they are mutually exclusive.
245    ///
246    /// The sort runs BEFORE [`Query::limit`] truncation, so
247    /// `.sort_by(...).limit(N)` returns the N smallest by the
248    /// extractor's key — the "top-N sorted" workload
249    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
250    /// shows for the Orders example.
251    ///
252    /// # Errors at fetch time
253    ///
254    /// `sort_by` itself is infallible — it stores the closure. The
255    /// encoder runs during [`Query::fetch`]; a `Dynamic` whose
256    /// `encode_field` representation cannot be computed (e.g. a
257    /// `Dynamic::String` carrying an embedded `0x00` byte that the
258    /// order-preserving encoder rejects) surfaces as
259    /// [`Error::SortKeyEncode`] — power-of-ten Rule 7 (no silent
260    /// errors). Callers who want to bypass `encode_field` entirely
261    /// should use [`Query::sort_by_bytes`].
262    ///
263    /// # Sort-buffer bound
264    ///
265    /// The pre-sort buffer is capped at
266    /// [`Query::sort_buffer_limit`] (default
267    /// [`MAX_SORT_BUFFER`] = 100 000). A scan that produces more
268    /// candidates surfaces [`Error::SortBufferExceeded`]; the user
269    /// should narrow the source via `.filter` / `.index_range` /
270    /// `.limit`, or raise the cap with `.sort_buffer_limit(N)`.
271    #[must_use]
272    pub fn sort_by<F>(mut self, key: F) -> Self
273    where
274        F: Fn(&T) -> Dynamic + 'static,
275    {
276        // Encode each per-doc `Dynamic` through the M7 #55 order-
277        // preserving encoder; propagate the encoder's `Result` so
278        // fetch can surface it as `Error::SortKeyEncode`. Power-of-
279        // ten Rule 7: no silent errors.
280        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| {
281            let dynamic = key(doc);
282            obj_core::index::encode_field(&dynamic)
283                .map(obj_core::index::EncodedIndexKey::into_bytes)
284                .map_err(|e| Error::SortKeyEncode {
285                    source: Box::new(e),
286                })
287        });
288        self.sort_key = Some(encoded);
289        self
290    }
291
292    /// Sort the result by `key`'s raw byte output in ascending order.
293    ///
294    /// Companion to [`Query::sort_by`] that lets callers supply the
295    /// already-encoded sort bytes. The byte-order = sort-order
296    /// invariant is the caller's responsibility: two documents whose
297    /// key bytes compare `Less` MUST also be in the desired sort
298    /// order, otherwise the produced ordering is unspecified.
299    ///
300    /// Bypassing [`obj_core::index::encode_field`] means
301    /// [`Error::SortKeyEncode`] cannot fire — `sort_by_bytes` is the
302    /// right shape for callers who already have an order-preserving
303    /// byte form (e.g. a precomputed `i64::to_be_bytes` of a signed
304    /// counter that they have already biased to the unsigned range).
305    ///
306    /// Last-call-wins: `sort_by_bytes` overwrites a prior
307    /// [`Query::sort_by`] (and vice versa).
308    ///
309    /// # Sort-buffer bound
310    ///
311    /// Same bound as [`Query::sort_by`] — see that method's docs.
312    #[must_use]
313    pub fn sort_by_bytes<F>(mut self, key: F) -> Self
314    where
315        F: Fn(&T) -> Vec<u8> + 'static,
316    {
317        // Caller owns the byte-order = sort-order contract; we wrap
318        // their infallible output in `Ok(_)` so fetch_sorted's shared
319        // extractor signature still applies.
320        let encoded: SortKeyFn<T> = Box::new(move |doc: &T| Ok(key(doc)));
321        self.sort_key = Some(encoded);
322        self
323    }
324
325    /// Override the per-query sort-buffer ceiling. Only consulted
326    /// when [`Query::sort_by`] is set. Defaults to
327    /// [`MAX_SORT_BUFFER`] (100 000).
328    ///
329    /// A scan that overshoots the cap surfaces
330    /// [`Error::SortBufferExceeded`]; narrow the candidate set
331    /// with [`Query::filter`] / [`Query::index_range`] /
332    /// [`Query::limit`], or raise the cap with this method.
333    ///
334    /// # Examples
335    ///
336    /// ```
337    /// # fn main() -> obj::Result<()> {
338    /// use obj::Db;
339    /// use obj_core::codec::Dynamic;
340    /// use serde::{Deserialize, Serialize};
341    ///
342    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
343    /// #[obj(collection = "ticks_sort_buffer_doc")]
344    /// struct Tick { value: u64 }
345    ///
346    /// let dir = tempfile::tempdir()?;
347    /// let db = Db::open(dir.path().join("sort.obj"))?;
348    /// for v in 0..10u64 {
349    ///     let _ = db.insert(Tick { value: v })?;
350    /// }
351    /// let top: Vec<Tick> = db
352    ///     .query::<Tick>()
353    ///     .sort_by(|t| Dynamic::U64(t.value))
354    ///     .sort_buffer_limit(1_000) // narrower than the default
355    ///     .limit(3)
356    ///     .fetch()?;
357    /// assert_eq!(top.len(), 3);
358    /// # Ok(())
359    /// # }
360    /// ```
361    #[must_use]
362    pub fn sort_buffer_limit(mut self, n: usize) -> Self {
363        self.sort_buffer_limit = Some(n);
364        self
365    }
366
367    /// Execute the query and materialise the matching documents.
368    ///
369    /// Opens a fresh `read_transaction`, drives the configured
370    /// source iterator, applies every filter in declaration order,
371    /// and truncates to `limit` (if set). Returns the documents in
372    /// source order.
373    ///
374    /// # Errors
375    ///
376    /// - As [`Db::read_transaction`].
377    /// - Any error from the underlying [`crate::Collection`] scan.
378    pub fn fetch(self) -> Result<Vec<T>> {
379        #[cfg(feature = "tracing")]
380        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
381        #[cfg(feature = "tracing")]
382        let _guard = span.enter();
383        #[cfg(feature = "tracing")]
384        span.record("kind", query_kind(&self.source));
385        self.db.read_transaction(|tx| {
386            let coll = tx.collection::<T>()?;
387            if self.sort_key.is_some() {
388                fetch_sorted(&coll, &self)
389            } else {
390                fetch_unsorted(&coll, &self)
391            }
392        })
393    }
394
395    /// Count the documents this query would return, ignoring
396    /// [`Query::sort_by`] (sort does not change the count) but
397    /// honouring [`Query::limit`] (returns `min(total, limit)`).
398    ///
399    /// Takes `&self` rather than consuming the builder so callers
400    /// can chain a follow-up `.fetch()` on the same predicate set.
401    ///
402    /// # Fast path (no filter set)
403    ///
404    /// When no filter is set, `count` walks the source B-tree
405    /// without decoding any document. The exact shape of the walk
406    /// depends on the source's index kind so the answer matches
407    /// what `fetch` would return:
408    ///
409    /// - **Full scan** — walks the primary B-tree counting entries
410    ///   (`Collection::count_all`). One entry per doc; the count is
411    ///   exact.
412    /// - **`Standard` / `Unique` / `Composite` `index_range`** — walks
413    ///   the named index's B-tree counting entries
414    ///   (`Collection::count_index_range`). One entry per doc;
415    ///   the count is exact.
416    /// - **`Each` `index_range`** — walks the named index's B-tree
417    ///   tracking distinct trailing-`id` suffixes via
418    ///   `Collection::count_distinct_ids_in_range` (M8 follow-up
419    ///   #72). A single doc may emit multiple entries under
420    ///   different element keys; the entry count would overshoot.
421    ///   The distinct set is bounded by
422    ///   [`crate::MAX_DISTINCT_IDS`] (100 000); exceeding it
423    ///   surfaces [`obj_core::Error::DistinctCountExceeded`].
424    ///
425    /// # Slow path (filter set)
426    ///
427    /// When at least one filter is set, the predicate has to see a
428    /// decoded `T`, so the slow path pays the per-doc decode cost
429    /// (same as `fetch`). Sort, if set, is ignored — the total
430    /// count is the same.
431    ///
432    /// # Examples
433    ///
434    /// Count without materialising documents:
435    ///
436    /// ```
437    /// # fn main() -> obj::Result<()> {
438    /// use obj::Db;
439    /// use serde::{Deserialize, Serialize};
440    ///
441    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
442    /// #[obj(collection = "counts_doc")]
443    /// struct Order { customer_id: u64 }
444    ///
445    /// let dir = tempfile::tempdir()?;
446    /// let db = Db::open(dir.path().join("count.obj"))?;
447    /// for i in 0..30u64 {
448    ///     let _ = db.insert(Order { customer_id: i % 3 })?;
449    /// }
450    /// let n: u64 = db.query::<Order>()
451    ///     .filter(|o| o.customer_id == 1)
452    ///     .count()?;
453    /// assert_eq!(n, 10);
454    /// # Ok(())
455    /// # }
456    /// ```
457    ///
458    /// # Errors
459    ///
460    /// - As [`Db::read_transaction`].
461    /// - Any error from the underlying [`crate::Collection`] scan.
462    /// - [`obj_core::Error::DistinctCountExceeded`] on the `Each`
463    ///   fast path.
464    pub fn count(&self) -> Result<u64> {
465        #[cfg(feature = "tracing")]
466        let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
467        #[cfg(feature = "tracing")]
468        let _guard = span.enter();
469        #[cfg(feature = "tracing")]
470        span.record("kind", query_kind(&self.source));
471        self.db.read_transaction(|tx| {
472            let coll = tx.collection::<T>()?;
473            let total = if self.filters.is_empty() {
474                count_fast(&coll, &self.source)?
475            } else {
476                count_slow(&coll, self)?
477            };
478            Ok(apply_count_limit(total, self.limit))
479        })
480    }
481}
482
483/// Map the [`Source`] variant to the `kind` value recorded on the
484/// `query.execute` span (Phase 2B, issue #7).
485///
486/// `Source::Full` → `"filter"` (a full primary-tree walk plus
487/// in-memory predicates is the "filter scan" path); `Source::IndexRange`
488/// → `"index"` (the work is bounded by the named index's slice).
489#[cfg(feature = "tracing")]
490fn query_kind(source: &Source) -> &'static str {
491    match source {
492        Source::Full => "filter",
493        Source::IndexRange { .. } => "index",
494    }
495}
496
497/// Apply the optional `.limit(N)` cap to a raw count. `min(total, N)`
498/// when `limit` is set; the raw total otherwise.
499fn apply_count_limit(total: u64, limit: Option<usize>) -> u64 {
500    match limit {
501        // `usize` is at most 64 bits on every supported target so
502        // the cast is exact; `min` keeps overflow out of the
503        // arithmetic regardless.
504        Some(n) => total.min(u64::try_from(n).unwrap_or(u64::MAX)),
505        None => total,
506    }
507}
508
509/// Drain the configured source with no `sort_by` set.
510///
511/// Power-of-ten Rule 2: the loop is bounded by the source iterator's
512/// length (the primary B-tree's `MAX_RANGE_NODES` budget). The
513/// `.limit(N)` cap further shortens the loop.
514fn fetch_unsorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
515where
516    T: Document + Send + 'static,
517{
518    if q.limit == Some(0) {
519        return Ok(Vec::new());
520    }
521    let mut out: Vec<T> = Vec::new();
522    for_each_candidate(coll, q, |doc| {
523        if !q.filters.iter().all(|f| f(&doc)) {
524            return Ok(true);
525        }
526        out.push(doc);
527        if let Some(n) = q.limit {
528            if out.len() >= n {
529                return Ok(false);
530            }
531        }
532        Ok(true)
533    })?;
534    Ok(out)
535}
536
537/// Drain the configured source with `sort_by` set.
538///
539/// Collects up to `sort_buffer_limit` filtered candidates into RAM,
540/// sorts ascending by the extractor's byte output, then truncates
541/// to `limit`. Power-of-ten Rule 3: the buffer is bounded — exceeding
542/// it surfaces [`Error::SortBufferExceeded`] rather than chewing
543/// arbitrary memory.
544fn fetch_sorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
545where
546    T: Document + Send + 'static,
547{
548    let cap = q.sort_buffer_limit.unwrap_or(MAX_SORT_BUFFER);
549    let sort_key = q
550        .sort_key
551        .as_ref()
552        .ok_or(Error::InvalidArgument("fetch_sorted without sort_key"))?;
553    let mut buf: Vec<(Vec<u8>, T)> = Vec::new();
554    for_each_candidate(coll, q, |doc| {
555        if !q.filters.iter().all(|f| f(&doc)) {
556            return Ok(true);
557        }
558        if buf.len() >= cap {
559            return Err(Error::SortBufferExceeded { limit: cap });
560        }
561        // Power-of-ten Rule 7: propagate the extractor's `Result`
562        // rather than collapsing an encode failure into a tied key.
563        let key_bytes = sort_key(&doc)?;
564        buf.push((key_bytes, doc));
565        Ok(true)
566    })?;
567    buf.sort_by(|a, b| a.0.cmp(&b.0));
568    let truncated_len = match q.limit {
569        Some(n) => buf.len().min(n),
570        None => buf.len(),
571    };
572    let mut out: Vec<T> = Vec::with_capacity(truncated_len);
573    for (_k, d) in buf.into_iter().take(truncated_len) {
574        out.push(d);
575    }
576    Ok(out)
577}
578
579/// Walk the configured source and call `f(doc)` for each decoded
580/// document. `f` returns `Ok(true)` to continue, `Ok(false)` to stop
581/// early (e.g. the limit is reached), or `Err(_)` to abort the scan.
582///
583/// Power-of-ten Rule 4: factor the source-dispatch + iteration
584/// boilerplate so the sorted / unsorted entry points stay readable.
585fn for_each_candidate<T, F>(
586    coll: &crate::Collection<'_, T>,
587    q: &Query<'_, T>,
588    mut f: F,
589) -> Result<()>
590where
591    T: Document + Send + 'static,
592    F: FnMut(T) -> Result<bool>,
593{
594    match &q.source {
595        Source::Full => {
596            let docs = coll.all()?;
597            for (_id, doc) in docs {
598                if !f(doc)? {
599                    return Ok(());
600                }
601            }
602        }
603        Source::IndexRange { name, start, end } => {
604            let iter = coll.index_range_encoded(name, clone_bound(start), clone_bound(end))?;
605            for step in iter {
606                let (_key, doc) = step?;
607                if !f(doc)? {
608                    return Ok(());
609                }
610            }
611        }
612    }
613    Ok(())
614}
615
616/// No-filter fast path for [`Query::count`].
617///
618/// Walks the underlying B-tree (primary or index) without decoding
619/// any document. For an `Each`-kind `index_range` source, dispatches
620/// to [`crate::Collection::count_distinct_ids_in_range`] so the count
621/// matches what `fetch` would return (which de-duplicates per doc).
622/// Other kinds use the cheaper entry-count
623/// [`crate::Collection::count_index_range`] path. See the rustdoc on
624/// `Query::count` for the full per-kind table.
625fn count_fast<T>(coll: &crate::Collection<'_, T>, source: &Source) -> Result<u64>
626where
627    T: Document + Send + 'static,
628{
629    match source {
630        Source::Full => coll.count_all(),
631        Source::IndexRange { name, start, end } => {
632            // Peek at the descriptor to choose the per-kind path. The
633            // peek goes through the same `active_index` validation as
634            // the eventual count call (which re-resolves the index by
635            // name), so an unknown / dropped name produces the same
636            // `Error::IndexNotFound` exactly once.
637            let kind = coll.index_kind(name)?;
638            if kind == obj_core::IndexKind::Each {
639                coll.count_distinct_ids_in_range_encoded(name, clone_bound(start), clone_bound(end))
640            } else {
641                coll.count_index_range_encoded(name, clone_bound(start), clone_bound(end))
642            }
643        }
644    }
645}
646
647/// Filter-applied slow path for [`Query::count`].
648///
649/// Must decode each candidate to evaluate the predicate. The decode
650/// cost is unavoidable per the M8 design: there is no inverted-index
651/// data structure that would let us count filter matches without
652/// touching the document.
653fn count_slow<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<u64>
654where
655    T: Document + Send + 'static,
656{
657    let mut n: u64 = 0;
658    for_each_candidate(coll, q, |doc| {
659        if q.filters.iter().all(|f| f(&doc)) {
660            n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
661                reason: "slow-path count exceeds u64",
662            })?;
663        }
664        Ok(true)
665    })?;
666    Ok(n)
667}
668
669/// Encode a `Bound<Dynamic>` into a `Bound<Vec<u8>>` using the
670/// order-preserving field encoder. Used by [`Query::index_range`].
671fn encode_bound(b: Bound<&Dynamic>) -> Result<Bound<Vec<u8>>> {
672    match b {
673        Bound::Included(v) => Ok(Bound::Included(
674            obj_core::index::encode_field(v)?.into_bytes(),
675        )),
676        Bound::Excluded(v) => Ok(Bound::Excluded(
677            obj_core::index::encode_field(v)?.into_bytes(),
678        )),
679        Bound::Unbounded => Ok(Bound::Unbounded),
680    }
681}
682
683/// Clone a borrowed `Bound<Vec<u8>>` into an owned `Bound<Vec<u8>>`.
684/// Used by [`fetch_index_range`] to hand the bounds to the M7
685/// `Collection::index_range` API (which takes ownership).
686fn clone_bound(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
687    match b {
688        Bound::Included(v) => Bound::Included(v.clone()),
689        Bound::Excluded(v) => Bound::Excluded(v.clone()),
690        Bound::Unbounded => Bound::Unbounded,
691    }
692}