1use std::collections::{BTreeSet, HashMap};
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Condvar, Mutex, Weak};
16use std::thread::JoinHandle;
17
18use serde_json::Value;
19
20use crate::collection::Collection;
21use crate::database::Database;
22use crate::notify::{CommitEvent, CommitObserver, ObserverHandle};
23use crate::schema::ir::SchemaIr;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub struct SubId(u64);
28
29pub trait EmitSink: Send + Sync {
32 fn emit(&self, envelope_json: &str);
39 fn is_closed(&self) -> bool;
42}
43
44struct LiveSub {
45 collection: String,
46 filter: Value,
47 options: crate::query::QueryOptions,
48 sink: Arc<dyn EmitSink>,
49 dirty: bool,
50}
51
52struct LiveShared {
53 db: Arc<Database>,
54 schema: Arc<SchemaIr>,
55 subs: Mutex<HashMap<u64, LiveSub>>,
56 wake: Mutex<bool>,
58 cv: Condvar,
59 shutdown: AtomicBool,
60 next_id: AtomicU64,
61}
62
63impl LiveShared {
64 fn wake_worker(&self) {
65 *self
66 .wake
67 .lock()
68 .unwrap_or_else(std::sync::PoisonError::into_inner) = true;
69 self.cv.notify_one();
70 }
71}
72
73struct ReactiveObserver {
76 shared: Weak<LiveShared>,
77}
78
79impl CommitObserver for ReactiveObserver {
80 fn on_commit(&self, ev: &CommitEvent) {
81 let Some(shared) = self.shared.upgrade() else {
82 return;
83 };
84 let touched = ev.touched_collections();
85 let mut any = false;
86 if let Ok(mut subs) = shared.subs.lock() {
87 for s in subs.values_mut() {
88 if touched.contains(s.collection.as_str()) {
89 s.dirty = true;
90 any = true;
91 }
92 }
93 }
94 if any {
95 shared.wake_worker();
96 }
97 }
98}
99
100pub struct LiveEngine {
103 shared: Arc<LiveShared>,
104 worker: Option<JoinHandle<()>>,
105 _obs: ObserverHandle,
106}
107
108impl LiveEngine {
109 #[allow(clippy::needless_pass_by_value)]
116 #[must_use]
117 pub fn new(db: Arc<Database>, schema: Arc<SchemaIr>) -> Arc<Self> {
118 let shared = Arc::new(LiveShared {
119 db: db.clone(),
120 schema,
121 subs: Mutex::new(HashMap::new()),
122 wake: Mutex::new(false),
123 cv: Condvar::new(),
124 shutdown: AtomicBool::new(false),
125 next_id: AtomicU64::new(0),
126 });
127 let obs = db.add_observer(Arc::new(ReactiveObserver {
128 shared: Arc::downgrade(&shared),
129 }));
130 let worker = {
131 let shared = shared.clone();
132 std::thread::spawn(move || worker_loop(&shared))
133 };
134 Arc::new(Self {
135 shared,
136 worker: Some(worker),
137 _obs: obs,
138 })
139 }
140
141 #[must_use]
145 pub fn register(
146 &self,
147 collection: &str,
148 filter: Value,
149 options: crate::query::QueryOptions,
150 sink: Arc<dyn EmitSink>,
151 ) -> (SubId, String) {
152 let initial = recompute_envelope(&self.shared, collection, &filter, &options);
153 let id = self.shared.next_id.fetch_add(1, Ordering::Relaxed);
154 if let Ok(mut subs) = self.shared.subs.lock() {
155 subs.insert(
156 id,
157 LiveSub {
158 collection: collection.to_string(),
159 filter,
160 options,
161 sink,
162 dirty: false,
163 },
164 );
165 }
166 (SubId(id), initial)
167 }
168
169 pub fn cancel(&self, sub: SubId) {
171 if let Ok(mut subs) = self.shared.subs.lock() {
172 subs.remove(&sub.0);
173 }
174 }
175}
176
177impl Drop for LiveEngine {
178 fn drop(&mut self) {
179 self.shared.shutdown.store(true, Ordering::SeqCst);
180 self.shared.wake_worker();
181 if let Some(j) = self.worker.take() {
182 let _ = j.join();
183 }
184 }
185}
186
187fn recompute_envelope(
192 shared: &LiveShared,
193 collection: &str,
194 filter: &Value,
195 options: &crate::query::QueryOptions,
196) -> String {
197 let run = || -> Result<Vec<Value>, crate::error::NookError> {
198 Collection::new(&shared.db, &shared.schema, collection)?.find_with(filter, options)
199 };
200 match catch_unwind(AssertUnwindSafe(run)) {
201 Ok(Ok(docs)) => serde_json::json!({ "ok": true, "value": docs }).to_string(),
202 Ok(Err(e)) => serde_json::json!({
203 "ok": false,
204 "error": format!("[{}] {}", e.kind().as_str(), e)
205 })
206 .to_string(),
207 Err(_) => serde_json::json!({
208 "ok": false,
209 "error": "[storage] live recompute panicked"
210 })
211 .to_string(),
212 }
213}
214
215type WorkItem = (
218 u64,
219 String,
220 Value,
221 crate::query::QueryOptions,
222 Arc<dyn EmitSink>,
223);
224
225fn worker_loop(shared: &LiveShared) {
226 loop {
227 {
229 let mut woke = shared
230 .wake
231 .lock()
232 .unwrap_or_else(std::sync::PoisonError::into_inner);
233 while !*woke && !shared.shutdown.load(Ordering::SeqCst) {
234 woke = shared
235 .cv
236 .wait(woke)
237 .unwrap_or_else(std::sync::PoisonError::into_inner);
238 }
239 *woke = false;
240 }
241 if shared.shutdown.load(Ordering::SeqCst) {
242 return;
243 }
244 let work: Vec<WorkItem> = {
247 let Ok(mut subs) = shared.subs.lock() else {
248 continue;
249 };
250 subs.iter_mut()
251 .filter(|(_, s)| s.dirty)
252 .map(|(id, s)| {
253 s.dirty = false;
254 (
255 *id,
256 s.collection.clone(),
257 s.filter.clone(),
258 s.options.clone(),
259 s.sink.clone(),
260 )
261 })
262 .collect()
263 };
264 let mut dead: BTreeSet<u64> = BTreeSet::new();
265 for (id, collection, filter, options, sink) in work {
266 if sink.is_closed() {
267 dead.insert(id);
268 continue;
269 }
270 if shared.subs.lock().map_or(true, |s| !s.contains_key(&id)) {
272 continue;
273 }
274 let env = recompute_envelope(shared, &collection, &filter, &options);
275 sink.emit(&env);
276 }
277 if !dead.is_empty() {
278 if let Ok(mut subs) = shared.subs.lock() {
279 for id in dead {
280 subs.remove(&id);
281 }
282 }
283 }
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::database::Database;
291 use crate::schema::ir::SchemaIr;
292 use std::sync::{Arc, Mutex};
293 use std::time::{Duration, Instant};
294
295 fn setup() -> (tempfile::TempDir, Arc<Database>, Arc<SchemaIr>) {
296 let d = tempfile::tempdir().unwrap();
297 let db = Arc::new(Database::open(d.path().join("t.db")).unwrap());
298 let ir = Arc::new(
299 SchemaIr::compile(
300 r#"{"u":{"idField":"id","fields":[
301 {"name":"id","type":"id"},
302 {"name":"role","type":"enum","variants":["admin","user"]},
303 {"name":"n","type":"number","optional":true}],
304 "indexes":[{"field":"role","unique":false}]}}"#,
305 )
306 .unwrap(),
307 );
308 (d, db, ir)
309 }
310
311 #[derive(Default)]
313 struct VecSink(Mutex<Vec<String>>);
314 impl EmitSink for VecSink {
315 fn emit(&self, envelope_json: &str) {
316 self.0.lock().unwrap().push(envelope_json.to_string());
317 }
318 fn is_closed(&self) -> bool {
319 false
320 }
321 }
322
323 fn wait_until<F: Fn() -> bool>(f: F) {
324 let start = Instant::now();
325 while !f() {
326 assert!(
327 start.elapsed() < Duration::from_secs(5),
328 "live emission timed out"
329 );
330 std::thread::sleep(Duration::from_millis(5));
331 }
332 }
333
334 fn insert(db: &Database, ir: &SchemaIr, doc: &serde_json::Value) {
335 crate::collection::Collection::new(db, ir, "u")
336 .unwrap()
337 .insert(doc)
338 .unwrap();
339 }
340
341 #[test]
342 fn register_returns_initial_snapshot_then_emits_on_matching_commit() {
343 let (_d, db, ir) = setup();
344 insert(&db, &ir, &serde_json::json!({"id":"1","role":"admin"}));
345 let engine = LiveEngine::new(db.clone(), ir.clone());
346 let sink = Arc::new(VecSink::default());
347 let (_sub, initial) = engine.register(
348 "u",
349 serde_json::json!({"role":"admin"}),
350 crate::query::QueryOptions::default(),
351 sink.clone(),
352 );
353 assert!(initial.contains("\"ok\":true"));
354 assert!(
355 initial.contains("\"1\""),
356 "initial snapshot has the existing admin"
357 );
358
359 insert(&db, &ir, &serde_json::json!({"id":"2","role":"admin"}));
360 wait_until(|| !sink.0.lock().unwrap().is_empty());
361 let last = sink.0.lock().unwrap().last().unwrap().clone();
362 assert!(last.contains("\"ok\":true") && last.contains("\"2\""));
363 }
364
365 #[test]
366 fn register_with_options_sorts_and_limits_initial_and_recompute() {
367 let (_d, db, ir) = setup();
368 for (id, n) in [("a", 3), ("b", 1), ("c", 2)] {
369 insert(
370 &db,
371 &ir,
372 &serde_json::json!({"id": id, "role": "user", "n": n}),
373 );
374 }
375 let engine = LiveEngine::new(db.clone(), ir.clone());
376 let sink = Arc::new(VecSink::default());
377 let opts =
378 crate::query::QueryOptions::parse(Some(r#"{"sort":[["n","asc"]],"limit":2}"#)).unwrap();
379 let (_sub, initial) =
380 engine.register("u", serde_json::json!({"role":"user"}), opts, sink.clone());
381 let v: serde_json::Value = serde_json::from_str(&initial).unwrap();
382 let ids: Vec<_> = v["value"]
383 .as_array()
384 .unwrap()
385 .iter()
386 .map(|d| d["id"].as_str().unwrap().to_string())
387 .collect();
388 assert_eq!(ids, vec!["b", "c"]);
389
390 insert(&db, &ir, &serde_json::json!({"id":"d","role":"user","n":0}));
391 wait_until(|| !sink.0.lock().unwrap().is_empty());
392 let last = sink.0.lock().unwrap().last().unwrap().clone();
393 let last_v: serde_json::Value = serde_json::from_str(&last).unwrap();
394 let last_ids: Vec<_> = last_v["value"]
395 .as_array()
396 .unwrap()
397 .iter()
398 .map(|d| d["id"].as_str().unwrap().to_string())
399 .collect();
400 assert_eq!(last_ids, vec!["d", "b"]);
401 }
402
403 #[test]
404 fn a_commit_to_an_unrelated_collection_does_not_emit() {
405 let (_d, db, ir) = setup();
406 let engine = LiveEngine::new(db.clone(), ir);
407 let sink = Arc::new(VecSink::default());
408 let (_s, _i) = engine.register(
409 "u",
410 serde_json::json!({}),
411 crate::query::QueryOptions::default(),
412 sink.clone(),
413 );
414 db.write(|tx| tx.put("other", b"x", b"y")).unwrap();
416 std::thread::sleep(Duration::from_millis(50));
417 assert!(sink.0.lock().unwrap().is_empty());
418 }
419
420 #[test]
421 fn cancel_stops_further_emissions() {
422 let (_d, db, ir) = setup();
423 let engine = LiveEngine::new(db.clone(), ir.clone());
424 let sink = Arc::new(VecSink::default());
425 let (sub, _i) = engine.register(
426 "u",
427 serde_json::json!({}),
428 crate::query::QueryOptions::default(),
429 sink.clone(),
430 );
431 engine.cancel(sub);
432 insert(&db, &ir, &serde_json::json!({"id":"9","role":"user"}));
433 std::thread::sleep(Duration::from_millis(50));
434 assert!(
435 sink.0.lock().unwrap().is_empty(),
436 "no emission after cancel"
437 );
438 }
439
440 #[test]
441 fn rapid_commits_coalesce_to_a_snapshot_with_the_final_state() {
442 let (_d, db, ir) = setup();
443 let engine = LiveEngine::new(db.clone(), ir.clone());
444 let sink = Arc::new(VecSink::default());
445 let (_s, _i) = engine.register(
446 "u",
447 serde_json::json!({}),
448 crate::query::QueryOptions::default(),
449 sink.clone(),
450 );
451 for i in 0..20 {
452 insert(
453 &db,
454 &ir,
455 &serde_json::json!({"id":format!("{i}"),"role":"user"}),
456 );
457 }
458 wait_until(|| {
459 sink.0
460 .lock()
461 .unwrap()
462 .last()
463 .is_some_and(|s| s.contains("\"19\""))
464 });
465 let emissions = sink.0.lock().unwrap().len();
466 assert!(emissions <= 20, "coalesced: fewer emissions than commits");
467 assert!(emissions >= 1);
468 let last = sink.0.lock().unwrap().last().unwrap().clone();
474 assert!(
475 last.contains("\"ok\":true"),
476 "final emission is a snapshot, not an error"
477 );
478 for i in 0..20 {
479 assert!(
480 last.contains(&format!("\"{i}\"")),
481 "final coalesced snapshot must contain id {i} (saw: {last})"
482 );
483 }
484 }
485
486 proptest::proptest! {
487 #![proptest_config(proptest::prelude::ProptestConfig::with_cases(24))]
488 #[test]
489 fn emitted_snapshot_equals_one_shot_find(
490 ops in proptest::collection::vec((0u32..8, proptest::bool::ANY), 1..16)
491 ) {
492 let (_d, db, ir) = setup();
493 let engine = LiveEngine::new(db.clone(), ir.clone());
494 let sink = Arc::new(VecSink::default());
495 let (_s, _i) = engine.register("u", serde_json::json!({"role":"admin"}), crate::query::QueryOptions::default(), sink.clone());
496
497 for (n, is_admin) in &ops {
498 let role = if *is_admin { "admin" } else { "user" };
499 let _ = crate::collection::Collection::new(&db, &ir, "u").unwrap()
500 .insert(&serde_json::json!({"id": format!("{n}"), "role": role}));
501 }
502
503 let want = crate::collection::Collection::new(&db, &ir, "u").unwrap()
505 .find(&serde_json::json!({"role":"admin"})).unwrap();
506 let want_ids: std::collections::BTreeSet<String> = want.iter()
507 .map(|d| d["id"].as_str().unwrap().to_string()).collect();
508
509 wait_until(|| {
511 sink.0.lock().unwrap().last().map_or(want_ids.is_empty(), |s| {
512 serde_json::from_str::<serde_json::Value>(s).ok()
513 .and_then(|v| v.get("value").cloned())
514 .is_some_and(|val| {
515 let got: std::collections::BTreeSet<String> = val.as_array().unwrap()
516 .iter().map(|d| d["id"].as_str().unwrap().to_string()).collect();
517 got == want_ids
518 })
519 })
520 });
521
522 let last = sink.0.lock().unwrap().last().cloned();
523 let emitted_ids: std::collections::BTreeSet<String> = last.map_or_else(
524 std::collections::BTreeSet::new,
525 |s| {
526 serde_json::from_str::<serde_json::Value>(&s).unwrap()["value"]
527 .as_array().unwrap().iter()
528 .map(|d| d["id"].as_str().unwrap().to_string()).collect()
529 },
530 );
531 proptest::prop_assert_eq!(emitted_ids, want_ids);
532 }
533 }
534}