Skip to main content

nookdb_core/
live.rs

1//! Reactive subsystem: recompute `live()` queries on matching commits.
2//!
3//! `LiveEngine::new` registers ONE `CommitObserver` on the database's
4//! `Notifier` and spawns a single worker thread (`std::thread` +
5//! `std::sync`, zero `unsafe`). `on_commit` only marks matching subs
6//! dirty + wakes the worker (it runs on the committing thread and must
7//! return fast). The worker drains+dedupes the dirty set, recomputes
8//! each via the authoritative M2 `Collection::find`, and emits a JSON
9//! envelope through `EmitSink`. Coalescing is the natural consequence
10//! of draining the dirty set before recompute + the fresh MVCC read
11//! always observing the latest committed state (spec §2/§3).
12use std::collections::{BTreeSet, HashMap};
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Condvar, Mutex, Weak};
16use std::thread::JoinHandle;
17
18use serde_json::Value;
19
20use crate::collection::Collection;
21use crate::database::Database;
22use crate::notify::{CommitEvent, CommitObserver, ObserverHandle};
23use crate::schema::ir::SchemaIr;
24
/// Opaque handle naming one live subscription inside a [`LiveEngine`].
///
/// Handed out by `LiveEngine::register` and accepted by
/// `LiveEngine::cancel`; cheap to copy, compare, and hash.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubId(u64);
28
/// Boundary through which core hands envelopes to a language binding.
///
/// `nookdb-napi`'s `TsfnSink` implements this on top of a
/// `ThreadsafeFunction`, which keeps `napi` entirely out of core.
pub trait EmitSink: Send + Sync {
    /// Hands over one serialised envelope: either
    /// `{"ok":true,"value":[…]}` or `{"ok":false,"error":"[kind] message"}`.
    ///
    /// Invoked from the one live worker thread that serves every
    /// subscription, so implementations MUST be cheap and must never
    /// block — a blocking `emit` holds up delivery for all the others.
    fn emit(&self, envelope_json: &str);

    /// Reports whether the JS side has gone away. Once this returns
    /// `true`, the worker drops the subscription.
    fn is_closed(&self) -> bool;
}
43
/// One registered live query, stored in `LiveShared::subs` under its id.
struct LiveSub {
    /// Collection the query reads; commits touching it mark the sub dirty.
    collection: String,
    /// Query filter, re-evaluated on every recompute.
    filter: Value,
    /// Query options (e.g. sort/limit), re-applied on every recompute.
    options: crate::query::QueryOptions,
    /// Destination for recomputed envelopes.
    sink: Arc<dyn EmitSink>,
    /// Set by the commit observer; cleared when the worker snapshots the
    /// sub for recompute.
    dirty: bool,
}
51
/// State shared between the engine handle, the worker thread, and (only
/// weakly) the commit observer.
struct LiveShared {
    /// Database every recompute reads from.
    db: Arc<Database>,
    /// Compiled schema handed to `Collection::new` on every recompute.
    schema: Arc<SchemaIr>,
    /// Registered subscriptions, keyed by the raw value inside `SubId`.
    subs: Mutex<HashMap<u64, LiveSub>>,
    /// Set when any sub is dirty or on shutdown; pairs with `cv`.
    wake: Mutex<bool>,
    /// Signalled by `wake_worker` right after `wake` is raised.
    cv: Condvar,
    /// Set by `LiveEngine`'s `Drop`; the worker returns once it sees it.
    shutdown: AtomicBool,
    /// Source of fresh subscription ids (monotonically increasing).
    next_id: AtomicU64,
}
62
63impl LiveShared {
64    fn wake_worker(&self) {
65        *self
66            .wake
67            .lock()
68            .unwrap_or_else(std::sync::PoisonError::into_inner) = true;
69        self.cv.notify_one();
70    }
71}
72
/// The single registered observer. Holds a `Weak<LiveShared>` to break
/// the `Database → Notifier → observer → LiveShared → Database` cycle.
struct ReactiveObserver {
    /// Upgraded on every commit; once no strong `Arc<LiveShared>` remains
    /// the upgrade fails and the observer does nothing.
    shared: Weak<LiveShared>,
}
78
79impl CommitObserver for ReactiveObserver {
80    fn on_commit(&self, ev: &CommitEvent) {
81        let Some(shared) = self.shared.upgrade() else {
82            return;
83        };
84        let touched = ev.touched_collections();
85        let mut any = false;
86        if let Ok(mut subs) = shared.subs.lock() {
87            for s in subs.values_mut() {
88                if touched.contains(s.collection.as_str()) {
89                    s.dirty = true;
90                    any = true;
91                }
92            }
93        }
94        if any {
95            shared.wake_worker();
96        }
97    }
98}
99
/// Owns the reactive worker + the observer registration. Drop joins
/// the worker and unregisters the observer (via the [`ObserverHandle`]).
pub struct LiveEngine {
    /// State shared with the worker thread (and weakly with the observer).
    shared: Arc<LiveShared>,
    /// Worker thread handle; an `Option` so `Drop` can `take()` and join it.
    worker: Option<JoinHandle<()>>,
    // Observer registration handle. Never read; held only so it lives as
    // long as the engine. Fields are dropped after `Drop::drop` returns, so
    // this is released only after the worker has been joined.
    _obs: ObserverHandle,
}
107
108impl LiveEngine {
109    /// Builds the engine, registers the reactive observer on `db`'s
110    /// notifier, and spawns the worker.
111    // `db`/`schema` are taken by owned `Arc` (not `&Arc`) on purpose:
112    // this is the stable surface the NAPI binding (a later task)
113    // constructs from the opened DB + compiled IR and then *moves*
114    // ownership of into the engine (the worker outlives the call).
115    #[allow(clippy::needless_pass_by_value)]
116    #[must_use]
117    pub fn new(db: Arc<Database>, schema: Arc<SchemaIr>) -> Arc<Self> {
118        let shared = Arc::new(LiveShared {
119            db: db.clone(),
120            schema,
121            subs: Mutex::new(HashMap::new()),
122            wake: Mutex::new(false),
123            cv: Condvar::new(),
124            shutdown: AtomicBool::new(false),
125            next_id: AtomicU64::new(0),
126        });
127        let obs = db.add_observer(Arc::new(ReactiveObserver {
128            shared: Arc::downgrade(&shared),
129        }));
130        let worker = {
131            let shared = shared.clone();
132            std::thread::spawn(move || worker_loop(&shared))
133        };
134        Arc::new(Self {
135            shared,
136            worker: Some(worker),
137            _obs: obs,
138        })
139    }
140
141    /// Registers a live query. Returns its id and the **synchronously
142    /// computed** initial snapshot envelope (so `.value` is populated
143    /// without waiting for the first commit).
144    #[must_use]
145    pub fn register(
146        &self,
147        collection: &str,
148        filter: Value,
149        options: crate::query::QueryOptions,
150        sink: Arc<dyn EmitSink>,
151    ) -> (SubId, String) {
152        let initial = recompute_envelope(&self.shared, collection, &filter, &options);
153        let id = self.shared.next_id.fetch_add(1, Ordering::Relaxed);
154        if let Ok(mut subs) = self.shared.subs.lock() {
155            subs.insert(
156                id,
157                LiveSub {
158                    collection: collection.to_string(),
159                    filter,
160                    options,
161                    sink,
162                    dirty: false,
163                },
164            );
165        }
166        (SubId(id), initial)
167    }
168
169    /// Cancels a subscription; no further emissions for it.
170    pub fn cancel(&self, sub: SubId) {
171        if let Ok(mut subs) = self.shared.subs.lock() {
172            subs.remove(&sub.0);
173        }
174    }
175}
176
177impl Drop for LiveEngine {
178    fn drop(&mut self) {
179        self.shared.shutdown.store(true, Ordering::SeqCst);
180        self.shared.wake_worker();
181        if let Some(j) = self.worker.take() {
182            let _ = j.join();
183        }
184    }
185}
186
187/// Runs `Collection::find` (the authoritative M2 path) against a fresh
188/// MVCC read and serialises the envelope. Any error becomes an
189/// `{"ok":false,"error":"[kind] message"}` envelope (the `[kind]`
190/// convention shared with the NAPI error mapping).
191fn recompute_envelope(
192    shared: &LiveShared,
193    collection: &str,
194    filter: &Value,
195    options: &crate::query::QueryOptions,
196) -> String {
197    let run = || -> Result<Vec<Value>, crate::error::NookError> {
198        Collection::new(&shared.db, &shared.schema, collection)?.find_with(filter, options)
199    };
200    match catch_unwind(AssertUnwindSafe(run)) {
201        Ok(Ok(docs)) => serde_json::json!({ "ok": true, "value": docs }).to_string(),
202        Ok(Err(e)) => serde_json::json!({
203            "ok": false,
204            "error": format!("[{}] {}", e.kind().as_str(), e)
205        })
206        .to_string(),
207        Err(_) => serde_json::json!({
208            "ok": false,
209            "error": "[storage] live recompute panicked"
210        })
211        .to_string(),
212    }
213}
214
215/// One dirty subscription snapshotted for lock-free recompute:
216/// `(id, collection, filter, options, sink)`.
217type WorkItem = (
218    u64,
219    String,
220    Value,
221    crate::query::QueryOptions,
222    Arc<dyn EmitSink>,
223);
224
225fn worker_loop(shared: &LiveShared) {
226    loop {
227        // Wait until woken (dirty sub or shutdown).
228        {
229            let mut woke = shared
230                .wake
231                .lock()
232                .unwrap_or_else(std::sync::PoisonError::into_inner);
233            while !*woke && !shared.shutdown.load(Ordering::SeqCst) {
234                woke = shared
235                    .cv
236                    .wait(woke)
237                    .unwrap_or_else(std::sync::PoisonError::into_inner);
238            }
239            *woke = false;
240        }
241        if shared.shutdown.load(Ordering::SeqCst) {
242            return;
243        }
244        // Snapshot the dirty subs' descriptors under a short lock, clear
245        // their dirty flags (coalesce), then recompute lock-free.
246        let work: Vec<WorkItem> = {
247            let Ok(mut subs) = shared.subs.lock() else {
248                continue;
249            };
250            subs.iter_mut()
251                .filter(|(_, s)| s.dirty)
252                .map(|(id, s)| {
253                    s.dirty = false;
254                    (
255                        *id,
256                        s.collection.clone(),
257                        s.filter.clone(),
258                        s.options.clone(),
259                        s.sink.clone(),
260                    )
261                })
262                .collect()
263        };
264        let mut dead: BTreeSet<u64> = BTreeSet::new();
265        for (id, collection, filter, options, sink) in work {
266            if sink.is_closed() {
267                dead.insert(id);
268                continue;
269            }
270            // Skip if cancelled between snapshot and now.
271            if shared.subs.lock().map_or(true, |s| !s.contains_key(&id)) {
272                continue;
273            }
274            let env = recompute_envelope(shared, &collection, &filter, &options);
275            sink.emit(&env);
276        }
277        if !dead.is_empty() {
278            if let Ok(mut subs) = shared.subs.lock() {
279                for id in dead {
280                    subs.remove(&id);
281                }
282            }
283        }
284    }
285}
286
#[cfg(test)]
mod tests {
    // End-to-end tests: each one opens a real on-disk database in a temp
    // dir, commits through `Collection`, and inspects what the live worker
    // emits into a recording sink.
    use super::*;
    use crate::database::Database;
    use crate::schema::ir::SchemaIr;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Opens a fresh database in a temp dir and compiles a schema with one
    /// collection `u`: `id`, enum `role` (admin/user), optional number `n`,
    /// and a non-unique index on `role`. The `TempDir` is returned so each
    /// test can keep it bound (`_d`) for its whole duration.
    fn setup() -> (tempfile::TempDir, Arc<Database>, Arc<SchemaIr>) {
        let d = tempfile::tempdir().unwrap();
        let db = Arc::new(Database::open(d.path().join("t.db")).unwrap());
        let ir = Arc::new(
            SchemaIr::compile(
                r#"{"u":{"idField":"id","fields":[
                  {"name":"id","type":"id"},
                  {"name":"role","type":"enum","variants":["admin","user"]},
                  {"name":"n","type":"number","optional":true}],
                  "indexes":[{"field":"role","unique":false}]}}"#,
            )
            .unwrap(),
        );
        (d, db, ir)
    }

    /// Test sink: records every emitted envelope string.
    #[derive(Default)]
    struct VecSink(Mutex<Vec<String>>);
    impl EmitSink for VecSink {
        fn emit(&self, envelope_json: &str) {
            self.0.lock().unwrap().push(envelope_json.to_string());
        }
        // Never closed, so the worker never drops the subscription.
        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Polls `f` every 5 ms until it returns `true`; fails the test if that
    /// takes 5 s or longer.
    fn wait_until<F: Fn() -> bool>(f: F) {
        let start = Instant::now();
        while !f() {
            assert!(
                start.elapsed() < Duration::from_secs(5),
                "live emission timed out"
            );
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Inserts `doc` into collection `u`, panicking on any error.
    fn insert(db: &Database, ir: &SchemaIr, doc: &serde_json::Value) {
        crate::collection::Collection::new(db, ir, "u")
            .unwrap()
            .insert(doc)
            .unwrap();
    }

    #[test]
    fn register_returns_initial_snapshot_then_emits_on_matching_commit() {
        let (_d, db, ir) = setup();
        insert(&db, &ir, &serde_json::json!({"id":"1","role":"admin"}));
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (_sub, initial) = engine.register(
            "u",
            serde_json::json!({"role":"admin"}),
            crate::query::QueryOptions::default(),
            sink.clone(),
        );
        assert!(initial.contains("\"ok\":true"));
        assert!(
            initial.contains("\"1\""),
            "initial snapshot has the existing admin"
        );

        // A matching commit must reach the sink asynchronously via the worker.
        insert(&db, &ir, &serde_json::json!({"id":"2","role":"admin"}));
        wait_until(|| !sink.0.lock().unwrap().is_empty());
        let last = sink.0.lock().unwrap().last().unwrap().clone();
        assert!(last.contains("\"ok\":true") && last.contains("\"2\""));
    }

    #[test]
    fn register_with_options_sorts_and_limits_initial_and_recompute() {
        let (_d, db, ir) = setup();
        for (id, n) in [("a", 3), ("b", 1), ("c", 2)] {
            insert(
                &db,
                &ir,
                &serde_json::json!({"id": id, "role": "user", "n": n}),
            );
        }
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let opts =
            crate::query::QueryOptions::parse(Some(r#"{"sort":[["n","asc"]],"limit":2}"#)).unwrap();
        let (_sub, initial) =
            engine.register("u", serde_json::json!({"role":"user"}), opts, sink.clone());
        let v: serde_json::Value = serde_json::from_str(&initial).unwrap();
        let ids: Vec<_> = v["value"]
            .as_array()
            .unwrap()
            .iter()
            .map(|d| d["id"].as_str().unwrap().to_string())
            .collect();
        // Ascending by `n` with limit 2: b (n=1), c (n=2).
        assert_eq!(ids, vec!["b", "c"]);

        // `d` (n=0) sorts first and pushes `c` past the limit on recompute.
        insert(&db, &ir, &serde_json::json!({"id":"d","role":"user","n":0}));
        wait_until(|| !sink.0.lock().unwrap().is_empty());
        let last = sink.0.lock().unwrap().last().unwrap().clone();
        let last_v: serde_json::Value = serde_json::from_str(&last).unwrap();
        let last_ids: Vec<_> = last_v["value"]
            .as_array()
            .unwrap()
            .iter()
            .map(|d| d["id"].as_str().unwrap().to_string())
            .collect();
        assert_eq!(last_ids, vec!["d", "b"]);
    }

    #[test]
    fn a_commit_to_an_unrelated_collection_does_not_emit() {
        let (_d, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir);
        let sink = Arc::new(VecSink::default());
        let (_s, _i) = engine.register(
            "u",
            serde_json::json!({}),
            crate::query::QueryOptions::default(),
            sink.clone(),
        );
        // write to a different collection through the bytes API
        db.write(|tx| tx.put("other", b"x", b"y")).unwrap();
        // Negative check: only catches a stray emission that lands within
        // this fixed 50 ms window.
        std::thread::sleep(Duration::from_millis(50));
        assert!(sink.0.lock().unwrap().is_empty());
    }

    #[test]
    fn cancel_stops_further_emissions() {
        let (_d, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (sub, _i) = engine.register(
            "u",
            serde_json::json!({}),
            crate::query::QueryOptions::default(),
            sink.clone(),
        );
        engine.cancel(sub);
        insert(&db, &ir, &serde_json::json!({"id":"9","role":"user"}));
        // Negative check bounded by a fixed 50 ms window (see above).
        std::thread::sleep(Duration::from_millis(50));
        assert!(
            sink.0.lock().unwrap().is_empty(),
            "no emission after cancel"
        );
    }

    #[test]
    fn rapid_commits_coalesce_to_a_snapshot_with_the_final_state() {
        let (_d, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (_s, _i) = engine.register(
            "u",
            serde_json::json!({}),
            crate::query::QueryOptions::default(),
            sink.clone(),
        );
        for i in 0..20 {
            insert(
                &db,
                &ir,
                &serde_json::json!({"id":format!("{i}"),"role":"user"}),
            );
        }
        wait_until(|| {
            sink.0
                .lock()
                .unwrap()
                .last()
                .is_some_and(|s| s.contains("\"19\""))
        });
        let emissions = sink.0.lock().unwrap().len();
        // NOTE: `<= 20` also admits one emission per commit, so this bounds
        // rather than proves coalescing; how much merging actually happens
        // depends on thread timing.
        assert!(emissions <= 20, "coalesced: fewer emissions than commits");
        assert!(emissions >= 1);
        // Non-vacuous final-state check: the last envelope must be a
        // successful snapshot containing every inserted id "0".."19".
        // This proves the "fresh MVCC read sees the latest committed state"
        // guarantee without any timing dependence (wait_until already
        // ensured "19" is present before we reach this point).
        let last = sink.0.lock().unwrap().last().unwrap().clone();
        assert!(
            last.contains("\"ok\":true"),
            "final emission is a snapshot, not an error"
        );
        for i in 0..20 {
            assert!(
                last.contains(&format!("\"{i}\"")),
                "final coalesced snapshot must contain id {i} (saw: {last})"
            );
        }
    }

    proptest::proptest! {
        #![proptest_config(proptest::prelude::ProptestConfig::with_cases(24))]
        #[test]
        fn emitted_snapshot_equals_one_shot_find(
            ops in proptest::collection::vec((0u32..8, proptest::bool::ANY), 1..16)
        ) {
            let (_d, db, ir) = setup();
            let engine = LiveEngine::new(db.clone(), ir.clone());
            let sink = Arc::new(VecSink::default());
            let (_s, _i) = engine.register("u", serde_json::json!({"role":"admin"}), crate::query::QueryOptions::default(), sink.clone());

            // Ids are drawn from 0..8, so repeats are likely; the insert result
            // is deliberately ignored (presumably a duplicate id is rejected).
            for (n, is_admin) in &ops {
                let role = if *is_admin { "admin" } else { "user" };
                let _ = crate::collection::Collection::new(&db, &ir, "u").unwrap()
                    .insert(&serde_json::json!({"id": format!("{n}"), "role": role}));
            }

            // Authoritative one-shot result over the final state.
            let want = crate::collection::Collection::new(&db, &ir, "u").unwrap()
                .find(&serde_json::json!({"role":"admin"})).unwrap();
            let want_ids: std::collections::BTreeSet<String> = want.iter()
                .map(|d| d["id"].as_str().unwrap().to_string()).collect();

            // The LAST emitted snapshot (coalesced) must equal `want`.
            // With no emission at all, an empty `want` already counts as a match.
            wait_until(|| {
                sink.0.lock().unwrap().last().map_or(want_ids.is_empty(), |s| {
                    serde_json::from_str::<serde_json::Value>(s).ok()
                        .and_then(|v| v.get("value").cloned())
                        .is_some_and(|val| {
                            let got: std::collections::BTreeSet<String> = val.as_array().unwrap()
                                .iter().map(|d| d["id"].as_str().unwrap().to_string()).collect();
                            got == want_ids
                        })
                })
            });

            let last = sink.0.lock().unwrap().last().cloned();
            let emitted_ids: std::collections::BTreeSet<String> = last.map_or_else(
                std::collections::BTreeSet::new,
                |s| {
                    serde_json::from_str::<serde_json::Value>(&s).unwrap()["value"]
                        .as_array().unwrap().iter()
                        .map(|d| d["id"].as_str().unwrap().to_string()).collect()
                },
            );
            proptest::prop_assert_eq!(emitted_ids, want_ids);
        }
    }
}