obj-db 1.1.1

Embedded document database. Stable file format, full ACID, single-file portability.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! `AsyncDb` — async-facing wrapper over the blocking [`Db`].
//!
//! Each method clones the inner `Arc<Db>` and moves the clone into a
//! [`blocking::unblock`] task; the task runs the corresponding
//! synchronous method to completion. The blocking-task return value is
//! `Send + 'static`, which the existing blocking surface already
//! satisfies (every `Db` method returns `Result<T>` with `T: Send +
//! 'static` for every type we wrap here).

use std::path::{Path, PathBuf};
use std::sync::Arc;

use obj_core::Document;
use obj_core::{Id, Result};

use crate::asynchronous::collection::AsyncCollection;
use crate::asynchronous::query::AsyncQuery;
use crate::{Config, Db, DbStat, IntegrityReport, ReadTxn, WriteTxn};

/// Async-facing wrapper around the blocking [`Db`].
///
/// `AsyncDb` is cheap to clone (one `Arc` bump) so it can be shared
/// across spawned tasks without locking. The blocking engine sits
/// behind a single `Arc<Db>`; every async method hands its body off
/// to the [`blocking`] thread pool.
///
/// Public construction goes through [`AsyncDb::open`] /
/// [`AsyncDb::open_with`] / [`AsyncDb::memory`] /
/// [`AsyncDb::memory_with`] / [`AsyncDb::open_readonly`] /
/// [`AsyncDb::from_blocking`].
#[derive(Clone, Debug)]
pub struct AsyncDb {
    inner: Arc<Db>,
}

impl AsyncDb {
    /// Construct an `AsyncDb` from an already-opened blocking [`Db`].
    ///
    /// Synchronous on purpose — wrapping an in-hand `Db` does no
    /// I/O. Useful when the caller already opened the database from
    /// a blocking context (e.g. CLI bootstrap) and wants to drive it
    /// async-style from the rest of the program.
    #[must_use]
    pub fn from_blocking(db: Db) -> Self {
        Self {
            inner: Arc::new(db),
        }
    }

    /// Open or create a file-backed database at `path` with default
    /// configuration. See [`Db::open`].
    ///
    /// # Errors
    ///
    /// As [`Db::open`].
    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with(path, Config::default()).await
    }

    /// Open or create a file-backed database with `config`. See
    /// [`Db::open_with`].
    ///
    /// # Errors
    ///
    /// As [`Db::open_with`].
    pub async fn open_with<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
        let path_buf: PathBuf = path.as_ref().to_path_buf();
        unblock(move || Db::open_with(path_buf, config).map(Self::from_blocking)).await
    }

    /// Open a fresh in-memory database. See [`Db::memory`].
    ///
    /// # Errors
    ///
    /// As [`Db::memory`].
    pub async fn memory() -> Result<Self> {
        Self::memory_with(Config::default()).await
    }

    /// As [`AsyncDb::memory`] with a caller-supplied [`Config`].
    ///
    /// # Errors
    ///
    /// As [`Db::memory_with`].
    pub async fn memory_with(config: Config) -> Result<Self> {
        unblock(move || Db::memory_with(config).map(Self::from_blocking)).await
    }

    /// Open the database at `path` read-only. See [`Db::open_readonly`].
    ///
    /// # Errors
    ///
    /// As [`Db::open_readonly`].
    pub async fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_buf: PathBuf = path.as_ref().to_path_buf();
        unblock(move || Db::open_readonly(path_buf).map(Self::from_blocking)).await
    }

    /// Borrow the underlying blocking [`Db`].
    ///
    /// Useful when async code needs to hand a `&Db` to a third-party
    /// API that only takes the blocking type — note the borrow is
    /// scoped to the local task, not the blocking pool.
    #[must_use]
    pub fn as_blocking(&self) -> &Db {
        &self.inner
    }

    /// Insert `doc`. See [`Db::insert`].
    ///
    /// # Errors
    ///
    /// As [`Db::insert`].
    pub async fn insert<T>(&self, doc: T) -> Result<Id>
    where
        T: Document + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.insert(doc)).await
    }

    /// Fetch the document at `id`. See [`Db::get`].
    ///
    /// # Errors
    ///
    /// As [`Db::get`].
    pub async fn get<T>(&self, id: Id) -> Result<Option<T>>
    where
        T: Document + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.get::<T>(id)).await
    }

    /// Update the document at `id`. See [`Db::update`].
    ///
    /// The closure runs **synchronously** inside the blocking task,
    /// so it must be `Send + 'static`. This mirrors the wider
    /// "closure runs on the blocking pool" contract documented on
    /// [`AsyncDb::transaction`].
    ///
    /// # Errors
    ///
    /// As [`Db::update`].
    pub async fn update<T, F>(&self, id: Id, f: F) -> Result<()>
    where
        T: Document + Send + 'static,
        F: FnOnce(&mut T) + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.update::<T, F>(id, f)).await
    }

    /// Delete the document at `id`. See [`Db::delete`].
    ///
    /// # Errors
    ///
    /// As [`Db::delete`].
    pub async fn delete<T>(&self, id: Id) -> Result<bool>
    where
        T: Document + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.delete::<T>(id)).await
    }

    /// Insert-or-replace the document at `id`. See [`Db::upsert`].
    ///
    /// # Errors
    ///
    /// As [`Db::upsert`].
    pub async fn upsert<T>(&self, id: Id, doc: T) -> Result<()>
    where
        T: Document + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.upsert(id, doc)).await
    }

    /// Point lookup on a `Unique` index. See [`Db::find_unique`].
    ///
    /// # Errors
    ///
    /// As [`Db::find_unique`].
    pub async fn find_unique<T, K>(&self, index_name: &str, key: K) -> Result<Option<T>>
    where
        T: Document + Send + 'static,
        K: Into<obj_core::codec::Dynamic> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        let name = index_name.to_owned();
        unblock(move || inner.find_unique::<T>(&name, key)).await
    }

    /// Materialise the collection's primary B-tree into `Vec<T>`. See
    /// [`Db::all`].
    ///
    /// Streaming async iteration is intentionally **not** wrapped in
    /// this phase — the entire collection is collected inside the
    /// blocking task. For very large collections, drive the blocking
    /// [`Db::iter_all`] from a dedicated `tokio::task::spawn_blocking`
    /// (or equivalent) until the async streaming surface lands.
    ///
    /// # Errors
    ///
    /// As [`Db::all`].
    pub async fn all<T>(&self) -> Result<Vec<T>>
    where
        T: Document + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.all::<T>()).await
    }

    /// Run a closure inside a write transaction. See
    /// [`Db::transaction`].
    ///
    /// # Closure contract
    ///
    /// The closure runs **synchronously** inside the blocking task,
    /// so it must be `Send + 'static`; the return value `R` likewise.
    /// No `async fn` inside the closure — that is a deliberate
    /// "async-over-blocking" restriction, matching the contract used
    /// by sqlx and other async-over-sync database wrappers.
    ///
    /// # Errors
    ///
    /// As [`Db::transaction`].
    pub async fn transaction<R, F>(&self, body: F) -> Result<R>
    where
        R: Send + 'static,
        F: FnOnce(&mut WriteTxn<'_>) -> Result<R> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.transaction(body)).await
    }

    /// Run a closure inside a read transaction. See
    /// [`Db::read_transaction`].
    ///
    /// Same `Send + 'static` closure restriction as
    /// [`AsyncDb::transaction`].
    ///
    /// # Errors
    ///
    /// As [`Db::read_transaction`].
    pub async fn read_transaction<R, F>(&self, body: F) -> Result<R>
    where
        R: Send + 'static,
        F: FnOnce(&ReadTxn<'_>) -> Result<R> + Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.read_transaction(body)).await
    }

    /// Hot-backup the database to `dest`. See [`Db::backup_to`].
    ///
    /// # Errors
    ///
    /// As [`Db::backup_to`].
    pub async fn backup_to<P: AsRef<Path>>(&self, dest: P) -> Result<()> {
        let inner = Arc::clone(&self.inner);
        let dest_buf: PathBuf = dest.as_ref().to_path_buf();
        unblock(move || inner.backup_to(dest_buf)).await
    }

    /// Attach a read-only `.obj` file under `namespace`. See
    /// [`Db::attach`].
    ///
    /// Takes `&mut self` because the blocking [`Db::attach`] takes
    /// `&mut Db`. We temporarily unwrap the `Arc<Db>` (it must be
    /// uniquely owned at this point) so the blocking task can take
    /// the `&mut`. If the `Arc` is shared, attach falls back to
    /// [`Error::Busy`](obj_core::Error::Busy) with the
    /// `WriterInProcess` kind — clone the `AsyncDb` only after all
    /// `attach` / `detach` calls.
    ///
    /// # Errors
    ///
    /// As [`Db::attach`], plus
    /// [`Error::Busy`](obj_core::Error::Busy) when the `Arc<Db>` is
    /// not uniquely owned at call time.
    pub async fn attach<P>(&mut self, path: P, namespace: impl Into<String>) -> Result<()>
    where
        P: AsRef<Path>,
    {
        let path_buf: PathBuf = path.as_ref().to_path_buf();
        let namespace = namespace.into();
        self.with_mut_db(move |db| db.attach(&path_buf, namespace))
            .await
    }

    /// Detach the attachment registered under `namespace`. See
    /// [`Db::detach`].
    ///
    /// Same `&mut self` contract as [`AsyncDb::attach`].
    ///
    /// # Errors
    ///
    /// As [`Db::detach`].
    pub async fn detach(&mut self, namespace: &str) -> Result<()> {
        let namespace = namespace.to_owned();
        self.with_mut_db(move |db| db.detach(&namespace)).await
    }

    /// Run `f` with a `&mut Db`. The `Arc<Db>` inside `self` must be
    /// uniquely owned at call time; otherwise the function returns
    /// [`Error::Busy`](obj_core::Error::Busy) so the caller does not
    /// silently observe stale-state behaviour.
    ///
    /// The `&mut self` receiver guarantees no other `&AsyncDb`
    /// reference borrows `self` for the duration of the call, but
    /// **clones** of `self` (which hold their own `Arc<Db>`) defeat
    /// the `Arc::try_unwrap` path. Reserve `attach` / `detach` for
    /// the bootstrap phase, before the `AsyncDb` is shared across
    /// tasks.
    async fn with_mut_db<F, R>(&mut self, f: F) -> Result<R>
    where
        F: FnOnce(&mut Db) -> Result<R> + Send + 'static,
        R: Send + 'static,
    {
        // Move `self.inner` out (replacing with a temporary sentinel)
        // so `Arc::try_unwrap` sees exactly one strong reference. The
        // sentinel is never observed by the caller — we restore the
        // real `Arc<Db>` before returning. Power-of-ten Rule 7: every
        // `try_unwrap` branch is handled; on failure we put the
        // original `Arc<Db>` back so the receiver is left valid.
        let sentinel = match Db::memory() {
            Ok(db) => Arc::new(db),
            Err(e) => return Err(e),
        };
        let original = std::mem::replace(&mut self.inner, sentinel);
        let mut db = match Arc::try_unwrap(original) {
            Ok(db) => db,
            Err(arc) => {
                // Restore the receiver so `self` is left valid; the
                // sentinel never escapes.
                self.inner = arc;
                return Err(obj_core::Error::Busy {
                    kind: obj_core::LockKind::WriterInProcess,
                });
            }
        };
        let (db_back, result) = unblock(move || {
            let result = f(&mut db);
            (db, result)
        })
        .await;
        self.inner = Arc::new(db_back);
        result
    }

    /// Run [`Db::integrity_check`]. See that method.
    ///
    /// # Errors
    ///
    /// As [`Db::integrity_check`].
    pub async fn integrity_check(&self) -> Result<IntegrityReport> {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.integrity_check()).await
    }

    /// Read [`Db::stat`]. See that method.
    ///
    /// # Errors
    ///
    /// As [`Db::stat`].
    pub async fn stat(&self) -> Result<DbStat> {
        let inner = Arc::clone(&self.inner);
        unblock(move || inner.stat()).await
    }

    /// Open a read-only typed handle to a runtime-named collection.
    /// See [`Db::collection`].
    ///
    /// Construction is infallible; the handle dispatches into the
    /// blocking pool on every read-only method call.
    #[must_use]
    pub fn collection<T>(&self, name: impl Into<String>) -> AsyncCollection<T>
    where
        T: Document + Send + 'static,
    {
        AsyncCollection::lazy(Arc::clone(&self.inner), name.into())
    }

    /// Construct a fresh [`AsyncQuery`] builder rooted at this
    /// database. See [`Db::query`].
    #[must_use]
    pub fn query<T>(&self) -> AsyncQuery<T>
    where
        T: Document + Send + 'static,
    {
        AsyncQuery::new(Arc::clone(&self.inner))
    }
}

/// Internal shim — `blocking::unblock` plus optional `tracing` span
/// propagation. Capturing the current span before the hop and
/// re-entering it inside the blocking task is the documented pattern
/// for `tracing` across thread-pool work (see the `tracing` crate's
/// `Span::in_scope` docs).
///
/// Power-of-ten Rule 4: the helper centralises the propagation logic
/// so each call site stays a one-liner.
pub(crate) async fn unblock<F, R>(f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    #[cfg(feature = "tracing")]
    let span = tracing::Span::current();
    blocking::unblock(move || {
        #[cfg(feature = "tracing")]
        let _guard = span.enter();
        f()
    })
    .await
}