Skip to main content

nookdb_core/
live.rs

//! Reactive subsystem: recompute `live()` queries on matching commits.
//!
//! `LiveEngine::new` registers ONE `CommitObserver` on the database's
//! `Notifier` and spawns a single worker thread (`std::thread` +
//! `std::sync`, zero `unsafe`). `on_commit` only marks matching subs
//! dirty and wakes the worker (it runs on the committing thread and must
//! return fast). The worker drains and dedupes the dirty set, recomputes
//! each sub via the authoritative M2 `Collection::find`, and emits a JSON
//! envelope through `EmitSink`. Coalescing is the natural consequence
//! of draining the dirty set before recompute, plus the fresh MVCC read
//! always observing the latest committed state (spec §2/§3).
12use std::collections::{BTreeSet, HashMap};
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Condvar, Mutex, Weak};
16use std::thread::JoinHandle;
17
18use serde_json::Value;
19
20use crate::collection::Collection;
21use crate::database::Database;
22use crate::notify::{CommitEvent, CommitObserver, ObserverHandle};
23use crate::schema::ir::SchemaIr;
24
/// Handle naming one live subscription inside a [`LiveEngine`].
///
/// Wraps the numeric id the engine assigned at registration; it is
/// handed out by `register` and consumed by `cancel`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SubId(u64);
28
/// Boundary between the core and a language binding for delivering
/// live-query results. `nookdb-napi`'s `TsfnSink` implements this over
/// a `ThreadsafeFunction`, so core never depends on `napi`.
pub trait EmitSink: Send + Sync {
    /// Hands one envelope to the subscriber: either
    /// `{"ok":true,"value":[…]}` or
    /// `{"ok":false,"error":"[kind] message"}`.
    ///
    /// Invoked from the single live worker thread, so an implementation
    /// MUST be cheap and non-blocking: a blocking `emit` holds up
    /// delivery for every other subscription.
    fn emit(&self, envelope_json: &str);

    /// Reports whether the JS side has gone away. Once this returns
    /// `true` the worker drops the subscription.
    fn is_closed(&self) -> bool;
}
43
44struct LiveSub {
45    collection: String,
46    filter: Value,
47    sink: Arc<dyn EmitSink>,
48    dirty: bool,
49}
50
51struct LiveShared {
52    db: Arc<Database>,
53    schema: Arc<SchemaIr>,
54    subs: Mutex<HashMap<u64, LiveSub>>,
55    /// Set when any sub is dirty or on shutdown; pairs with `cv`.
56    wake: Mutex<bool>,
57    cv: Condvar,
58    shutdown: AtomicBool,
59    next_id: AtomicU64,
60}
61
62impl LiveShared {
63    fn wake_worker(&self) {
64        *self
65            .wake
66            .lock()
67            .unwrap_or_else(std::sync::PoisonError::into_inner) = true;
68        self.cv.notify_one();
69    }
70}
71
72/// The single registered observer. Holds a `Weak<LiveShared>` to break
73/// the `Database → Notifier → observer → LiveShared → Database` cycle.
74struct ReactiveObserver {
75    shared: Weak<LiveShared>,
76}
77
78impl CommitObserver for ReactiveObserver {
79    fn on_commit(&self, ev: &CommitEvent) {
80        let Some(shared) = self.shared.upgrade() else {
81            return;
82        };
83        let touched = ev.touched_collections();
84        let mut any = false;
85        if let Ok(mut subs) = shared.subs.lock() {
86            for s in subs.values_mut() {
87                if touched.contains(s.collection.as_str()) {
88                    s.dirty = true;
89                    any = true;
90                }
91            }
92        }
93        if any {
94            shared.wake_worker();
95        }
96    }
97}
98
99/// Owns the reactive worker + the observer registration. Drop joins
100/// the worker and unregisters the observer (via the [`ObserverHandle`]).
101pub struct LiveEngine {
102    shared: Arc<LiveShared>,
103    worker: Option<JoinHandle<()>>,
104    _obs: ObserverHandle,
105}
106
107impl LiveEngine {
108    /// Builds the engine, registers the reactive observer on `db`'s
109    /// notifier, and spawns the worker.
110    // `db`/`schema` are taken by owned `Arc` (not `&Arc`) on purpose:
111    // this is the stable surface the NAPI binding (a later task)
112    // constructs from the opened DB + compiled IR and then *moves*
113    // ownership of into the engine (the worker outlives the call).
114    #[allow(clippy::needless_pass_by_value)]
115    #[must_use]
116    pub fn new(db: Arc<Database>, schema: Arc<SchemaIr>) -> Arc<Self> {
117        let shared = Arc::new(LiveShared {
118            db: db.clone(),
119            schema,
120            subs: Mutex::new(HashMap::new()),
121            wake: Mutex::new(false),
122            cv: Condvar::new(),
123            shutdown: AtomicBool::new(false),
124            next_id: AtomicU64::new(0),
125        });
126        let obs = db.add_observer(Arc::new(ReactiveObserver {
127            shared: Arc::downgrade(&shared),
128        }));
129        let worker = {
130            let shared = shared.clone();
131            std::thread::spawn(move || worker_loop(&shared))
132        };
133        Arc::new(Self {
134            shared,
135            worker: Some(worker),
136            _obs: obs,
137        })
138    }
139
140    /// Registers a live query. Returns its id and the **synchronously
141    /// computed** initial snapshot envelope (so `.value` is populated
142    /// without waiting for the first commit).
143    #[must_use]
144    pub fn register(
145        &self,
146        collection: &str,
147        filter: Value,
148        sink: Arc<dyn EmitSink>,
149    ) -> (SubId, String) {
150        let initial = recompute_envelope(&self.shared, collection, &filter);
151        let id = self.shared.next_id.fetch_add(1, Ordering::Relaxed);
152        if let Ok(mut subs) = self.shared.subs.lock() {
153            subs.insert(
154                id,
155                LiveSub {
156                    collection: collection.to_string(),
157                    filter,
158                    sink,
159                    dirty: false,
160                },
161            );
162        }
163        (SubId(id), initial)
164    }
165
166    /// Cancels a subscription; no further emissions for it.
167    pub fn cancel(&self, sub: SubId) {
168        if let Ok(mut subs) = self.shared.subs.lock() {
169            subs.remove(&sub.0);
170        }
171    }
172}
173
174impl Drop for LiveEngine {
175    fn drop(&mut self) {
176        self.shared.shutdown.store(true, Ordering::SeqCst);
177        self.shared.wake_worker();
178        if let Some(j) = self.worker.take() {
179            let _ = j.join();
180        }
181    }
182}
183
184/// Runs `Collection::find` (the authoritative M2 path) against a fresh
185/// MVCC read and serialises the envelope. Any error becomes an
186/// `{"ok":false,"error":"[kind] message"}` envelope (the `[kind]`
187/// convention shared with the NAPI error mapping).
188fn recompute_envelope(shared: &LiveShared, collection: &str, filter: &Value) -> String {
189    let run = || -> Result<Vec<Value>, crate::error::NookError> {
190        Collection::new(&shared.db, &shared.schema, collection)?.find(filter)
191    };
192    match catch_unwind(AssertUnwindSafe(run)) {
193        Ok(Ok(docs)) => serde_json::json!({ "ok": true, "value": docs }).to_string(),
194        Ok(Err(e)) => serde_json::json!({
195            "ok": false,
196            "error": format!("[{}] {}", e.kind().as_str(), e)
197        })
198        .to_string(),
199        Err(_) => serde_json::json!({
200            "ok": false,
201            "error": "[storage] live recompute panicked"
202        })
203        .to_string(),
204    }
205}
206
207fn worker_loop(shared: &LiveShared) {
208    loop {
209        // Wait until woken (dirty sub or shutdown).
210        {
211            let mut woke = shared
212                .wake
213                .lock()
214                .unwrap_or_else(std::sync::PoisonError::into_inner);
215            while !*woke && !shared.shutdown.load(Ordering::SeqCst) {
216                woke = shared
217                    .cv
218                    .wait(woke)
219                    .unwrap_or_else(std::sync::PoisonError::into_inner);
220            }
221            *woke = false;
222        }
223        if shared.shutdown.load(Ordering::SeqCst) {
224            return;
225        }
226        // Snapshot the dirty subs' descriptors under a short lock, clear
227        // their dirty flags (coalesce), then recompute lock-free.
228        let work: Vec<(u64, String, Value, Arc<dyn EmitSink>)> = {
229            let Ok(mut subs) = shared.subs.lock() else {
230                continue;
231            };
232            subs.iter_mut()
233                .filter(|(_, s)| s.dirty)
234                .map(|(id, s)| {
235                    s.dirty = false;
236                    (*id, s.collection.clone(), s.filter.clone(), s.sink.clone())
237                })
238                .collect()
239        };
240        let mut dead: BTreeSet<u64> = BTreeSet::new();
241        for (id, collection, filter, sink) in work {
242            if sink.is_closed() {
243                dead.insert(id);
244                continue;
245            }
246            // Skip if cancelled between snapshot and now.
247            if shared.subs.lock().map_or(true, |s| !s.contains_key(&id)) {
248                continue;
249            }
250            let env = recompute_envelope(shared, &collection, &filter);
251            sink.emit(&env);
252        }
253        if !dead.is_empty() {
254            if let Ok(mut subs) = shared.subs.lock() {
255                for id in dead {
256                    subs.remove(&id);
257                }
258            }
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::schema::ir::SchemaIr;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Opens a fresh database in a temp dir and compiles a one-collection
    /// schema (`u`: `id` + enum `role`, indexed on `role`). The temp dir
    /// is returned so it outlives the database.
    fn setup() -> (tempfile::TempDir, Arc<Database>, Arc<SchemaIr>) {
        let dir = tempfile::tempdir().unwrap();
        let db = Arc::new(Database::open(dir.path().join("t.db")).unwrap());
        let ir = Arc::new(
            SchemaIr::compile(
                r#"{"u":{"idField":"id","fields":[
                  {"name":"id","type":"id"},{"name":"role","type":"enum","variants":["admin","user"]}],
                  "indexes":[{"field":"role","unique":false}]}}"#,
            )
            .unwrap(),
        );
        (dir, db, ir)
    }

    /// Test sink: records every emitted envelope string.
    #[derive(Default)]
    struct VecSink(Mutex<Vec<String>>);

    impl VecSink {
        /// Number of envelopes received so far.
        fn count(&self) -> usize {
            self.0.lock().unwrap().len()
        }

        /// Copy of the most recent envelope, if any.
        fn latest(&self) -> Option<String> {
            self.0.lock().unwrap().last().cloned()
        }
    }

    impl EmitSink for VecSink {
        fn emit(&self, envelope_json: &str) {
            self.0.lock().unwrap().push(envelope_json.to_string());
        }
        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Polls `f` every 5 ms until it holds; fails the test after 5 s.
    fn wait_until<F: Fn() -> bool>(f: F) {
        let started = Instant::now();
        loop {
            if f() {
                return;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "live emission timed out"
            );
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Inserts `doc` into collection `u`, panicking on failure.
    fn insert(db: &Database, ir: &SchemaIr, doc: &serde_json::Value) {
        crate::collection::Collection::new(db, ir, "u")
            .unwrap()
            .insert(doc)
            .unwrap();
    }

    /// Collects the `id` field of every document into an ordered set.
    fn id_set(docs: &[serde_json::Value]) -> std::collections::BTreeSet<String> {
        docs.iter()
            .map(|d| d["id"].as_str().unwrap().to_string())
            .collect()
    }

    #[test]
    fn register_returns_initial_snapshot_then_emits_on_matching_commit() {
        let (_dir, db, ir) = setup();
        insert(&db, &ir, &serde_json::json!({"id":"1","role":"admin"}));
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (_sub, initial) =
            engine.register("u", serde_json::json!({"role":"admin"}), sink.clone());
        assert!(initial.contains("\"ok\":true"));
        assert!(
            initial.contains("\"1\""),
            "initial snapshot has the existing admin"
        );

        insert(&db, &ir, &serde_json::json!({"id":"2","role":"admin"}));
        wait_until(|| sink.count() != 0);
        let last = sink.latest().unwrap();
        assert!(last.contains("\"ok\":true") && last.contains("\"2\""));
    }

    #[test]
    fn a_commit_to_an_unrelated_collection_does_not_emit() {
        let (_dir, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir);
        let sink = Arc::new(VecSink::default());
        let (_s, _i) = engine.register("u", serde_json::json!({}), sink.clone());
        // Write to a different collection through the bytes API.
        db.write(|tx| tx.put("other", b"x", b"y")).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        assert!(sink.0.lock().unwrap().is_empty());
    }

    #[test]
    fn cancel_stops_further_emissions() {
        let (_dir, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (sub, _i) = engine.register("u", serde_json::json!({}), sink.clone());
        engine.cancel(sub);
        insert(&db, &ir, &serde_json::json!({"id":"9","role":"user"}));
        std::thread::sleep(Duration::from_millis(50));
        assert!(
            sink.0.lock().unwrap().is_empty(),
            "no emission after cancel"
        );
    }

    #[test]
    fn rapid_commits_coalesce_to_a_snapshot_with_the_final_state() {
        let (_dir, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (_s, _i) = engine.register("u", serde_json::json!({}), sink.clone());
        for i in 0..20 {
            let doc = serde_json::json!({"id":format!("{i}"),"role":"user"});
            insert(&db, &ir, &doc);
        }
        wait_until(|| sink.latest().is_some_and(|s| s.contains("\"19\"")));
        let emissions = sink.count();
        assert!(emissions <= 20, "coalesced: fewer emissions than commits");
        assert!(emissions >= 1);
        // Non-vacuous final-state check: the last envelope must be a
        // successful snapshot containing every inserted id "0".."19".
        // This shows the fresh MVCC read sees the latest committed state,
        // with no timing dependence (`wait_until` already ensured "19"
        // is present before this point).
        let last = sink.latest().unwrap();
        assert!(
            last.contains("\"ok\":true"),
            "final emission is a snapshot, not an error"
        );
        for i in 0..20 {
            assert!(
                last.contains(&format!("\"{i}\"")),
                "final coalesced snapshot must contain id {i} (saw: {last})"
            );
        }
    }

    proptest::proptest! {
        #![proptest_config(proptest::prelude::ProptestConfig::with_cases(24))]
        #[test]
        fn emitted_snapshot_equals_one_shot_find(
            ops in proptest::collection::vec((0u32..8, proptest::bool::ANY), 1..16)
        ) {
            let (_dir, db, ir) = setup();
            let engine = LiveEngine::new(db.clone(), ir.clone());
            let sink = Arc::new(VecSink::default());
            let (_s, _i) = engine.register("u", serde_json::json!({"role":"admin"}), sink.clone());

            // Insert errors are deliberately ignored, as the generated
            // ids can repeat.
            for (n, is_admin) in &ops {
                let role = if *is_admin { "admin" } else { "user" };
                let _ = crate::collection::Collection::new(&db, &ir, "u").unwrap()
                    .insert(&serde_json::json!({"id": format!("{n}"), "role": role}));
            }

            // Authoritative one-shot result over the final state.
            let want = crate::collection::Collection::new(&db, &ir, "u").unwrap()
                .find(&serde_json::json!({"role":"admin"})).unwrap();
            let want_ids = id_set(&want);

            // The LAST emitted snapshot (coalesced) must equal `want`.
            wait_until(|| {
                sink.0.lock().unwrap().last().map_or(want_ids.is_empty(), |raw| {
                    serde_json::from_str::<serde_json::Value>(raw).ok()
                        .and_then(|v| v.get("value").cloned())
                        .is_some_and(|val| id_set(val.as_array().unwrap()) == want_ids)
                })
            });

            let last = sink.0.lock().unwrap().last().cloned();
            let emitted_ids: std::collections::BTreeSet<String> = last.map_or_else(
                std::collections::BTreeSet::new,
                |raw| {
                    id_set(
                        serde_json::from_str::<serde_json::Value>(&raw).unwrap()["value"]
                            .as_array().unwrap(),
                    )
                },
            );
            proptest::prop_assert_eq!(emitted_ids, want_ids);
        }
    }
}