1use std::collections::{BTreeSet, HashMap};
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Condvar, Mutex, Weak};
16use std::thread::JoinHandle;
17
18use serde_json::Value;
19
20use crate::collection::Collection;
21use crate::database::Database;
22use crate::notify::{CommitEvent, CommitObserver, ObserverHandle};
23use crate::schema::ir::SchemaIr;
24
/// Identifier of one live subscription.
///
/// Handed out by [`LiveEngine::register`] and accepted by [`LiveEngine::cancel`].
/// The wrapped `u64` is the key of the subscription in the engine's internal table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubId(u64);
28
/// Destination for the JSON envelopes produced by a live subscription.
///
/// Implementations must be `Send + Sync` because the engine calls them from its
/// background worker thread.
pub trait EmitSink: Send + Sync {
    /// Delivers one envelope, already serialized as JSON text.
    ///
    /// The worker calls this after re-running the subscription's query; the
    /// envelope is either `{"ok":true,"value":[...]}` or `{"ok":false,"error":"..."}`.
    fn emit(&self, envelope_json: &str);
    /// Reports whether the sink has gone away.
    ///
    /// The worker checks this before each emission; when it returns `true` the
    /// emission is skipped and the subscription is removed.
    fn is_closed(&self) -> bool;
}
43
/// One registered live subscription, stored in `LiveShared::subs`.
struct LiveSub {
    /// Name of the collection the query runs against; compared with the
    /// collections touched by each commit.
    collection: String,
    /// Filter passed to `Collection::find` whenever the query is re-run.
    filter: Value,
    /// Where recomputed envelopes are delivered.
    sink: Arc<dyn EmitSink>,
    /// Set by the commit observer when a commit touches `collection`; cleared by
    /// the worker when it picks the subscription up for recomputation.
    dirty: bool,
}
50
/// State shared between the engine handle, the commit observer, and the worker thread.
struct LiveShared {
    /// Database the subscriptions query.
    db: Arc<Database>,
    /// Compiled schema handed to `Collection::new` on every recomputation.
    schema: Arc<SchemaIr>,
    /// Subscription table, keyed by the `u64` inside [`SubId`].
    subs: Mutex<HashMap<u64, LiveSub>>,
    /// "Work is pending" flag: set by `wake_worker`, cleared by the worker after
    /// it wakes. Always accessed under this mutex, which is the one paired with `cv`.
    wake: Mutex<bool>,
    /// Condition variable the worker blocks on while `wake` is false.
    cv: Condvar,
    /// Set when the engine is dropped; tells the worker to exit.
    shutdown: AtomicBool,
    /// Source of subscription ids; incremented on every registration.
    next_id: AtomicU64,
}
61
62impl LiveShared {
63 fn wake_worker(&self) {
64 *self
65 .wake
66 .lock()
67 .unwrap_or_else(std::sync::PoisonError::into_inner) = true;
68 self.cv.notify_one();
69 }
70}
71
/// Commit observer that marks affected subscriptions dirty and wakes the worker.
struct ReactiveObserver {
    /// Weak handle to the engine state; once the state is gone `on_commit`
    /// becomes a no-op instead of keeping it alive.
    shared: Weak<LiveShared>,
}
77
78impl CommitObserver for ReactiveObserver {
79 fn on_commit(&self, ev: &CommitEvent) {
80 let Some(shared) = self.shared.upgrade() else {
81 return;
82 };
83 let touched = ev.touched_collections();
84 let mut any = false;
85 if let Ok(mut subs) = shared.subs.lock() {
86 for s in subs.values_mut() {
87 if touched.contains(s.collection.as_str()) {
88 s.dirty = true;
89 any = true;
90 }
91 }
92 }
93 if any {
94 shared.wake_worker();
95 }
96 }
97}
98
/// Handle to the live-query engine.
///
/// Owns the background worker thread; dropping the engine signals shutdown and
/// joins that thread.
pub struct LiveEngine {
    /// State shared with the commit observer and the worker thread.
    shared: Arc<LiveShared>,
    /// Join handle of the worker; taken (and joined) in `Drop`.
    worker: Option<JoinHandle<()>>,
    /// Handle returned by `Database::add_observer`, held for the engine's lifetime.
    /// NOTE(review): presumably dropping it unregisters the observer — confirm
    /// against `crate::notify`.
    _obs: ObserverHandle,
}
106
107impl LiveEngine {
108 #[allow(clippy::needless_pass_by_value)]
115 #[must_use]
116 pub fn new(db: Arc<Database>, schema: Arc<SchemaIr>) -> Arc<Self> {
117 let shared = Arc::new(LiveShared {
118 db: db.clone(),
119 schema,
120 subs: Mutex::new(HashMap::new()),
121 wake: Mutex::new(false),
122 cv: Condvar::new(),
123 shutdown: AtomicBool::new(false),
124 next_id: AtomicU64::new(0),
125 });
126 let obs = db.add_observer(Arc::new(ReactiveObserver {
127 shared: Arc::downgrade(&shared),
128 }));
129 let worker = {
130 let shared = shared.clone();
131 std::thread::spawn(move || worker_loop(&shared))
132 };
133 Arc::new(Self {
134 shared,
135 worker: Some(worker),
136 _obs: obs,
137 })
138 }
139
140 #[must_use]
144 pub fn register(
145 &self,
146 collection: &str,
147 filter: Value,
148 sink: Arc<dyn EmitSink>,
149 ) -> (SubId, String) {
150 let initial = recompute_envelope(&self.shared, collection, &filter);
151 let id = self.shared.next_id.fetch_add(1, Ordering::Relaxed);
152 if let Ok(mut subs) = self.shared.subs.lock() {
153 subs.insert(
154 id,
155 LiveSub {
156 collection: collection.to_string(),
157 filter,
158 sink,
159 dirty: false,
160 },
161 );
162 }
163 (SubId(id), initial)
164 }
165
166 pub fn cancel(&self, sub: SubId) {
168 if let Ok(mut subs) = self.shared.subs.lock() {
169 subs.remove(&sub.0);
170 }
171 }
172}
173
174impl Drop for LiveEngine {
175 fn drop(&mut self) {
176 self.shared.shutdown.store(true, Ordering::SeqCst);
177 self.shared.wake_worker();
178 if let Some(j) = self.worker.take() {
179 let _ = j.join();
180 }
181 }
182}
183
184fn recompute_envelope(shared: &LiveShared, collection: &str, filter: &Value) -> String {
189 let run = || -> Result<Vec<Value>, crate::error::NookError> {
190 Collection::new(&shared.db, &shared.schema, collection)?.find(filter)
191 };
192 match catch_unwind(AssertUnwindSafe(run)) {
193 Ok(Ok(docs)) => serde_json::json!({ "ok": true, "value": docs }).to_string(),
194 Ok(Err(e)) => serde_json::json!({
195 "ok": false,
196 "error": format!("[{}] {}", e.kind().as_str(), e)
197 })
198 .to_string(),
199 Err(_) => serde_json::json!({
200 "ok": false,
201 "error": "[storage] live recompute panicked"
202 })
203 .to_string(),
204 }
205}
206
207fn worker_loop(shared: &LiveShared) {
208 loop {
209 {
211 let mut woke = shared
212 .wake
213 .lock()
214 .unwrap_or_else(std::sync::PoisonError::into_inner);
215 while !*woke && !shared.shutdown.load(Ordering::SeqCst) {
216 woke = shared
217 .cv
218 .wait(woke)
219 .unwrap_or_else(std::sync::PoisonError::into_inner);
220 }
221 *woke = false;
222 }
223 if shared.shutdown.load(Ordering::SeqCst) {
224 return;
225 }
226 let work: Vec<(u64, String, Value, Arc<dyn EmitSink>)> = {
229 let Ok(mut subs) = shared.subs.lock() else {
230 continue;
231 };
232 subs.iter_mut()
233 .filter(|(_, s)| s.dirty)
234 .map(|(id, s)| {
235 s.dirty = false;
236 (*id, s.collection.clone(), s.filter.clone(), s.sink.clone())
237 })
238 .collect()
239 };
240 let mut dead: BTreeSet<u64> = BTreeSet::new();
241 for (id, collection, filter, sink) in work {
242 if sink.is_closed() {
243 dead.insert(id);
244 continue;
245 }
246 if shared.subs.lock().map_or(true, |s| !s.contains_key(&id)) {
248 continue;
249 }
250 let env = recompute_envelope(shared, &collection, &filter);
251 sink.emit(&env);
252 }
253 if !dead.is_empty() {
254 if let Ok(mut subs) = shared.subs.lock() {
255 for id in dead {
256 subs.remove(&id);
257 }
258 }
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::schema::ir::SchemaIr;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Opens a fresh database file in a temp dir and compiles a schema with one
    /// collection `u` (an `id` field plus an indexed `role` enum of `admin`/`user`).
    /// The temp dir is returned so it outlives the database.
    fn setup() -> (tempfile::TempDir, Arc<Database>, Arc<SchemaIr>) {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("t.db");
        let db = Arc::new(Database::open(db_path).unwrap());
        let schema_src = r#"{"u":{"idField":"id","fields":[
                {"name":"id","type":"id"},{"name":"role","type":"enum","variants":["admin","user"]}],
                "indexes":[{"field":"role","unique":false}]}}"#;
        let ir = Arc::new(SchemaIr::compile(schema_src).unwrap());
        (dir, db, ir)
    }

    /// Sink that records every envelope it receives and never closes.
    #[derive(Default)]
    struct VecSink(Mutex<Vec<String>>);

    impl EmitSink for VecSink {
        fn emit(&self, envelope_json: &str) {
            let mut log = self.0.lock().unwrap();
            log.push(envelope_json.to_owned());
        }

        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Most recent envelope recorded by `sink`, if any.
    fn last_emission(sink: &VecSink) -> Option<String> {
        sink.0.lock().unwrap().last().cloned()
    }

    /// Number of envelopes recorded by `sink` so far.
    fn emission_count(sink: &VecSink) -> usize {
        sink.0.lock().unwrap().len()
    }

    /// Collects the `id` field of every document; panics if an id is not a string.
    fn ids_of(docs: &[serde_json::Value]) -> std::collections::BTreeSet<String> {
        docs.iter()
            .map(|doc| doc["id"].as_str().unwrap().to_string())
            .collect()
    }

    /// Polls `done` every 5 ms until it returns true; fails the test after 5 s.
    fn wait_until(done: impl Fn() -> bool) {
        let started = Instant::now();
        loop {
            if done() {
                return;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "live emission timed out"
            );
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Inserts `doc` into collection `u`, panicking on any error.
    fn insert(db: &Database, ir: &SchemaIr, doc: &serde_json::Value) {
        let outcome = crate::collection::Collection::new(db, ir, "u")
            .unwrap()
            .insert(doc);
        outcome.unwrap();
    }

    #[test]
    fn register_returns_initial_snapshot_then_emits_on_matching_commit() {
        let (_tmp, db, ir) = setup();
        insert(&db, &ir, &serde_json::json!({"id":"1","role":"admin"}));
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let admins_only = serde_json::json!({"role":"admin"});
        let (_sub, initial) = engine.register("u", admins_only, sink.clone());
        assert!(initial.contains("\"ok\":true"));
        assert!(
            initial.contains("\"1\""),
            "initial snapshot has the existing admin"
        );

        insert(&db, &ir, &serde_json::json!({"id":"2","role":"admin"}));
        wait_until(|| emission_count(&sink) > 0);
        let last = last_emission(&sink).unwrap();
        assert!(last.contains("\"ok\":true") && last.contains("\"2\""));
    }

    #[test]
    fn a_commit_to_an_unrelated_collection_does_not_emit() {
        let (_tmp, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir);
        let sink = Arc::new(VecSink::default());
        let (_sub, _initial) = engine.register("u", serde_json::json!({}), sink.clone());
        db.write(|tx| tx.put("other", b"x", b"y")).unwrap();
        // Give the worker time to (wrongly) emit before checking that it did not.
        std::thread::sleep(Duration::from_millis(50));
        assert!(sink.0.lock().unwrap().is_empty());
    }

    #[test]
    fn cancel_stops_further_emissions() {
        let (_tmp, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (sub, _initial) = engine.register("u", serde_json::json!({}), sink.clone());
        engine.cancel(sub);
        insert(&db, &ir, &serde_json::json!({"id":"9","role":"user"}));
        // Give the worker time to (wrongly) emit before checking that it did not.
        std::thread::sleep(Duration::from_millis(50));
        assert!(
            sink.0.lock().unwrap().is_empty(),
            "no emission after cancel"
        );
    }

    #[test]
    fn rapid_commits_coalesce_to_a_snapshot_with_the_final_state() {
        let (_tmp, db, ir) = setup();
        let engine = LiveEngine::new(db.clone(), ir.clone());
        let sink = Arc::new(VecSink::default());
        let (_sub, _initial) = engine.register("u", serde_json::json!({}), sink.clone());
        for i in 0..20 {
            let doc = serde_json::json!({"id":format!("{i}"),"role":"user"});
            insert(&db, &ir, &doc);
        }
        // Wait for a snapshot that already includes the last inserted document.
        wait_until(|| last_emission(&sink).is_some_and(|env| env.contains("\"19\"")));

        let emissions = emission_count(&sink);
        assert!(emissions <= 20, "coalesced: fewer emissions than commits");
        assert!(emissions >= 1);

        let last = last_emission(&sink).unwrap();
        assert!(
            last.contains("\"ok\":true"),
            "final emission is a snapshot, not an error"
        );
        for i in 0..20 {
            assert!(
                last.contains(&format!("\"{i}\"")),
                "final coalesced snapshot must contain id {i} (saw: {last})"
            );
        }
    }

    proptest::proptest! {
        #![proptest_config(proptest::prelude::ProptestConfig::with_cases(24))]
        #[test]
        fn emitted_snapshot_equals_one_shot_find(
            ops in proptest::collection::vec((0u32..8, proptest::bool::ANY), 1..16)
        ) {
            let (_tmp, db, ir) = setup();
            let engine = LiveEngine::new(db.clone(), ir.clone());
            let sink = Arc::new(VecSink::default());
            let (_sub, _initial) =
                engine.register("u", serde_json::json!({"role":"admin"}), sink.clone());

            // Insert results are discarded on purpose: ids repeat across ops, so
            // some inserts are expected to fail.
            for (n, is_admin) in &ops {
                let role = if *is_admin { "admin" } else { "user" };
                let doc = serde_json::json!({"id": format!("{n}"), "role": role});
                let _ = crate::collection::Collection::new(&db, &ir, "u")
                    .unwrap()
                    .insert(&doc);
            }

            // Reference answer: a one-shot query with the same filter.
            let want = crate::collection::Collection::new(&db, &ir, "u")
                .unwrap()
                .find(&serde_json::json!({"role":"admin"}))
                .unwrap();
            let want_ids = ids_of(&want);

            // Settle once the latest emission matches the reference. With no
            // emission at all, that only counts when no admins are expected.
            wait_until(|| match last_emission(&sink) {
                None => want_ids.is_empty(),
                Some(raw) => serde_json::from_str::<serde_json::Value>(&raw)
                    .ok()
                    .and_then(|env| env.get("value").cloned())
                    .is_some_and(|docs| ids_of(docs.as_array().unwrap()) == want_ids),
            });

            let emitted_ids = match last_emission(&sink) {
                None => std::collections::BTreeSet::new(),
                Some(raw) => {
                    let env = serde_json::from_str::<serde_json::Value>(&raw).unwrap();
                    ids_of(env["value"].as_array().unwrap())
                }
            };
            proptest::prop_assert_eq!(emitted_ids, want_ids);
        }
    }
}