// obj/asynchronous/query.rs

1//! `AsyncQuery` — async-facing wrapper over the M8 [`crate::Query`]
2//! builder.
3//!
4//! The blocking [`crate::Query`] borrows `&'db Db`; the async builder
5//! cannot keep that borrow alive across an `.await`. Instead,
6//! `AsyncQuery` stores the *configuration* (source, filters, sort key,
7//! limit) and re-materialises a `Query<'db, T>` on the inner `Db`
8//! inside the [`blocking`] task at terminal-method time.
9//!
10//! Builder setters (`filter`, `sort_by`, `sort_by_bytes`, `limit`,
11//! `index_range`, `sort_buffer_limit`) are synchronous because they
12//! manipulate in-memory state only. Terminal methods (`fetch`,
13//! `count`) are `async fn` — they hand the blocking scan off to the
14//! `blocking` pool.
15
16use std::marker::PhantomData;
17use std::ops::{Bound, RangeBounds};
18use std::sync::Arc;
19
20use obj_core::codec::Dynamic;
21use obj_core::{Document, Result};
22
23use crate::asynchronous::db::unblock;
24use crate::Db;
25
/// Boxed filter predicate — same shape as the blocking
/// [`crate::Query`]'s internal `FilterFn`, with the extra `Send`
/// bound that the blocking-task hop requires.
///
/// One of these is stored per `AsyncQuery::filter` call; each is
/// re-applied through `Query::filter` when the blocking query is
/// rebuilt.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + 'static>;
30
/// Boxed sort-key extractor producing the structured `Dynamic` shape.
/// Forwarded onto [`crate::Query::sort_by`] in the blocking task so
/// the existing `Error::SortKeyEncode` propagation lights up at fetch
/// time — no error swallowing.
///
/// Carries `Send + 'static` so the boxed closure can be moved into the
/// blocking task together with the rest of the stored configuration.
type SortDynamicFn<T> = Box<dyn Fn(&T) -> Dynamic + Send + 'static>;
36
/// Boxed sort-key extractor producing the raw byte shape. Forwarded
/// onto [`crate::Query::sort_by_bytes`] in the blocking task; the
/// caller's contract is infallible.
///
/// Carries `Send + 'static` so the boxed closure can be moved into the
/// blocking task together with the rest of the stored configuration.
type SortBytesFn<T> = Box<dyn Fn(&T) -> Vec<u8> + Send + 'static>;
41
/// Mutually-exclusive sort-key state. `sort_by` and `sort_by_bytes`
/// overwrite each other on the blocking [`crate::Query`] surface;
/// mirroring that here keeps the build step a one-for-one mapping.
///
/// `AsyncQuery` keeps at most one of these (`Option<SortKey<T>>`), so
/// a later `sort_by` / `sort_by_bytes` call replaces an earlier one.
enum SortKey<T> {
    /// Extractor yielding a structured `Dynamic`; replayed through
    /// `Query::sort_by` when the blocking query is built.
    Dynamic(SortDynamicFn<T>),
    /// Extractor yielding raw bytes; replayed through
    /// `Query::sort_by_bytes` when the blocking query is built.
    Bytes(SortBytesFn<T>),
}
49
/// Which source the rebuilt blocking query reads from.
///
/// Holds only owned data (a `String` and owned `Bound<Dynamic>`s), so
/// the selection can be stored on the builder and moved into the
/// blocking task.
#[derive(Debug, Clone)]
enum AsyncSource {
    /// No index selected: the query returned by `Db::query` is used
    /// as-is, without an `index_range` call.
    Full,
    /// Read from a range of a named index; forwarded to
    /// `Query::index_range` when the blocking query is built.
    IndexRange {
        /// Index name, copied from the caller's `&str`.
        name: String,
        /// Start bound, cloned from the caller's range.
        start: Bound<Dynamic>,
        /// End bound, cloned from the caller's range.
        end: Bound<Dynamic>,
    },
}
59
/// Async-facing query builder. See [`crate::Query`] for the surface
/// semantics; this wrapper only changes the terminal methods to
/// `async fn` and adds `Send` to every stored closure.
///
/// The builder records configuration only; the blocking query is
/// rebuilt from these fields when `fetch` or `count` runs.
pub struct AsyncQuery<T> {
    /// Shared database handle; borrowed inside the blocking task to
    /// rebuild the blocking query.
    db: Arc<Db>,
    /// Source selection (full source or a named index range).
    source: AsyncSource,
    /// Filter predicates, in the order `filter` was called.
    filters: Vec<FilterFn<T>>,
    /// Result cap set by `limit`; `None` means `Query::limit` is not
    /// called on the rebuilt query.
    limit: Option<usize>,
    /// Sort-key extractor set by `sort_by` / `sort_by_bytes`, if any.
    sort_key: Option<SortKey<T>>,
    /// Sort-buffer ceiling override; `None` means
    /// `Query::sort_buffer_limit` is not called on the rebuilt query.
    sort_buffer_limit: Option<usize>,
    /// Ties the builder to the document type `T` without storing a
    /// `T`. NOTE(review): the `fn() -> T` form presumably keeps the
    /// marker from imposing `T`'s auto-trait limits on the builder —
    /// confirm.
    _phantom: PhantomData<fn() -> T>,
}
72
73impl<T> std::fmt::Debug for AsyncQuery<T> {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("AsyncQuery")
76            .field("source", &self.source)
77            .field("filters", &self.filters.len())
78            .field("limit", &self.limit)
79            .field("sort_key", &self.sort_key.is_some())
80            .field("sort_buffer_limit", &self.sort_buffer_limit)
81            .finish_non_exhaustive()
82    }
83}
84
85impl<T> AsyncQuery<T>
86where
87    T: Document + Send + 'static,
88{
89    pub(crate) fn new(db: Arc<Db>) -> Self {
90        Self {
91            db,
92            source: AsyncSource::Full,
93            filters: Vec::new(),
94            limit: None,
95            sort_key: None,
96            sort_buffer_limit: None,
97            _phantom: PhantomData,
98        }
99    }
100
101    /// Append a filter predicate. Async sibling of
102    /// [`crate::Query::filter`] — adds a `Send` bound so the closure
103    /// can ride the blocking-task hop.
104    #[must_use]
105    pub fn filter<F>(mut self, predicate: F) -> Self
106    where
107        F: Fn(&T) -> bool + Send + 'static,
108    {
109        self.filters.push(Box::new(predicate));
110        self
111    }
112
113    /// Cap the result set at `n`. See [`crate::Query::limit`].
114    #[must_use]
115    pub fn limit(mut self, n: usize) -> Self {
116        self.limit = Some(n);
117        self
118    }
119
120    /// Switch the source to a named index's range. See
121    /// [`crate::Query::index_range`].
122    ///
123    /// # Errors
124    ///
125    /// As [`crate::Query::index_range`] — note that the underlying
126    /// encoder fires at `fetch` / `count` time rather than here,
127    /// because the async builder stores the structured `Dynamic`
128    /// bounds and forwards them onto the blocking `Query` inside the
129    /// blocking task.
130    #[must_use]
131    pub fn index_range<R>(mut self, name: &str, range: R) -> Self
132    where
133        R: RangeBounds<Dynamic>,
134    {
135        self.source = AsyncSource::IndexRange {
136            name: name.to_owned(),
137            start: clone_dynamic_bound(range.start_bound()),
138            end: clone_dynamic_bound(range.end_bound()),
139        };
140        self
141    }
142
143    /// Sort the result by `key`'s output. See [`crate::Query::sort_by`].
144    /// Adds a `Send` bound to the closure so it can ride the
145    /// blocking-task hop.
146    #[must_use]
147    pub fn sort_by<F>(mut self, key: F) -> Self
148    where
149        F: Fn(&T) -> Dynamic + Send + 'static,
150    {
151        self.sort_key = Some(SortKey::Dynamic(Box::new(key)));
152        self
153    }
154
155    /// Sort by raw bytes. See [`crate::Query::sort_by_bytes`].
156    #[must_use]
157    pub fn sort_by_bytes<F>(mut self, key: F) -> Self
158    where
159        F: Fn(&T) -> Vec<u8> + Send + 'static,
160    {
161        self.sort_key = Some(SortKey::Bytes(Box::new(key)));
162        self
163    }
164
165    /// Override the per-query sort-buffer ceiling. See
166    /// [`crate::Query::sort_buffer_limit`].
167    #[must_use]
168    pub fn sort_buffer_limit(mut self, n: usize) -> Self {
169        self.sort_buffer_limit = Some(n);
170        self
171    }
172
173    /// Materialise the matching documents. See [`crate::Query::fetch`].
174    ///
175    /// # Errors
176    ///
177    /// As [`crate::Query::fetch`].
178    pub async fn fetch(self) -> Result<Vec<T>> {
179        let AsyncQuery {
180            db,
181            source,
182            filters,
183            limit,
184            sort_key,
185            sort_buffer_limit,
186            _phantom,
187        } = self;
188        unblock(move || {
189            let q = build_blocking_query::<T>(
190                &db,
191                source,
192                filters,
193                limit,
194                sort_key,
195                sort_buffer_limit,
196            )?;
197            q.fetch()
198        })
199        .await
200    }
201
202    /// Count matching documents. See [`crate::Query::count`].
203    ///
204    /// Takes `self` by value rather than by reference because the
205    /// async-builder's stored closures are `!Sync` in the general
206    /// case; consuming the builder avoids cloning the closures and
207    /// keeps the surface honest about ownership.
208    ///
209    /// # Errors
210    ///
211    /// As [`crate::Query::count`].
212    pub async fn count(self) -> Result<u64> {
213        let AsyncQuery {
214            db,
215            source,
216            filters,
217            limit,
218            sort_key,
219            sort_buffer_limit,
220            _phantom,
221        } = self;
222        unblock(move || {
223            let q = build_blocking_query::<T>(
224                &db,
225                source,
226                filters,
227                limit,
228                sort_key,
229                sort_buffer_limit,
230            )?;
231            q.count()
232        })
233        .await
234    }
235}
236
237/// Construct a blocking [`crate::Query<'db, T>`] from the async
238/// builder's stored configuration. The lifetime `'db` is whatever the
239/// caller's blocking task sees — borrowing `&'db Db` for the duration
240/// of `.fetch()` / `.count()` is the standard blocking-query contract.
241fn build_blocking_query<T>(
242    db: &Db,
243    source: AsyncSource,
244    filters: Vec<FilterFn<T>>,
245    limit: Option<usize>,
246    sort_key: Option<SortKey<T>>,
247    sort_buffer_limit: Option<usize>,
248) -> Result<crate::Query<'_, T>>
249where
250    T: Document + Send + 'static,
251{
252    let mut q = db.query::<T>();
253    match source {
254        AsyncSource::Full => {}
255        AsyncSource::IndexRange { name, start, end } => {
256            q = q.index_range(&name, (start, end))?;
257        }
258    }
259    // Power-of-ten Rule 2: filter count is bounded by however many the
260    // caller pushed — every closure is a `'static` `Fn`, applied at
261    // fetch time.
262    for predicate in filters {
263        q = q.filter(move |doc| predicate(doc));
264    }
265    match sort_key {
266        Some(SortKey::Dynamic(f)) => {
267            // Forward onto the blocking `Query::sort_by` so the
268            // `Error::SortKeyEncode` path lights up unchanged.
269            q = q.sort_by(move |doc| f(doc));
270        }
271        Some(SortKey::Bytes(f)) => {
272            q = q.sort_by_bytes(move |doc| f(doc));
273        }
274        None => {}
275    }
276    if let Some(n) = limit {
277        q = q.limit(n);
278    }
279    if let Some(n) = sort_buffer_limit {
280        q = q.sort_buffer_limit(n);
281    }
282    Ok(q)
283}
284
285fn clone_dynamic_bound(b: Bound<&Dynamic>) -> Bound<Dynamic> {
286    match b {
287        Bound::Included(d) => Bound::Included(d.clone()),
288        Bound::Excluded(d) => Bound::Excluded(d.clone()),
289        Bound::Unbounded => Bound::Unbounded,
290    }
291}