obj/asynchronous/db.rs
1//! `AsyncDb` — async-facing wrapper over the blocking [`Db`].
2//!
3//! Each method clones the inner `Arc<Db>` and moves the clone into a
4//! [`blocking::unblock`] task; the task runs the corresponding
5//! synchronous method to completion. The blocking-task return value is
6//! `Send + 'static`, which the existing blocking surface already
7//! satisfies (every `Db` method returns `Result<T>` with `T: Send +
8//! 'static` for every type we wrap here).
9
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use obj_core::Document;
14use obj_core::{Id, Result};
15
16use crate::asynchronous::collection::AsyncCollection;
17use crate::asynchronous::query::AsyncQuery;
18use crate::{Config, Db, DbStat, IntegrityReport, ReadTxn, WriteTxn};
19
20/// Async-facing wrapper around the blocking [`Db`].
21///
22/// `AsyncDb` is cheap to clone (one `Arc` bump) so it can be shared
23/// across spawned tasks without locking. The blocking engine sits
24/// behind a single `Arc<Db>`; every async method hands its body off
25/// to the [`blocking`] thread pool.
26///
27/// Public construction goes through [`AsyncDb::open`] /
28/// [`AsyncDb::open_with`] / [`AsyncDb::memory`] /
29/// [`AsyncDb::memory_with`] / [`AsyncDb::open_readonly`] /
30/// [`AsyncDb::from_blocking`].
31#[derive(Clone, Debug)]
32pub struct AsyncDb {
33 inner: Arc<Db>,
34}
35
36impl AsyncDb {
37 /// Construct an `AsyncDb` from an already-opened blocking [`Db`].
38 ///
39 /// Synchronous on purpose — wrapping an in-hand `Db` does no
40 /// I/O. Useful when the caller already opened the database from
41 /// a blocking context (e.g. CLI bootstrap) and wants to drive it
42 /// async-style from the rest of the program.
43 #[must_use]
44 pub fn from_blocking(db: Db) -> Self {
45 Self {
46 inner: Arc::new(db),
47 }
48 }
49
50 /// Open or create a file-backed database at `path` with default
51 /// configuration. See [`Db::open`].
52 ///
53 /// # Errors
54 ///
55 /// As [`Db::open`].
56 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
57 Self::open_with(path, Config::default()).await
58 }
59
60 /// Open or create a file-backed database with `config`. See
61 /// [`Db::open_with`].
62 ///
63 /// # Errors
64 ///
65 /// As [`Db::open_with`].
66 pub async fn open_with<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
67 let path_buf: PathBuf = path.as_ref().to_path_buf();
68 unblock(move || Db::open_with(path_buf, config).map(Self::from_blocking)).await
69 }
70
71 /// Open a fresh in-memory database. See [`Db::memory`].
72 ///
73 /// # Errors
74 ///
75 /// As [`Db::memory`].
76 pub async fn memory() -> Result<Self> {
77 Self::memory_with(Config::default()).await
78 }
79
80 /// As [`AsyncDb::memory`] with a caller-supplied [`Config`].
81 ///
82 /// # Errors
83 ///
84 /// As [`Db::memory_with`].
85 pub async fn memory_with(config: Config) -> Result<Self> {
86 unblock(move || Db::memory_with(config).map(Self::from_blocking)).await
87 }
88
89 /// Open the database at `path` read-only. See [`Db::open_readonly`].
90 ///
91 /// # Errors
92 ///
93 /// As [`Db::open_readonly`].
94 pub async fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
95 let path_buf: PathBuf = path.as_ref().to_path_buf();
96 unblock(move || Db::open_readonly(path_buf).map(Self::from_blocking)).await
97 }
98
99 /// Borrow the underlying blocking [`Db`].
100 ///
101 /// Useful when async code needs to hand a `&Db` to a third-party
102 /// API that only takes the blocking type — note the borrow is
103 /// scoped to the local task, not the blocking pool.
104 #[must_use]
105 pub fn as_blocking(&self) -> &Db {
106 &self.inner
107 }
108
109 /// Insert `doc`. See [`Db::insert`].
110 ///
111 /// # Errors
112 ///
113 /// As [`Db::insert`].
114 pub async fn insert<T>(&self, doc: T) -> Result<Id>
115 where
116 T: Document + Send + 'static,
117 {
118 let inner = Arc::clone(&self.inner);
119 unblock(move || inner.insert(doc)).await
120 }
121
122 /// Fetch the document at `id`. See [`Db::get`].
123 ///
124 /// # Errors
125 ///
126 /// As [`Db::get`].
127 pub async fn get<T>(&self, id: Id) -> Result<Option<T>>
128 where
129 T: Document + Send + 'static,
130 {
131 let inner = Arc::clone(&self.inner);
132 unblock(move || inner.get::<T>(id)).await
133 }
134
135 /// Update the document at `id`. See [`Db::update`].
136 ///
137 /// The closure runs **synchronously** inside the blocking task,
138 /// so it must be `Send + 'static`. This mirrors the wider
139 /// "closure runs on the blocking pool" contract documented on
140 /// [`AsyncDb::transaction`].
141 ///
142 /// # Errors
143 ///
144 /// As [`Db::update`].
145 pub async fn update<T, F>(&self, id: Id, f: F) -> Result<()>
146 where
147 T: Document + Send + 'static,
148 F: FnOnce(&mut T) + Send + 'static,
149 {
150 let inner = Arc::clone(&self.inner);
151 unblock(move || inner.update::<T, F>(id, f)).await
152 }
153
154 /// Delete the document at `id`. See [`Db::delete`].
155 ///
156 /// # Errors
157 ///
158 /// As [`Db::delete`].
159 pub async fn delete<T>(&self, id: Id) -> Result<bool>
160 where
161 T: Document + Send + 'static,
162 {
163 let inner = Arc::clone(&self.inner);
164 unblock(move || inner.delete::<T>(id)).await
165 }
166
167 /// Insert-or-replace the document at `id`. See [`Db::upsert`].
168 ///
169 /// # Errors
170 ///
171 /// As [`Db::upsert`].
172 pub async fn upsert<T>(&self, id: Id, doc: T) -> Result<()>
173 where
174 T: Document + Send + 'static,
175 {
176 let inner = Arc::clone(&self.inner);
177 unblock(move || inner.upsert(id, doc)).await
178 }
179
180 /// Point lookup on a `Unique` index. See [`Db::find_unique`].
181 ///
182 /// # Errors
183 ///
184 /// As [`Db::find_unique`].
185 pub async fn find_unique<T, K>(&self, index_name: &str, key: K) -> Result<Option<T>>
186 where
187 T: Document + Send + 'static,
188 K: Into<obj_core::codec::Dynamic> + Send + 'static,
189 {
190 let inner = Arc::clone(&self.inner);
191 let name = index_name.to_owned();
192 unblock(move || inner.find_unique::<T>(&name, key)).await
193 }
194
195 /// Materialise the collection's primary B-tree into `Vec<T>`. See
196 /// [`Db::all`].
197 ///
198 /// Streaming async iteration is intentionally **not** wrapped in
199 /// this phase — the entire collection is collected inside the
200 /// blocking task. For very large collections, drive the blocking
201 /// [`Db::iter_all`] from a dedicated `tokio::task::spawn_blocking`
202 /// (or equivalent) until the async streaming surface lands.
203 ///
204 /// # Errors
205 ///
206 /// As [`Db::all`].
207 pub async fn all<T>(&self) -> Result<Vec<T>>
208 where
209 T: Document + Send + 'static,
210 {
211 let inner = Arc::clone(&self.inner);
212 unblock(move || inner.all::<T>()).await
213 }
214
215 /// Run a closure inside a write transaction. See
216 /// [`Db::transaction`].
217 ///
218 /// # Closure contract
219 ///
220 /// The closure runs **synchronously** inside the blocking task,
221 /// so it must be `Send + 'static`; the return value `R` likewise.
222 /// No `async fn` inside the closure — that is a deliberate
223 /// "async-over-blocking" restriction, matching the contract used
224 /// by sqlx and other async-over-sync database wrappers.
225 ///
226 /// # Errors
227 ///
228 /// As [`Db::transaction`].
229 pub async fn transaction<R, F>(&self, body: F) -> Result<R>
230 where
231 R: Send + 'static,
232 F: FnOnce(&mut WriteTxn<'_>) -> Result<R> + Send + 'static,
233 {
234 let inner = Arc::clone(&self.inner);
235 unblock(move || inner.transaction(body)).await
236 }
237
238 /// Run a closure inside a read transaction. See
239 /// [`Db::read_transaction`].
240 ///
241 /// Same `Send + 'static` closure restriction as
242 /// [`AsyncDb::transaction`].
243 ///
244 /// # Errors
245 ///
246 /// As [`Db::read_transaction`].
247 pub async fn read_transaction<R, F>(&self, body: F) -> Result<R>
248 where
249 R: Send + 'static,
250 F: FnOnce(&ReadTxn<'_>) -> Result<R> + Send + 'static,
251 {
252 let inner = Arc::clone(&self.inner);
253 unblock(move || inner.read_transaction(body)).await
254 }
255
256 /// Hot-backup the database to `dest`. See [`Db::backup_to`].
257 ///
258 /// # Errors
259 ///
260 /// As [`Db::backup_to`].
261 pub async fn backup_to<P: AsRef<Path>>(&self, dest: P) -> Result<()> {
262 let inner = Arc::clone(&self.inner);
263 let dest_buf: PathBuf = dest.as_ref().to_path_buf();
264 unblock(move || inner.backup_to(dest_buf)).await
265 }
266
267 /// Attach a read-only `.obj` file under `namespace`. See
268 /// [`Db::attach`].
269 ///
270 /// Takes `&mut self` because the blocking [`Db::attach`] takes
271 /// `&mut Db`. We temporarily unwrap the `Arc<Db>` (it must be
272 /// uniquely owned at this point) so the blocking task can take
273 /// the `&mut`. If the `Arc` is shared, attach falls back to
274 /// [`Error::Busy`](obj_core::Error::Busy) with the
275 /// `WriterInProcess` kind — clone the `AsyncDb` only after all
276 /// `attach` / `detach` calls.
277 ///
278 /// # Errors
279 ///
280 /// As [`Db::attach`], plus
281 /// [`Error::Busy`](obj_core::Error::Busy) when the `Arc<Db>` is
282 /// not uniquely owned at call time.
283 pub async fn attach<P>(&mut self, path: P, namespace: impl Into<String>) -> Result<()>
284 where
285 P: AsRef<Path>,
286 {
287 let path_buf: PathBuf = path.as_ref().to_path_buf();
288 let namespace = namespace.into();
289 self.with_mut_db(move |db| db.attach(&path_buf, namespace))
290 .await
291 }
292
293 /// Detach the attachment registered under `namespace`. See
294 /// [`Db::detach`].
295 ///
296 /// Same `&mut self` contract as [`AsyncDb::attach`].
297 ///
298 /// # Errors
299 ///
300 /// As [`Db::detach`].
301 pub async fn detach(&mut self, namespace: &str) -> Result<()> {
302 let namespace = namespace.to_owned();
303 self.with_mut_db(move |db| db.detach(&namespace)).await
304 }
305
306 /// Run `f` with a `&mut Db`. The `Arc<Db>` inside `self` must be
307 /// uniquely owned at call time; otherwise the function returns
308 /// [`Error::Busy`](obj_core::Error::Busy) so the caller does not
309 /// silently observe stale-state behaviour.
310 ///
311 /// The `&mut self` receiver guarantees no other `&AsyncDb`
312 /// reference borrows `self` for the duration of the call, but
313 /// **clones** of `self` (which hold their own `Arc<Db>`) defeat
314 /// the `Arc::try_unwrap` path. Reserve `attach` / `detach` for
315 /// the bootstrap phase, before the `AsyncDb` is shared across
316 /// tasks.
317 async fn with_mut_db<F, R>(&mut self, f: F) -> Result<R>
318 where
319 F: FnOnce(&mut Db) -> Result<R> + Send + 'static,
320 R: Send + 'static,
321 {
322 // Move `self.inner` out (replacing with a temporary sentinel)
323 // so `Arc::try_unwrap` sees exactly one strong reference. The
324 // sentinel is never observed by the caller — we restore the
325 // real `Arc<Db>` before returning. Power-of-ten Rule 7: every
326 // `try_unwrap` branch is handled; on failure we put the
327 // original `Arc<Db>` back so the receiver is left valid.
328 let sentinel = match Db::memory() {
329 Ok(db) => Arc::new(db),
330 Err(e) => return Err(e),
331 };
332 let original = std::mem::replace(&mut self.inner, sentinel);
333 let mut db = match Arc::try_unwrap(original) {
334 Ok(db) => db,
335 Err(arc) => {
336 // Restore the receiver so `self` is left valid; the
337 // sentinel never escapes.
338 self.inner = arc;
339 return Err(obj_core::Error::Busy {
340 kind: obj_core::LockKind::WriterInProcess,
341 });
342 }
343 };
344 let (db_back, result) = unblock(move || {
345 let result = f(&mut db);
346 (db, result)
347 })
348 .await;
349 self.inner = Arc::new(db_back);
350 result
351 }
352
353 /// Run [`Db::integrity_check`]. See that method.
354 ///
355 /// # Errors
356 ///
357 /// As [`Db::integrity_check`].
358 pub async fn integrity_check(&self) -> Result<IntegrityReport> {
359 let inner = Arc::clone(&self.inner);
360 unblock(move || inner.integrity_check()).await
361 }
362
363 /// Read [`Db::stat`]. See that method.
364 ///
365 /// # Errors
366 ///
367 /// As [`Db::stat`].
368 pub async fn stat(&self) -> Result<DbStat> {
369 let inner = Arc::clone(&self.inner);
370 unblock(move || inner.stat()).await
371 }
372
373 /// Open a read-only typed handle to a runtime-named collection.
374 /// See [`Db::collection`].
375 ///
376 /// Construction is infallible; the handle dispatches into the
377 /// blocking pool on every read-only method call.
378 #[must_use]
379 pub fn collection<T>(&self, name: impl Into<String>) -> AsyncCollection<T>
380 where
381 T: Document + Send + 'static,
382 {
383 AsyncCollection::lazy(Arc::clone(&self.inner), name.into())
384 }
385
386 /// Construct a fresh [`AsyncQuery`] builder rooted at this
387 /// database. See [`Db::query`].
388 #[must_use]
389 pub fn query<T>(&self) -> AsyncQuery<T>
390 where
391 T: Document + Send + 'static,
392 {
393 AsyncQuery::new(Arc::clone(&self.inner))
394 }
395}
396
397/// Internal shim — `blocking::unblock` plus optional `tracing` span
398/// propagation. Capturing the current span before the hop and
399/// re-entering it inside the blocking task is the documented pattern
400/// for `tracing` across thread-pool work (see the `tracing` crate's
401/// `Span::in_scope` docs).
402///
403/// Power-of-ten Rule 4: the helper centralises the propagation logic
404/// so each call site stays a one-liner.
405pub(crate) async fn unblock<F, R>(f: F) -> R
406where
407 F: FnOnce() -> R + Send + 'static,
408 R: Send + 'static,
409{
410 #[cfg(feature = "tracing")]
411 let span = tracing::Span::current();
412 blocking::unblock(move || {
413 #[cfg(feature = "tracing")]
414 let _guard = span.enter();
415 f()
416 })
417 .await
418}