obj/query.rs
1//! `Query<T>` — the M8 query builder.
2//!
3//! `Query` is a thin builder over the M6 / M7 `Collection` API. It
4//! borrows a `&Db` for the duration of the build phase; `.fetch()`
5//! and `.count()` consume / inspect the builder and open a fresh
6//! `read_transaction` for the actual scan.
7//!
8//! ## Sources
9//!
10//! - [`Source::Full`] — full collection scan via
11//! [`crate::Collection::all`]. Order is by primary `Id`.
12//! - [`Source::IndexRange`] — index-range scan via
13//! [`crate::Collection::index_range`]. Order is by encoded index
14//! key bytes.
15//!
16//! No cost-based planner: the caller picks the source. The M8 plan
17//! explicitly defers planning to a future milestone.
18//!
19//! ## Filters
20//!
21//! Filters are arbitrary `Fn(&T) -> bool + 'static` closures. They
22//! are evaluated on the decoded document, so they pay the per-doc
23//! decode cost; an `.index_range(...)` source keeps the work bounded
24//! by walking only the index slice.
25//!
26//! Multiple `.filter(...)` calls AND together — every predicate must
27//! return `true` for a doc to be emitted. The closures are stored as
28//! `Box<dyn Fn(&T) -> bool + 'static>`; `'static` lets the closure
29//! outlive the temporary builder.
30//!
31//! ## Power-of-ten posture
32//!
33//! - **Rule 2.** The fetch loop is bounded by the source iterator's
34//! length. The source iterators (`Collection::all`,
35//! `Collection::index_range`) are themselves bounded by the
36//! `MAX_RANGE_NODES` budget (M4 #28). The `.limit(N)` cap further
37//! shortens the loop when the caller wants a top-N view.
//! - **Rule 4.** Each function in this module stays under the 60-line
//!   ceiling; the per-source dispatch lives once in `for_each_candidate`,
//!   shared by the sorted / unsorted fetch paths and the slow-path count.
40//! - **Rule 7.** Every `Result` and `Option` is handled. No `unwrap` /
41//! `expect` in the production path.
//! - **Rule 9.** Filters (`Box<dyn Fn(&T) -> bool>`) and the sort-key
//!   extractor (`Box<dyn Fn(&T) -> Result<Vec<u8>>>`) are type-erased so
//!   the public builder can compose without leaking generic parameters
//!   into every call site. These boxed closures are the only `dyn` in the
//!   query layer; the source iteration itself is monomorphic.
46
47use std::ops::{Bound, RangeBounds};
48
49use obj_core::codec::Dynamic;
50use obj_core::{Error, Result};
51
52use crate::Db;
53use crate::Document;
54
/// Type-erased filter predicate as stored by [`Query::filter`]. The
/// box's implicit `'static` object bound lets the closure outlive the
/// `Query` builder that received it.
type FilterFn<T> = Box<dyn Fn(&T) -> bool>;
58
59/// Boxed sort-key extractor. The closure may fail at encode time —
60/// e.g. a `sort_by(|t| Dynamic::String(...))` whose string carries
61/// an embedded NUL byte that the order-preserving encoder rejects.
62/// `sort_by_bytes` callers wrap an infallible byte-producing closure
63/// in `Ok(_)`; `sort_by` callers run their `Dynamic` output through
64/// `encode_field` here and propagate the failure rather than
65/// collapsing it to an empty key (Rule 7). See [`Query::sort_by`].
66type SortKeyFn<T> = Box<dyn Fn(&T) -> Result<Vec<u8>> + 'static>;
67
/// Default ceiling on the in-memory sort buffer (M8 #66).
///
/// A sorted query holds at most this many filter-surviving documents
/// in RAM before sorting; a scan yielding more candidates fails with
/// [`Error::SortBufferExceeded`]. Override per query with
/// [`Query::sort_buffer_limit`].
pub const MAX_SORT_BUFFER: usize = 100 * 1_000;
73
/// Origin of the candidate documents a [`Query`] scans.
///
/// Not constructed directly: a builder starts out as `Full` and is
/// switched to `IndexRange` by [`Query::index_range`].
#[derive(Clone, Debug)]
enum Source {
    /// Walk the whole primary tree via [`crate::Collection::all`].
    Full,
    /// Walk one slice of a named index's B-tree via
    /// [`crate::Collection::index_range`]. The bounds are kept in their
    /// already-encoded byte form: `encode_field` runs when the builder
    /// is configured, so no `Dynamic` has to be carried into the read
    /// transaction.
    IndexRange {
        /// Name of the index to walk.
        name: String,
        /// Lower bound, as encoded index-key bytes.
        start: Bound<Vec<u8>>,
        /// Upper bound, as encoded index-key bytes.
        end: Bound<Vec<u8>>,
    },
}
97
98/// The M8 query builder.
99///
100/// Obtain via [`Db::query`]. Compose with [`Query::filter`],
101/// [`Query::limit`], and [`Query::index_range`]; terminate with
102/// [`Query::fetch`].
103///
104/// `Query` borrows `&'db Db` so multiple builders can coexist; the
105/// borrow ends when `.fetch()` returns. The actual scan runs inside
106/// a fresh `read_transaction` opened by `.fetch()` — the builder
107/// itself holds no locks.
108pub struct Query<'db, T: Document> {
109 /// Borrowed `Db` so the builder is cheap to construct and to drop
110 /// without committing to the scan up-front.
111 db: &'db Db,
112 /// Where to draw candidate documents from.
113 source: Source,
114 /// User-supplied predicates. Applied in declaration order; every
115 /// predicate must return `true` for a doc to be emitted.
116 filters: Vec<FilterFn<T>>,
117 /// Optional caller-supplied cap on the result count.
118 limit: Option<usize>,
119 /// Optional sort-key extractor. When set, the fetch collects up
120 /// to `sort_buffer_limit` filtered candidates into RAM, sorts by
121 /// the extractor's byte output, then applies `limit`. Last-call-
122 /// wins if `sort_by` is invoked multiple times.
123 sort_key: Option<SortKeyFn<T>>,
124 /// Per-query override for the sort buffer ceiling. Defaults to
125 /// [`MAX_SORT_BUFFER`]. Only consulted when `sort_key` is set.
126 sort_buffer_limit: Option<usize>,
127}
128
129impl<'db, T: Document> Query<'db, T>
130where
131 T: Send + 'static,
132{
133 /// Construct a fresh full-scan query. Crate-internal — public
134 /// callers go through [`Db::query`].
135 pub(crate) fn new(db: &'db Db) -> Self {
136 Self {
137 db,
138 source: Source::Full,
139 filters: Vec::new(),
140 limit: None,
141 sort_key: None,
142 sort_buffer_limit: None,
143 }
144 }
145
146 /// Append a filter predicate. Filters compose with AND — every
147 /// predicate must return `true` for a doc to be emitted.
148 ///
149 /// `'static` is required so the closure can outlive the
150 /// temporary builder; capture by value if you need to borrow a
151 /// stack-local value.
152 #[must_use]
153 pub fn filter<F>(mut self, predicate: F) -> Self
154 where
155 F: Fn(&T) -> bool + 'static,
156 {
157 self.filters.push(Box::new(predicate));
158 self
159 }
160
161 /// Cap the result set at `n` documents. Order is the source
162 /// order (primary `Id` for full-scan; index-key bytes for an
163 /// index range).
164 #[must_use]
165 pub fn limit(mut self, n: usize) -> Self {
166 self.limit = Some(n);
167 self
168 }
169
170 /// Switch the query source from full-scan to the named index's
171 /// range. Bounds are [`Dynamic`] values; the builder encodes
172 /// them through `obj_core::index::encode_field` at call time so
173 /// the actual range arithmetic sees byte-ordered keys.
174 ///
175 /// Order is by the index key bytes, not by primary `Id`. The
176 /// scan is bounded to the slice of the index B-tree the range
177 /// covers — no full-collection walk.
178 ///
179 /// # Examples
180 ///
181 /// Range query on an indexed `u64` field:
182 ///
183 /// ```
184 /// # fn main() -> obj::Result<()> {
185 /// use obj::Db;
186 /// use obj_core::codec::Dynamic;
187 /// use serde::{Deserialize, Serialize};
188 ///
189 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
190 /// #[obj(collection = "orders_index_range_doc")]
191 /// struct Order {
192 /// #[obj(index)]
193 /// placed_at: u64,
194 /// }
195 ///
196 /// let dir = tempfile::tempdir()?;
197 /// let db = Db::open(dir.path().join("range.obj"))?;
198 /// for i in 0..100u64 {
199 /// let _ = db.insert(Order { placed_at: i * 1_000 })?;
200 /// }
201 /// let last_week = Dynamic::U64(30_000);
202 /// let now = Dynamic::U64(60_000);
203 /// let recent: Vec<Order> = db
204 /// .query::<Order>()
205 /// .index_range("placed_at", last_week..now)?
206 /// .fetch()?;
207 /// assert_eq!(recent.len(), 30);
208 /// # Ok(())
209 /// # }
210 /// ```
211 ///
212 /// # Errors
213 ///
214 /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
215 /// carries an embedded NUL byte (the order-preserving encoder
216 /// rejects those — see `obj_core::index::encode_field`).
217 pub fn index_range<R>(mut self, name: &str, range: R) -> Result<Self>
218 where
219 R: RangeBounds<Dynamic>,
220 {
221 let start = encode_bound(range.start_bound())?;
222 let end = encode_bound(range.end_bound())?;
223 self.source = Source::IndexRange {
224 name: name.to_owned(),
225 start,
226 end,
227 };
228 Ok(self)
229 }
230
231 /// Sort the result by `key`'s output in ascending key order.
232 ///
233 /// `key` returns an [`obj_core::codec::Dynamic`] for each
234 /// document; the builder runs each value through
235 /// [`obj_core::index::encode_field`] (M7 #55) so the comparator
236 /// is a byte comparison whose ordering matches the value's
237 /// natural `Ord`. This reuses the same order-preserving encoder
238 /// the index layer uses, so a `.sort_by(|o| o.placed_at.into())`
239 /// produces the same ordering an `index_range("placed_at", ...)`
240 /// scan would visit.
241 ///
242 /// Last-call-wins: a second `.sort_by` (or `.sort_by_bytes`)
243 /// overwrites the first; the two extractors share the same
244 /// internal slot because they are mutually exclusive.
245 ///
246 /// The sort runs BEFORE [`Query::limit`] truncation, so
247 /// `.sort_by(...).limit(N)` returns the N smallest by the
248 /// extractor's key — the "top-N sorted" workload
249 /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
250 /// shows for the Orders example.
251 ///
252 /// # Errors at fetch time
253 ///
254 /// `sort_by` itself is infallible — it stores the closure. The
255 /// encoder runs during [`Query::fetch`]; a `Dynamic` whose
256 /// `encode_field` representation cannot be computed (e.g. a
257 /// `Dynamic::String` carrying an embedded `0x00` byte that the
258 /// order-preserving encoder rejects) surfaces as
259 /// [`Error::SortKeyEncode`] — power-of-ten Rule 7 (no silent
260 /// errors). Callers who want to bypass `encode_field` entirely
261 /// should use [`Query::sort_by_bytes`].
262 ///
263 /// # Sort-buffer bound
264 ///
265 /// The pre-sort buffer is capped at
266 /// [`Query::sort_buffer_limit`] (default
267 /// [`MAX_SORT_BUFFER`] = 100 000). A scan that produces more
268 /// candidates surfaces [`Error::SortBufferExceeded`]; the user
269 /// should narrow the source via `.filter` / `.index_range` /
270 /// `.limit`, or raise the cap with `.sort_buffer_limit(N)`.
271 #[must_use]
272 pub fn sort_by<F>(mut self, key: F) -> Self
273 where
274 F: Fn(&T) -> Dynamic + 'static,
275 {
276 // Encode each per-doc `Dynamic` through the M7 #55 order-
277 // preserving encoder; propagate the encoder's `Result` so
278 // fetch can surface it as `Error::SortKeyEncode`. Power-of-
279 // ten Rule 7: no silent errors.
280 let encoded: SortKeyFn<T> = Box::new(move |doc: &T| {
281 let dynamic = key(doc);
282 obj_core::index::encode_field(&dynamic)
283 .map(obj_core::index::EncodedIndexKey::into_bytes)
284 .map_err(|e| Error::SortKeyEncode {
285 source: Box::new(e),
286 })
287 });
288 self.sort_key = Some(encoded);
289 self
290 }
291
292 /// Sort the result by `key`'s raw byte output in ascending order.
293 ///
294 /// Companion to [`Query::sort_by`] that lets callers supply the
295 /// already-encoded sort bytes. The byte-order = sort-order
296 /// invariant is the caller's responsibility: two documents whose
297 /// key bytes compare `Less` MUST also be in the desired sort
298 /// order, otherwise the produced ordering is unspecified.
299 ///
300 /// Bypassing [`obj_core::index::encode_field`] means
301 /// [`Error::SortKeyEncode`] cannot fire — `sort_by_bytes` is the
302 /// right shape for callers who already have an order-preserving
303 /// byte form (e.g. a precomputed `i64::to_be_bytes` of a signed
304 /// counter that they have already biased to the unsigned range).
305 ///
306 /// Last-call-wins: `sort_by_bytes` overwrites a prior
307 /// [`Query::sort_by`] (and vice versa).
308 ///
309 /// # Sort-buffer bound
310 ///
311 /// Same bound as [`Query::sort_by`] — see that method's docs.
312 #[must_use]
313 pub fn sort_by_bytes<F>(mut self, key: F) -> Self
314 where
315 F: Fn(&T) -> Vec<u8> + 'static,
316 {
317 // Caller owns the byte-order = sort-order contract; we wrap
318 // their infallible output in `Ok(_)` so fetch_sorted's shared
319 // extractor signature still applies.
320 let encoded: SortKeyFn<T> = Box::new(move |doc: &T| Ok(key(doc)));
321 self.sort_key = Some(encoded);
322 self
323 }
324
325 /// Override the per-query sort-buffer ceiling. Only consulted
326 /// when [`Query::sort_by`] is set. Defaults to
327 /// [`MAX_SORT_BUFFER`] (100 000).
328 ///
329 /// A scan that overshoots the cap surfaces
330 /// [`Error::SortBufferExceeded`]; narrow the candidate set
331 /// with [`Query::filter`] / [`Query::index_range`] /
332 /// [`Query::limit`], or raise the cap with this method.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// # fn main() -> obj::Result<()> {
338 /// use obj::Db;
339 /// use obj_core::codec::Dynamic;
340 /// use serde::{Deserialize, Serialize};
341 ///
342 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
343 /// #[obj(collection = "ticks_sort_buffer_doc")]
344 /// struct Tick { value: u64 }
345 ///
346 /// let dir = tempfile::tempdir()?;
347 /// let db = Db::open(dir.path().join("sort.obj"))?;
348 /// for v in 0..10u64 {
349 /// let _ = db.insert(Tick { value: v })?;
350 /// }
351 /// let top: Vec<Tick> = db
352 /// .query::<Tick>()
353 /// .sort_by(|t| Dynamic::U64(t.value))
354 /// .sort_buffer_limit(1_000) // narrower than the default
355 /// .limit(3)
356 /// .fetch()?;
357 /// assert_eq!(top.len(), 3);
358 /// # Ok(())
359 /// # }
360 /// ```
361 #[must_use]
362 pub fn sort_buffer_limit(mut self, n: usize) -> Self {
363 self.sort_buffer_limit = Some(n);
364 self
365 }
366
367 /// Execute the query and materialise the matching documents.
368 ///
369 /// Opens a fresh `read_transaction`, drives the configured
370 /// source iterator, applies every filter in declaration order,
371 /// and truncates to `limit` (if set). Returns the documents in
372 /// source order.
373 ///
374 /// # Errors
375 ///
376 /// - As [`Db::read_transaction`].
377 /// - Any error from the underlying [`crate::Collection`] scan.
378 pub fn fetch(self) -> Result<Vec<T>> {
379 #[cfg(feature = "tracing")]
380 let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
381 #[cfg(feature = "tracing")]
382 let _guard = span.enter();
383 #[cfg(feature = "tracing")]
384 span.record("kind", query_kind(&self.source));
385 self.db.read_transaction(|tx| {
386 let coll = tx.collection::<T>()?;
387 if self.sort_key.is_some() {
388 fetch_sorted(&coll, &self)
389 } else {
390 fetch_unsorted(&coll, &self)
391 }
392 })
393 }
394
395 /// Count the documents this query would return, ignoring
396 /// [`Query::sort_by`] (sort does not change the count) but
397 /// honouring [`Query::limit`] (returns `min(total, limit)`).
398 ///
399 /// Takes `&self` rather than consuming the builder so callers
400 /// can chain a follow-up `.fetch()` on the same predicate set.
401 ///
402 /// # Fast path (no filter set)
403 ///
404 /// When no filter is set, `count` walks the source B-tree
405 /// without decoding any document. The exact shape of the walk
406 /// depends on the source's index kind so the answer matches
407 /// what `fetch` would return:
408 ///
409 /// - **Full scan** — walks the primary B-tree counting entries
410 /// (`Collection::count_all`). One entry per doc; the count is
411 /// exact.
412 /// - **`Standard` / `Unique` / `Composite` `index_range`** — walks
413 /// the named index's B-tree counting entries
414 /// (`Collection::count_index_range`). One entry per doc;
415 /// the count is exact.
416 /// - **`Each` `index_range`** — walks the named index's B-tree
417 /// tracking distinct trailing-`id` suffixes via
418 /// `Collection::count_distinct_ids_in_range` (M8 follow-up
419 /// #72). A single doc may emit multiple entries under
420 /// different element keys; the entry count would overshoot.
421 /// The distinct set is bounded by
422 /// [`crate::MAX_DISTINCT_IDS`] (100 000); exceeding it
423 /// surfaces [`obj_core::Error::DistinctCountExceeded`].
424 ///
425 /// # Slow path (filter set)
426 ///
427 /// When at least one filter is set, the predicate has to see a
428 /// decoded `T`, so the slow path pays the per-doc decode cost
429 /// (same as `fetch`). Sort, if set, is ignored — the total
430 /// count is the same.
431 ///
432 /// # Examples
433 ///
434 /// Count without materialising documents:
435 ///
436 /// ```
437 /// # fn main() -> obj::Result<()> {
438 /// use obj::Db;
439 /// use serde::{Deserialize, Serialize};
440 ///
441 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
442 /// #[obj(collection = "counts_doc")]
443 /// struct Order { customer_id: u64 }
444 ///
445 /// let dir = tempfile::tempdir()?;
446 /// let db = Db::open(dir.path().join("count.obj"))?;
447 /// for i in 0..30u64 {
448 /// let _ = db.insert(Order { customer_id: i % 3 })?;
449 /// }
450 /// let n: u64 = db.query::<Order>()
451 /// .filter(|o| o.customer_id == 1)
452 /// .count()?;
453 /// assert_eq!(n, 10);
454 /// # Ok(())
455 /// # }
456 /// ```
457 ///
458 /// # Errors
459 ///
460 /// - As [`Db::read_transaction`].
461 /// - Any error from the underlying [`crate::Collection`] scan.
462 /// - [`obj_core::Error::DistinctCountExceeded`] on the `Each`
463 /// fast path.
464 pub fn count(&self) -> Result<u64> {
465 #[cfg(feature = "tracing")]
466 let span = tracing::debug_span!("query.execute", kind = tracing::field::Empty);
467 #[cfg(feature = "tracing")]
468 let _guard = span.enter();
469 #[cfg(feature = "tracing")]
470 span.record("kind", query_kind(&self.source));
471 self.db.read_transaction(|tx| {
472 let coll = tx.collection::<T>()?;
473 let total = if self.filters.is_empty() {
474 count_fast(&coll, &self.source)?
475 } else {
476 count_slow(&coll, self)?
477 };
478 Ok(apply_count_limit(total, self.limit))
479 })
480 }
481}
482
/// Label recorded as the `kind` field of the `query.execute` span
/// (Phase 2B, issue #7).
///
/// - `Source::Full` → `"filter"`: a whole primary-tree walk with the
///   predicates applied in memory.
/// - `Source::IndexRange` → `"index"`: work bounded by the named
///   index's slice.
#[cfg(feature = "tracing")]
fn query_kind(source: &Source) -> &'static str {
    match *source {
        Source::IndexRange { .. } => "index",
        Source::Full => "filter",
    }
}
496
/// Clamp a raw count to the optional `.limit(N)` cap.
///
/// Returns `min(total, N)` when a cap is present and `total` otherwise.
fn apply_count_limit(total: u64, limit: Option<usize>) -> u64 {
    limit.map_or(total, |cap| {
        // Checked conversion rather than an `as` cast: it is exact on
        // every <=64-bit target and saturates on anything wider.
        let cap = u64::try_from(cap).unwrap_or(u64::MAX);
        total.min(cap)
    })
}
508
509/// Drain the configured source with no `sort_by` set.
510///
511/// Power-of-ten Rule 2: the loop is bounded by the source iterator's
512/// length (the primary B-tree's `MAX_RANGE_NODES` budget). The
513/// `.limit(N)` cap further shortens the loop.
514fn fetch_unsorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
515where
516 T: Document + Send + 'static,
517{
518 if q.limit == Some(0) {
519 return Ok(Vec::new());
520 }
521 let mut out: Vec<T> = Vec::new();
522 for_each_candidate(coll, q, |doc| {
523 if !q.filters.iter().all(|f| f(&doc)) {
524 return Ok(true);
525 }
526 out.push(doc);
527 if let Some(n) = q.limit {
528 if out.len() >= n {
529 return Ok(false);
530 }
531 }
532 Ok(true)
533 })?;
534 Ok(out)
535}
536
537/// Drain the configured source with `sort_by` set.
538///
539/// Collects up to `sort_buffer_limit` filtered candidates into RAM,
540/// sorts ascending by the extractor's byte output, then truncates
541/// to `limit`. Power-of-ten Rule 3: the buffer is bounded — exceeding
542/// it surfaces [`Error::SortBufferExceeded`] rather than chewing
543/// arbitrary memory.
544fn fetch_sorted<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<Vec<T>>
545where
546 T: Document + Send + 'static,
547{
548 let cap = q.sort_buffer_limit.unwrap_or(MAX_SORT_BUFFER);
549 let sort_key = q
550 .sort_key
551 .as_ref()
552 .ok_or(Error::InvalidArgument("fetch_sorted without sort_key"))?;
553 let mut buf: Vec<(Vec<u8>, T)> = Vec::new();
554 for_each_candidate(coll, q, |doc| {
555 if !q.filters.iter().all(|f| f(&doc)) {
556 return Ok(true);
557 }
558 if buf.len() >= cap {
559 return Err(Error::SortBufferExceeded { limit: cap });
560 }
561 // Power-of-ten Rule 7: propagate the extractor's `Result`
562 // rather than collapsing an encode failure into a tied key.
563 let key_bytes = sort_key(&doc)?;
564 buf.push((key_bytes, doc));
565 Ok(true)
566 })?;
567 buf.sort_by(|a, b| a.0.cmp(&b.0));
568 let truncated_len = match q.limit {
569 Some(n) => buf.len().min(n),
570 None => buf.len(),
571 };
572 let mut out: Vec<T> = Vec::with_capacity(truncated_len);
573 for (_k, d) in buf.into_iter().take(truncated_len) {
574 out.push(d);
575 }
576 Ok(out)
577}
578
579/// Walk the configured source and call `f(doc)` for each decoded
580/// document. `f` returns `Ok(true)` to continue, `Ok(false)` to stop
581/// early (e.g. the limit is reached), or `Err(_)` to abort the scan.
582///
583/// Power-of-ten Rule 4: factor the source-dispatch + iteration
584/// boilerplate so the sorted / unsorted entry points stay readable.
585fn for_each_candidate<T, F>(
586 coll: &crate::Collection<'_, T>,
587 q: &Query<'_, T>,
588 mut f: F,
589) -> Result<()>
590where
591 T: Document + Send + 'static,
592 F: FnMut(T) -> Result<bool>,
593{
594 match &q.source {
595 Source::Full => {
596 let docs = coll.all()?;
597 for (_id, doc) in docs {
598 if !f(doc)? {
599 return Ok(());
600 }
601 }
602 }
603 Source::IndexRange { name, start, end } => {
604 let iter = coll.index_range_encoded(name, clone_bound(start), clone_bound(end))?;
605 for step in iter {
606 let (_key, doc) = step?;
607 if !f(doc)? {
608 return Ok(());
609 }
610 }
611 }
612 }
613 Ok(())
614}
615
616/// No-filter fast path for [`Query::count`].
617///
618/// Walks the underlying B-tree (primary or index) without decoding
619/// any document. For an `Each`-kind `index_range` source, dispatches
620/// to [`crate::Collection::count_distinct_ids_in_range`] so the count
621/// matches what `fetch` would return (which de-duplicates per doc).
622/// Other kinds use the cheaper entry-count
623/// [`crate::Collection::count_index_range`] path. See the rustdoc on
624/// `Query::count` for the full per-kind table.
625fn count_fast<T>(coll: &crate::Collection<'_, T>, source: &Source) -> Result<u64>
626where
627 T: Document + Send + 'static,
628{
629 match source {
630 Source::Full => coll.count_all(),
631 Source::IndexRange { name, start, end } => {
632 // Peek at the descriptor to choose the per-kind path. The
633 // peek goes through the same `active_index` validation as
634 // the eventual count call (which re-resolves the index by
635 // name), so an unknown / dropped name produces the same
636 // `Error::IndexNotFound` exactly once.
637 let kind = coll.index_kind(name)?;
638 if kind == obj_core::IndexKind::Each {
639 coll.count_distinct_ids_in_range_encoded(name, clone_bound(start), clone_bound(end))
640 } else {
641 coll.count_index_range_encoded(name, clone_bound(start), clone_bound(end))
642 }
643 }
644 }
645}
646
647/// Filter-applied slow path for [`Query::count`].
648///
649/// Must decode each candidate to evaluate the predicate. The decode
650/// cost is unavoidable per the M8 design: there is no inverted-index
651/// data structure that would let us count filter matches without
652/// touching the document.
653fn count_slow<T>(coll: &crate::Collection<'_, T>, q: &Query<'_, T>) -> Result<u64>
654where
655 T: Document + Send + 'static,
656{
657 let mut n: u64 = 0;
658 for_each_candidate(coll, q, |doc| {
659 if q.filters.iter().all(|f| f(&doc)) {
660 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
661 reason: "slow-path count exceeds u64",
662 })?;
663 }
664 Ok(true)
665 })?;
666 Ok(n)
667}
668
669/// Encode a `Bound<Dynamic>` into a `Bound<Vec<u8>>` using the
670/// order-preserving field encoder. Used by [`Query::index_range`].
671fn encode_bound(b: Bound<&Dynamic>) -> Result<Bound<Vec<u8>>> {
672 match b {
673 Bound::Included(v) => Ok(Bound::Included(
674 obj_core::index::encode_field(v)?.into_bytes(),
675 )),
676 Bound::Excluded(v) => Ok(Bound::Excluded(
677 obj_core::index::encode_field(v)?.into_bytes(),
678 )),
679 Bound::Unbounded => Ok(Bound::Unbounded),
680 }
681}
682
/// Clone a borrowed `Bound<Vec<u8>>` into an owned `Bound<Vec<u8>>`.
///
/// `for_each_candidate` and `count_fast` keep the encoded bounds
/// inside the query's `Source`. They call this to hand owned copies to
/// the `Collection::*_encoded` range APIs, which take their bounds by
/// value. `Bound<T>` is `Clone` whenever `T` is, so this defers to
/// that impl rather than re-matching each variant by hand.
fn clone_bound(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    b.clone()
}