Skip to main content

obj/asynchronous/
db.rs

1//! `AsyncDb` — async-facing wrapper over the blocking [`Db`].
2//!
3//! Each method clones the inner `Arc<Db>` and moves the clone into a
4//! [`blocking::unblock`] task; the task runs the corresponding
5//! synchronous method to completion. The blocking-task return value is
6//! `Send + 'static`, which the existing blocking surface already
7//! satisfies (every `Db` method returns `Result<T>` with `T: Send +
8//! 'static` for every type we wrap here).
9
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use obj_core::Document;
14use obj_core::{Id, Result};
15
16use crate::asynchronous::collection::AsyncCollection;
17use crate::asynchronous::query::AsyncQuery;
18use crate::{Config, Db, DbStat, IntegrityReport, ReadTxn, WriteTxn};
19
20/// Async-facing wrapper around the blocking [`Db`].
21///
22/// `AsyncDb` is cheap to clone (one `Arc` bump) so it can be shared
23/// across spawned tasks without locking. The blocking engine sits
24/// behind a single `Arc<Db>`; every async method hands its body off
25/// to the [`blocking`] thread pool.
26///
27/// Public construction goes through [`AsyncDb::open`] /
28/// [`AsyncDb::open_with`] / [`AsyncDb::memory`] /
29/// [`AsyncDb::memory_with`] / [`AsyncDb::open_readonly`] /
30/// [`AsyncDb::from_blocking`].
31#[derive(Clone, Debug)]
32pub struct AsyncDb {
33    inner: Arc<Db>,
34}
35
36impl AsyncDb {
37    /// Construct an `AsyncDb` from an already-opened blocking [`Db`].
38    ///
39    /// Synchronous on purpose — wrapping an in-hand `Db` does no
40    /// I/O. Useful when the caller already opened the database from
41    /// a blocking context (e.g. CLI bootstrap) and wants to drive it
42    /// async-style from the rest of the program.
43    #[must_use]
44    pub fn from_blocking(db: Db) -> Self {
45        Self {
46            inner: Arc::new(db),
47        }
48    }
49
50    /// Open or create a file-backed database at `path` with default
51    /// configuration. See [`Db::open`].
52    ///
53    /// # Errors
54    ///
55    /// As [`Db::open`].
56    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
57        Self::open_with(path, Config::default()).await
58    }
59
60    /// Open or create a file-backed database with `config`. See
61    /// [`Db::open_with`].
62    ///
63    /// # Errors
64    ///
65    /// As [`Db::open_with`].
66    pub async fn open_with<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
67        let path_buf: PathBuf = path.as_ref().to_path_buf();
68        unblock(move || Db::open_with(path_buf, config).map(Self::from_blocking)).await
69    }
70
71    /// Open a fresh in-memory database. See [`Db::memory`].
72    ///
73    /// # Errors
74    ///
75    /// As [`Db::memory`].
76    pub async fn memory() -> Result<Self> {
77        Self::memory_with(Config::default()).await
78    }
79
80    /// As [`AsyncDb::memory`] with a caller-supplied [`Config`].
81    ///
82    /// # Errors
83    ///
84    /// As [`Db::memory_with`].
85    pub async fn memory_with(config: Config) -> Result<Self> {
86        unblock(move || Db::memory_with(config).map(Self::from_blocking)).await
87    }
88
89    /// Open the database at `path` read-only. See [`Db::open_readonly`].
90    ///
91    /// # Errors
92    ///
93    /// As [`Db::open_readonly`].
94    pub async fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
95        let path_buf: PathBuf = path.as_ref().to_path_buf();
96        unblock(move || Db::open_readonly(path_buf).map(Self::from_blocking)).await
97    }
98
99    /// Borrow the underlying blocking [`Db`].
100    ///
101    /// Useful when async code needs to hand a `&Db` to a third-party
102    /// API that only takes the blocking type — note the borrow is
103    /// scoped to the local task, not the blocking pool.
104    #[must_use]
105    pub fn as_blocking(&self) -> &Db {
106        &self.inner
107    }
108
109    /// Insert `doc`. See [`Db::insert`].
110    ///
111    /// # Errors
112    ///
113    /// As [`Db::insert`].
114    pub async fn insert<T>(&self, doc: T) -> Result<Id>
115    where
116        T: Document + Send + 'static,
117    {
118        let inner = Arc::clone(&self.inner);
119        unblock(move || inner.insert(doc)).await
120    }
121
122    /// Fetch the document at `id`. See [`Db::get`].
123    ///
124    /// # Errors
125    ///
126    /// As [`Db::get`].
127    pub async fn get<T>(&self, id: Id) -> Result<Option<T>>
128    where
129        T: Document + Send + 'static,
130    {
131        let inner = Arc::clone(&self.inner);
132        unblock(move || inner.get::<T>(id)).await
133    }
134
135    /// Update the document at `id`. See [`Db::update`].
136    ///
137    /// The closure runs **synchronously** inside the blocking task,
138    /// so it must be `Send + 'static`. This mirrors the wider
139    /// "closure runs on the blocking pool" contract documented on
140    /// [`AsyncDb::transaction`].
141    ///
142    /// # Errors
143    ///
144    /// As [`Db::update`].
145    pub async fn update<T, F>(&self, id: Id, f: F) -> Result<()>
146    where
147        T: Document + Send + 'static,
148        F: FnOnce(&mut T) + Send + 'static,
149    {
150        let inner = Arc::clone(&self.inner);
151        unblock(move || inner.update::<T, F>(id, f)).await
152    }
153
154    /// Delete the document at `id`. See [`Db::delete`].
155    ///
156    /// # Errors
157    ///
158    /// As [`Db::delete`].
159    pub async fn delete<T>(&self, id: Id) -> Result<bool>
160    where
161        T: Document + Send + 'static,
162    {
163        let inner = Arc::clone(&self.inner);
164        unblock(move || inner.delete::<T>(id)).await
165    }
166
167    /// Insert-or-replace the document at `id`. See [`Db::upsert`].
168    ///
169    /// # Errors
170    ///
171    /// As [`Db::upsert`].
172    pub async fn upsert<T>(&self, id: Id, doc: T) -> Result<()>
173    where
174        T: Document + Send + 'static,
175    {
176        let inner = Arc::clone(&self.inner);
177        unblock(move || inner.upsert(id, doc)).await
178    }
179
180    /// Point lookup on a `Unique` index. See [`Db::find_unique`].
181    ///
182    /// # Errors
183    ///
184    /// As [`Db::find_unique`].
185    pub async fn find_unique<T, K>(&self, index_name: &str, key: K) -> Result<Option<T>>
186    where
187        T: Document + Send + 'static,
188        K: Into<obj_core::codec::Dynamic> + Send + 'static,
189    {
190        let inner = Arc::clone(&self.inner);
191        let name = index_name.to_owned();
192        unblock(move || inner.find_unique::<T>(&name, key)).await
193    }
194
195    /// Materialise the collection's primary B-tree into `Vec<T>`. See
196    /// [`Db::all`].
197    ///
198    /// Streaming async iteration is intentionally **not** wrapped in
199    /// this phase — the entire collection is collected inside the
200    /// blocking task. For very large collections, drive the blocking
201    /// [`Db::iter_all`] from a dedicated `tokio::task::spawn_blocking`
202    /// (or equivalent) until the async streaming surface lands.
203    ///
204    /// # Errors
205    ///
206    /// As [`Db::all`].
207    pub async fn all<T>(&self) -> Result<Vec<T>>
208    where
209        T: Document + Send + 'static,
210    {
211        let inner = Arc::clone(&self.inner);
212        unblock(move || inner.all::<T>()).await
213    }
214
215    /// Run a closure inside a write transaction. See
216    /// [`Db::transaction`].
217    ///
218    /// # Closure contract
219    ///
220    /// The closure runs **synchronously** inside the blocking task,
221    /// so it must be `Send + 'static`; the return value `R` likewise.
222    /// No `async fn` inside the closure — that is a deliberate
223    /// "async-over-blocking" restriction, matching the contract used
224    /// by sqlx and other async-over-sync database wrappers.
225    ///
226    /// # Errors
227    ///
228    /// As [`Db::transaction`].
229    pub async fn transaction<R, F>(&self, body: F) -> Result<R>
230    where
231        R: Send + 'static,
232        F: FnOnce(&mut WriteTxn<'_>) -> Result<R> + Send + 'static,
233    {
234        let inner = Arc::clone(&self.inner);
235        unblock(move || inner.transaction(body)).await
236    }
237
238    /// Run a closure inside a read transaction. See
239    /// [`Db::read_transaction`].
240    ///
241    /// Same `Send + 'static` closure restriction as
242    /// [`AsyncDb::transaction`].
243    ///
244    /// # Errors
245    ///
246    /// As [`Db::read_transaction`].
247    pub async fn read_transaction<R, F>(&self, body: F) -> Result<R>
248    where
249        R: Send + 'static,
250        F: FnOnce(&ReadTxn<'_>) -> Result<R> + Send + 'static,
251    {
252        let inner = Arc::clone(&self.inner);
253        unblock(move || inner.read_transaction(body)).await
254    }
255
256    /// Hot-backup the database to `dest`. See [`Db::backup_to`].
257    ///
258    /// # Errors
259    ///
260    /// As [`Db::backup_to`].
261    pub async fn backup_to<P: AsRef<Path>>(&self, dest: P) -> Result<()> {
262        let inner = Arc::clone(&self.inner);
263        let dest_buf: PathBuf = dest.as_ref().to_path_buf();
264        unblock(move || inner.backup_to(dest_buf)).await
265    }
266
267    /// Attach a read-only `.obj` file under `namespace`. See
268    /// [`Db::attach`].
269    ///
270    /// Takes `&mut self` because the blocking [`Db::attach`] takes
271    /// `&mut Db`. We temporarily unwrap the `Arc<Db>` (it must be
272    /// uniquely owned at this point) so the blocking task can take
273    /// the `&mut`. If the `Arc` is shared, attach falls back to
274    /// [`Error::Busy`](obj_core::Error::Busy) with the
275    /// `WriterInProcess` kind — clone the `AsyncDb` only after all
276    /// `attach` / `detach` calls.
277    ///
278    /// # Errors
279    ///
280    /// As [`Db::attach`], plus
281    /// [`Error::Busy`](obj_core::Error::Busy) when the `Arc<Db>` is
282    /// not uniquely owned at call time.
283    pub async fn attach<P>(&mut self, path: P, namespace: impl Into<String>) -> Result<()>
284    where
285        P: AsRef<Path>,
286    {
287        let path_buf: PathBuf = path.as_ref().to_path_buf();
288        let namespace = namespace.into();
289        self.with_mut_db(move |db| db.attach(&path_buf, namespace))
290            .await
291    }
292
293    /// Detach the attachment registered under `namespace`. See
294    /// [`Db::detach`].
295    ///
296    /// Same `&mut self` contract as [`AsyncDb::attach`].
297    ///
298    /// # Errors
299    ///
300    /// As [`Db::detach`].
301    pub async fn detach(&mut self, namespace: &str) -> Result<()> {
302        let namespace = namespace.to_owned();
303        self.with_mut_db(move |db| db.detach(&namespace)).await
304    }
305
306    /// Run `f` with a `&mut Db`. The `Arc<Db>` inside `self` must be
307    /// uniquely owned at call time; otherwise the function returns
308    /// [`Error::Busy`](obj_core::Error::Busy) so the caller does not
309    /// silently observe stale-state behaviour.
310    ///
311    /// The `&mut self` receiver guarantees no other `&AsyncDb`
312    /// reference borrows `self` for the duration of the call, but
313    /// **clones** of `self` (which hold their own `Arc<Db>`) defeat
314    /// the `Arc::try_unwrap` path. Reserve `attach` / `detach` for
315    /// the bootstrap phase, before the `AsyncDb` is shared across
316    /// tasks.
317    async fn with_mut_db<F, R>(&mut self, f: F) -> Result<R>
318    where
319        F: FnOnce(&mut Db) -> Result<R> + Send + 'static,
320        R: Send + 'static,
321    {
322        // Move `self.inner` out (replacing with a temporary sentinel)
323        // so `Arc::try_unwrap` sees exactly one strong reference. The
324        // sentinel is never observed by the caller — we restore the
325        // real `Arc<Db>` before returning. Power-of-ten Rule 7: every
326        // `try_unwrap` branch is handled; on failure we put the
327        // original `Arc<Db>` back so the receiver is left valid.
328        let sentinel = match Db::memory() {
329            Ok(db) => Arc::new(db),
330            Err(e) => return Err(e),
331        };
332        let original = std::mem::replace(&mut self.inner, sentinel);
333        let mut db = match Arc::try_unwrap(original) {
334            Ok(db) => db,
335            Err(arc) => {
336                // Restore the receiver so `self` is left valid; the
337                // sentinel never escapes.
338                self.inner = arc;
339                return Err(obj_core::Error::Busy {
340                    kind: obj_core::LockKind::WriterInProcess,
341                });
342            }
343        };
344        let (db_back, result) = unblock(move || {
345            let result = f(&mut db);
346            (db, result)
347        })
348        .await;
349        self.inner = Arc::new(db_back);
350        result
351    }
352
353    /// Run [`Db::integrity_check`]. See that method.
354    ///
355    /// # Errors
356    ///
357    /// As [`Db::integrity_check`].
358    pub async fn integrity_check(&self) -> Result<IntegrityReport> {
359        let inner = Arc::clone(&self.inner);
360        unblock(move || inner.integrity_check()).await
361    }
362
363    /// Read [`Db::stat`]. See that method.
364    ///
365    /// # Errors
366    ///
367    /// As [`Db::stat`].
368    pub async fn stat(&self) -> Result<DbStat> {
369        let inner = Arc::clone(&self.inner);
370        unblock(move || inner.stat()).await
371    }
372
373    /// Open a read-only typed handle to a runtime-named collection.
374    /// See [`Db::collection`].
375    ///
376    /// Construction is infallible; the handle dispatches into the
377    /// blocking pool on every read-only method call.
378    #[must_use]
379    pub fn collection<T>(&self, name: impl Into<String>) -> AsyncCollection<T>
380    where
381        T: Document + Send + 'static,
382    {
383        AsyncCollection::lazy(Arc::clone(&self.inner), name.into())
384    }
385
386    /// Construct a fresh [`AsyncQuery`] builder rooted at this
387    /// database. See [`Db::query`].
388    #[must_use]
389    pub fn query<T>(&self) -> AsyncQuery<T>
390    where
391        T: Document + Send + 'static,
392    {
393        AsyncQuery::new(Arc::clone(&self.inner))
394    }
395}
396
397/// Internal shim — `blocking::unblock` plus optional `tracing` span
398/// propagation. Capturing the current span before the hop and
399/// re-entering it inside the blocking task is the documented pattern
400/// for `tracing` across thread-pool work (see the `tracing` crate's
401/// `Span::in_scope` docs).
402///
403/// Power-of-ten Rule 4: the helper centralises the propagation logic
404/// so each call site stays a one-liner.
405pub(crate) async fn unblock<F, R>(f: F) -> R
406where
407    F: FnOnce() -> R + Send + 'static,
408    R: Send + 'static,
409{
410    #[cfg(feature = "tracing")]
411    let span = tracing::Span::current();
412    blocking::unblock(move || {
413        #[cfg(feature = "tracing")]
414        let _guard = span.enter();
415        f()
416    })
417    .await
418}