obj/asynchronous/query.rs
1//! `AsyncQuery` — async-facing wrapper over the M8 [`crate::Query`]
2//! builder.
3//!
//! The blocking [`crate::Query`] borrows `&'db Db`; the async builder
//! cannot keep that borrow alive across an `.await`. Instead,
//! `AsyncQuery` stores the *configuration* (source, filters, sort key,
//! limit, sort-buffer limit) and re-materialises a `Query<'db, T>` on
//! the inner `Db` inside the [`blocking`] task at terminal-method time.
9//!
10//! Builder setters (`filter`, `sort_by`, `sort_by_bytes`, `limit`,
11//! `index_range`, `sort_buffer_limit`) are synchronous because they
12//! manipulate in-memory state only. Terminal methods (`fetch`,
13//! `count`) are `async fn` — they hand the blocking scan off to the
14//! `blocking` pool.
15
16use std::marker::PhantomData;
17use std::ops::{Bound, RangeBounds};
18use std::sync::Arc;
19
20use obj_core::codec::Dynamic;
21use obj_core::{Document, Result};
22
23use crate::asynchronous::db::unblock;
24use crate::Db;
25
/// Heap-allocated, type-erased filter predicate over `&T`.
///
/// Mirrors the `FilterFn` used internally by the blocking
/// [`crate::Query`], but additionally demands `Send` so the predicate
/// can be moved into the blocking task that runs the scan.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + 'static>;
30
31/// Boxed sort-key extractor producing the structured `Dynamic` shape.
32/// Forwarded onto [`crate::Query::sort_by`] in the blocking task so
33/// the existing `Error::SortKeyEncode` propagation lights up at fetch
34/// time — no error swallowing.
35type SortDynamicFn<T> = Box<dyn Fn(&T) -> Dynamic + Send + 'static>;
36
/// Heap-allocated, type-erased sort-key extractor that yields the key
/// as raw bytes.
///
/// The closure is handed to [`crate::Query::sort_by_bytes`] inside the
/// blocking task; by contract the caller's extractor is infallible.
type SortBytesFn<T> = Box<dyn Fn(&T) -> Vec<u8> + Send + 'static>;
41
42/// Mutually-exclusive sort-key state. `sort_by` and `sort_by_bytes`
43/// overwrite each other on the blocking [`crate::Query`] surface;
44/// mirroring that here keeps the build step a one-for-one mapping.
45enum SortKey<T> {
46 Dynamic(SortDynamicFn<T>),
47 Bytes(SortBytesFn<T>),
48}
49
50#[derive(Debug, Clone)]
51enum AsyncSource {
52 Full,
53 IndexRange {
54 name: String,
55 start: Bound<Dynamic>,
56 end: Bound<Dynamic>,
57 },
58}
59
60/// Async-facing query builder. See [`crate::Query`] for the surface
61/// semantics; this wrapper only changes the terminal methods to
62/// `async fn` and adds `Send` to every stored closure.
63pub struct AsyncQuery<T> {
64 db: Arc<Db>,
65 source: AsyncSource,
66 filters: Vec<FilterFn<T>>,
67 limit: Option<usize>,
68 sort_key: Option<SortKey<T>>,
69 sort_buffer_limit: Option<usize>,
70 _phantom: PhantomData<fn() -> T>,
71}
72
73impl<T> std::fmt::Debug for AsyncQuery<T> {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("AsyncQuery")
76 .field("source", &self.source)
77 .field("filters", &self.filters.len())
78 .field("limit", &self.limit)
79 .field("sort_key", &self.sort_key.is_some())
80 .field("sort_buffer_limit", &self.sort_buffer_limit)
81 .finish_non_exhaustive()
82 }
83}
84
85impl<T> AsyncQuery<T>
86where
87 T: Document + Send + 'static,
88{
89 pub(crate) fn new(db: Arc<Db>) -> Self {
90 Self {
91 db,
92 source: AsyncSource::Full,
93 filters: Vec::new(),
94 limit: None,
95 sort_key: None,
96 sort_buffer_limit: None,
97 _phantom: PhantomData,
98 }
99 }
100
101 /// Append a filter predicate. Async sibling of
102 /// [`crate::Query::filter`] — adds a `Send` bound so the closure
103 /// can ride the blocking-task hop.
104 #[must_use]
105 pub fn filter<F>(mut self, predicate: F) -> Self
106 where
107 F: Fn(&T) -> bool + Send + 'static,
108 {
109 self.filters.push(Box::new(predicate));
110 self
111 }
112
113 /// Cap the result set at `n`. See [`crate::Query::limit`].
114 #[must_use]
115 pub fn limit(mut self, n: usize) -> Self {
116 self.limit = Some(n);
117 self
118 }
119
120 /// Switch the source to a named index's range. See
121 /// [`crate::Query::index_range`].
122 ///
123 /// # Errors
124 ///
125 /// As [`crate::Query::index_range`] — note that the underlying
126 /// encoder fires at `fetch` / `count` time rather than here,
127 /// because the async builder stores the structured `Dynamic`
128 /// bounds and forwards them onto the blocking `Query` inside the
129 /// blocking task.
130 #[must_use]
131 pub fn index_range<R>(mut self, name: &str, range: R) -> Self
132 where
133 R: RangeBounds<Dynamic>,
134 {
135 self.source = AsyncSource::IndexRange {
136 name: name.to_owned(),
137 start: clone_dynamic_bound(range.start_bound()),
138 end: clone_dynamic_bound(range.end_bound()),
139 };
140 self
141 }
142
143 /// Sort the result by `key`'s output. See [`crate::Query::sort_by`].
144 /// Adds a `Send` bound to the closure so it can ride the
145 /// blocking-task hop.
146 #[must_use]
147 pub fn sort_by<F>(mut self, key: F) -> Self
148 where
149 F: Fn(&T) -> Dynamic + Send + 'static,
150 {
151 self.sort_key = Some(SortKey::Dynamic(Box::new(key)));
152 self
153 }
154
155 /// Sort by raw bytes. See [`crate::Query::sort_by_bytes`].
156 #[must_use]
157 pub fn sort_by_bytes<F>(mut self, key: F) -> Self
158 where
159 F: Fn(&T) -> Vec<u8> + Send + 'static,
160 {
161 self.sort_key = Some(SortKey::Bytes(Box::new(key)));
162 self
163 }
164
165 /// Override the per-query sort-buffer ceiling. See
166 /// [`crate::Query::sort_buffer_limit`].
167 #[must_use]
168 pub fn sort_buffer_limit(mut self, n: usize) -> Self {
169 self.sort_buffer_limit = Some(n);
170 self
171 }
172
173 /// Materialise the matching documents. See [`crate::Query::fetch`].
174 ///
175 /// # Errors
176 ///
177 /// As [`crate::Query::fetch`].
178 pub async fn fetch(self) -> Result<Vec<T>> {
179 let AsyncQuery {
180 db,
181 source,
182 filters,
183 limit,
184 sort_key,
185 sort_buffer_limit,
186 _phantom,
187 } = self;
188 unblock(move || {
189 let q = build_blocking_query::<T>(
190 &db,
191 source,
192 filters,
193 limit,
194 sort_key,
195 sort_buffer_limit,
196 )?;
197 q.fetch()
198 })
199 .await
200 }
201
202 /// Count matching documents. See [`crate::Query::count`].
203 ///
204 /// Takes `self` by value rather than by reference because the
205 /// async-builder's stored closures are `!Sync` in the general
206 /// case; consuming the builder avoids cloning the closures and
207 /// keeps the surface honest about ownership.
208 ///
209 /// # Errors
210 ///
211 /// As [`crate::Query::count`].
212 pub async fn count(self) -> Result<u64> {
213 let AsyncQuery {
214 db,
215 source,
216 filters,
217 limit,
218 sort_key,
219 sort_buffer_limit,
220 _phantom,
221 } = self;
222 unblock(move || {
223 let q = build_blocking_query::<T>(
224 &db,
225 source,
226 filters,
227 limit,
228 sort_key,
229 sort_buffer_limit,
230 )?;
231 q.count()
232 })
233 .await
234 }
235}
236
237/// Construct a blocking [`crate::Query<'db, T>`] from the async
238/// builder's stored configuration. The lifetime `'db` is whatever the
239/// caller's blocking task sees — borrowing `&'db Db` for the duration
240/// of `.fetch()` / `.count()` is the standard blocking-query contract.
241fn build_blocking_query<T>(
242 db: &Db,
243 source: AsyncSource,
244 filters: Vec<FilterFn<T>>,
245 limit: Option<usize>,
246 sort_key: Option<SortKey<T>>,
247 sort_buffer_limit: Option<usize>,
248) -> Result<crate::Query<'_, T>>
249where
250 T: Document + Send + 'static,
251{
252 let mut q = db.query::<T>();
253 match source {
254 AsyncSource::Full => {}
255 AsyncSource::IndexRange { name, start, end } => {
256 q = q.index_range(&name, (start, end))?;
257 }
258 }
259 // Power-of-ten Rule 2: filter count is bounded by however many the
260 // caller pushed — every closure is a `'static` `Fn`, applied at
261 // fetch time.
262 for predicate in filters {
263 q = q.filter(move |doc| predicate(doc));
264 }
265 match sort_key {
266 Some(SortKey::Dynamic(f)) => {
267 // Forward onto the blocking `Query::sort_by` so the
268 // `Error::SortKeyEncode` path lights up unchanged.
269 q = q.sort_by(move |doc| f(doc));
270 }
271 Some(SortKey::Bytes(f)) => {
272 q = q.sort_by_bytes(move |doc| f(doc));
273 }
274 None => {}
275 }
276 if let Some(n) = limit {
277 q = q.limit(n);
278 }
279 if let Some(n) = sort_buffer_limit {
280 q = q.sort_buffer_limit(n);
281 }
282 Ok(q)
283}
284
285fn clone_dynamic_bound(b: Bound<&Dynamic>) -> Bound<Dynamic> {
286 match b {
287 Bound::Included(d) => Bound::Included(d.clone()),
288 Bound::Excluded(d) => Bound::Excluded(d.clone()),
289 Bound::Unbounded => Bound::Unbounded,
290 }
291}