Skip to main content

cynos_database/
reactive_bridge.rs

1//! Reactive bridge - Integration between Query and Reactive modules.
2//!
3//! This module provides the bridge between the query system and the reactive
4//! system, enabling observable queries that update automatically when
5//! underlying data changes.
6//!
7//! Uses re-query strategy: when data changes, re-execute the query using
8//! the query optimizer and indexes for optimal performance.
9//! Physical plans are cached to avoid repeated optimization overhead.
10
11use crate::convert::{row_to_js, value_to_js};
12use crate::binary_protocol::{BinaryEncoder, BinaryResult, SchemaLayout};
13use crate::query_engine::execute_physical_plan;
14use cynos_storage::TableCache;
15use alloc::string::String;
16use alloc::boxed::Box;
17use alloc::rc::Rc;
18use alloc::vec::Vec;
19use cynos_core::schema::Table;
20use cynos_core::Row;
21use cynos_query::planner::PhysicalPlan;
22use cynos_reactive::TableId;
23use core::cell::RefCell;
24use hashbrown::{HashMap, HashSet};
25use wasm_bindgen::prelude::*;
26
27/// A re-query based observable that re-executes the query on each change.
28/// This leverages the query optimizer and indexes for optimal performance.
29/// The physical plan is cached to avoid repeated optimization overhead.
30pub struct ReQueryObservable {
31    /// The cached physical plan to execute
32    physical_plan: PhysicalPlan,
33    /// Reference to the table cache
34    cache: Rc<RefCell<TableCache>>,
35    /// Current result set
36    result: Vec<Rc<Row>>,
37    /// Subscription callbacks
38    subscriptions: Vec<(usize, Box<dyn Fn(&[Rc<Row>]) + 'static>)>,
39    /// Next subscription ID
40    next_sub_id: usize,
41    /// Whether this is a JOIN query (results have DUMMY_ROW_ID)
42    is_join_query: bool,
43}
44
45impl ReQueryObservable {
46    /// Creates a new re-query observable with a pre-compiled physical plan.
47    pub fn new(
48        physical_plan: PhysicalPlan,
49        cache: Rc<RefCell<TableCache>>,
50        initial_result: Vec<Rc<Row>>,
51    ) -> Self {
52        // Detect JOIN query by checking if first row has dummy ID
53        let is_join_query = initial_result.first().map(|r| r.is_dummy()).unwrap_or(false);
54        Self {
55            physical_plan,
56            cache,
57            result: initial_result,
58            subscriptions: Vec::new(),
59            next_sub_id: 0,
60            is_join_query,
61        }
62    }
63
64    /// Returns the current result.
65    pub fn result(&self) -> &[Rc<Row>] {
66        &self.result
67    }
68
69    /// Returns the number of rows.
70    pub fn len(&self) -> usize {
71        self.result.len()
72    }
73
74    /// Returns true if empty.
75    pub fn is_empty(&self) -> bool {
76        self.result.is_empty()
77    }
78
79    /// Subscribes to changes.
80    pub fn subscribe<F: Fn(&[Rc<Row>]) + 'static>(&mut self, callback: F) -> usize {
81        let id = self.next_sub_id;
82        self.next_sub_id += 1;
83        self.subscriptions.push((id, Box::new(callback)));
84        id
85    }
86
87    /// Unsubscribes by ID.
88    pub fn unsubscribe(&mut self, id: usize) -> bool {
89        let len_before = self.subscriptions.len();
90        self.subscriptions.retain(|(sub_id, _)| *sub_id != id);
91        self.subscriptions.len() < len_before
92    }
93
94    /// Returns subscription count.
95    pub fn subscription_count(&self) -> usize {
96        self.subscriptions.len()
97    }
98
99    /// Called when the table changes - re-executes the cached physical plan.
100    /// Only notifies subscribers if the result actually changed.
101    /// Skips re-query entirely if there are no subscribers.
102    ///
103    /// `changed_ids` contains the row IDs that were modified - used to optimize
104    /// comparison by only checking these rows when the result set size is unchanged.
105    pub fn on_change(&mut self, changed_ids: &HashSet<u64>) {
106        // Skip re-query if no subscribers - major optimization for unused observables
107        if self.subscriptions.is_empty() {
108            return;
109        }
110
111        // Re-execute the cached physical plan (no optimization overhead)
112        let cache = self.cache.borrow();
113
114        match execute_physical_plan(&cache, &self.physical_plan) {
115            Ok(new_result) => {
116                // Only notify if result changed
117                if !Self::results_equal(&self.result, &new_result, changed_ids, self.is_join_query) {
118                    self.result = new_result;
119                    // Notify all subscribers
120                    for (_, callback) in &self.subscriptions {
121                        callback(&self.result);
122                    }
123                }
124            }
125            Err(_) => {
126                // Query execution failed, keep old result
127            }
128        }
129    }
130
131    /// Compares two result sets for equality using row versions.
132    /// This is O(n) where n is the number of rows, comparing only version numbers.
133    /// For single-table queries, can further optimize by only checking changed_ids.
134    fn results_equal(old: &[Rc<Row>], new: &[Rc<Row>], changed_ids: &HashSet<u64>, is_join_query: bool) -> bool {
135        use cynos_core::DUMMY_ROW_ID;
136
137        // Different lengths means definitely changed
138        if old.len() != new.len() {
139            return false;
140        }
141
142        // Empty results are equal
143        if old.is_empty() {
144            return true;
145        }
146
147        // Check if this is an aggregate/join result (rows have DUMMY_ROW_ID)
148        let is_aggregate_result = old.first().map(|r| r.id() == DUMMY_ROW_ID).unwrap_or(false);
149
150        if is_join_query || is_aggregate_result {
151            // For JOIN/aggregate queries, compare versions (sum of source row versions)
152            // If any source row changed, the sum version will be different
153            for (old_row, new_row) in old.iter().zip(new.iter()) {
154                if old_row.version() != new_row.version() {
155                    return false;
156                }
157            }
158        } else {
159            // For single-table queries, use optimized comparison
160            // Compare row IDs first (fast path)
161            let ids_match = old.iter().zip(new.iter()).all(|(a, b)| a.id() == b.id());
162            if !ids_match {
163                return false;
164            }
165
166            // IDs match - only compare versions of changed rows
167            for (old_row, new_row) in old.iter().zip(new.iter()) {
168                if changed_ids.contains(&old_row.id()) {
169                    if old_row.version() != new_row.version() {
170                        return false;
171                    }
172                }
173            }
174        }
175
176        true
177    }
178}
179
180/// Registry for tracking re-query observables and routing table changes.
181/// Supports batching of changes to avoid redundant re-queries during rapid updates.
182pub struct QueryRegistry {
183    /// Map from table ID to list of queries that depend on it
184    queries: HashMap<TableId, Vec<Rc<RefCell<ReQueryObservable>>>>,
185    /// Pending changes to be flushed (table_id -> accumulated changed_ids)
186    pending_changes: Rc<RefCell<HashMap<TableId, HashSet<u64>>>>,
187    /// Whether a flush is already scheduled
188    flush_scheduled: Rc<RefCell<bool>>,
189    /// Self reference for scheduling flush callback
190    self_ref: Option<Rc<RefCell<QueryRegistry>>>,
191}
192
193impl QueryRegistry {
194    /// Creates a new query registry.
195    pub fn new() -> Self {
196        Self {
197            queries: HashMap::new(),
198            pending_changes: Rc::new(RefCell::new(HashMap::new())),
199            flush_scheduled: Rc::new(RefCell::new(false)),
200            self_ref: None,
201        }
202    }
203
204    /// Sets the self reference for scheduling flush callbacks.
205    /// Must be called after wrapping in Rc<RefCell<>>.
206    pub fn set_self_ref(&mut self, self_ref: Rc<RefCell<QueryRegistry>>) {
207        self.self_ref = Some(self_ref);
208    }
209
210    /// Registers a query with its dependent table.
211    pub fn register(&mut self, query: Rc<RefCell<ReQueryObservable>>, table_id: TableId) {
212        self.queries
213            .entry(table_id)
214            .or_insert_with(Vec::new)
215            .push(query);
216    }
217
218    /// Handles table changes by batching and scheduling a flush.
219    /// Multiple rapid changes are coalesced into a single re-query.
220    pub fn on_table_change(&mut self, table_id: TableId, changed_ids: &HashSet<u64>) {
221        // Accumulate changes
222        {
223            let mut pending = self.pending_changes.borrow_mut();
224            pending
225                .entry(table_id)
226                .or_insert_with(HashSet::new)
227                .extend(changed_ids.iter().copied());
228        }
229
230        // Schedule flush if not already scheduled
231        let mut scheduled = self.flush_scheduled.borrow_mut();
232        if !*scheduled {
233            *scheduled = true;
234            drop(scheduled);
235            self.schedule_flush();
236        }
237    }
238
239    /// Schedules a flush to run after the current microtask.
240    fn schedule_flush(&self) {
241        #[cfg(target_arch = "wasm32")]
242        {
243            if let Some(ref self_ref) = self.self_ref {
244                let self_ref_clone = self_ref.clone();
245                let pending_changes = self.pending_changes.clone();
246                let flush_scheduled = self.flush_scheduled.clone();
247
248                // Use queueMicrotask via Promise.resolve().then()
249                let closure = Closure::once(Box::new(move |_: JsValue| {
250                    *flush_scheduled.borrow_mut() = false;
251                    let changes: HashMap<TableId, HashSet<u64>> =
252                        pending_changes.borrow_mut().drain().collect();
253
254                    let registry = self_ref_clone.borrow();
255                    for (table_id, changed_ids) in changes {
256                        if let Some(queries) = registry.queries.get(&table_id) {
257                            for query in queries {
258                                query.borrow_mut().on_change(&changed_ids);
259                            }
260                        }
261                    }
262                }) as Box<dyn FnOnce(JsValue)>);
263
264                let promise = js_sys::Promise::resolve(&JsValue::UNDEFINED);
265                let _ = promise.then(&closure);
266                closure.forget();
267            }
268        }
269
270        #[cfg(not(target_arch = "wasm32"))]
271        {
272            // In non-WASM environment, flush immediately (for testing)
273            self.flush_sync();
274        }
275    }
276
277    /// Synchronous flush for testing in non-WASM environment.
278    #[cfg(not(target_arch = "wasm32"))]
279    fn flush_sync(&self) {
280        *self.flush_scheduled.borrow_mut() = false;
281        let changes: HashMap<TableId, HashSet<u64>> =
282            self.pending_changes.borrow_mut().drain().collect();
283
284        for (table_id, changed_ids) in changes {
285            if let Some(queries) = self.queries.get(&table_id) {
286                for query in queries {
287                    query.borrow_mut().on_change(&changed_ids);
288                }
289            }
290        }
291    }
292
293    /// Forces an immediate flush of all pending changes.
294    /// Useful for testing or when you need synchronous behcynos.
295    pub fn flush(&self) {
296        *self.flush_scheduled.borrow_mut() = false;
297        let changes: HashMap<TableId, HashSet<u64>> =
298            self.pending_changes.borrow_mut().drain().collect();
299
300        for (table_id, changed_ids) in changes {
301            if let Some(queries) = self.queries.get(&table_id) {
302                for query in queries {
303                    query.borrow_mut().on_change(&changed_ids);
304                }
305            }
306        }
307    }
308
309    /// Returns the number of registered queries.
310    pub fn query_count(&self) -> usize {
311        self.queries.values().map(|v| v.len()).sum()
312    }
313
314    /// Returns whether there are pending changes waiting to be flushed.
315    pub fn has_pending_changes(&self) -> bool {
316        !self.pending_changes.borrow().is_empty()
317    }
318}
319
320impl Default for QueryRegistry {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326/// JavaScript-friendly observable query wrapper.
327/// Uses re-query strategy for optimal performance with indexes.
328#[wasm_bindgen]
329pub struct JsObservableQuery {
330    inner: Rc<RefCell<ReQueryObservable>>,
331    schema: Table,
332    /// Optional projected column names. If Some, only these columns are returned.
333    projected_columns: Option<Vec<String>>,
334    /// Pre-computed binary layout for getResultBinary().
335    binary_layout: SchemaLayout,
336    /// Optional aggregate column names. If Some, this is an aggregate query.
337    aggregate_columns: Option<Vec<String>>,
338}
339
340impl JsObservableQuery {
341    pub(crate) fn new(
342        inner: Rc<RefCell<ReQueryObservable>>,
343        schema: Table,
344        binary_layout: SchemaLayout,
345    ) -> Self {
346        Self { inner, schema, projected_columns: None, binary_layout, aggregate_columns: None }
347    }
348
349    pub(crate) fn new_with_projection(
350        inner: Rc<RefCell<ReQueryObservable>>,
351        schema: Table,
352        projected_columns: Vec<String>,
353        binary_layout: SchemaLayout,
354    ) -> Self {
355        Self { inner, schema, projected_columns: Some(projected_columns), binary_layout, aggregate_columns: None }
356    }
357
358    pub(crate) fn new_with_aggregates(
359        inner: Rc<RefCell<ReQueryObservable>>,
360        schema: Table,
361        aggregate_columns: Vec<String>,
362        binary_layout: SchemaLayout,
363    ) -> Self {
364        Self { inner, schema, projected_columns: None, binary_layout, aggregate_columns: Some(aggregate_columns) }
365    }
366
367    /// Get the inner observable for creating JsChangesStream.
368    pub(crate) fn inner(&self) -> Rc<RefCell<ReQueryObservable>> {
369        self.inner.clone()
370    }
371
372    /// Get the schema.
373    pub(crate) fn schema(&self) -> &Table {
374        &self.schema
375    }
376
377    /// Get the projected columns.
378    pub(crate) fn projected_columns(&self) -> Option<&Vec<String>> {
379        self.projected_columns.as_ref()
380    }
381
382    /// Get the aggregate columns.
383    pub(crate) fn aggregate_columns(&self) -> Option<&Vec<String>> {
384        self.aggregate_columns.as_ref()
385    }
386}
387
388#[wasm_bindgen]
389impl JsObservableQuery {
390    /// Subscribes to query changes.
391    ///
392    /// The callback receives the complete current result set as a JavaScript array.
393    /// It is called whenever data changes (not immediately - use getResult for initial data).
394    /// Returns an unsubscribe function.
395    pub fn subscribe(&mut self, callback: js_sys::Function) -> js_sys::Function {
396        let schema = self.schema.clone();
397        let projected_columns = self.projected_columns.clone();
398        let aggregate_columns = self.aggregate_columns.clone();
399
400        let sub_id = self.inner.borrow_mut().subscribe(move |rows| {
401            let current_data = if let Some(ref cols) = aggregate_columns {
402                projected_rows_to_js_array(rows, cols)
403            } else if let Some(ref cols) = projected_columns {
404                projected_rows_to_js_array(rows, cols)
405            } else {
406                rows_to_js_array(rows, &schema)
407            };
408            callback.call1(&JsValue::NULL, &current_data).ok();
409        });
410
411        // Create unsubscribe function
412        let inner_unsub = self.inner.clone();
413        let unsubscribe = Closure::once_into_js(move || {
414            inner_unsub.borrow_mut().unsubscribe(sub_id);
415        });
416
417        unsubscribe.unchecked_into()
418    }
419
420    /// Returns the current result as a JavaScript array.
421    #[wasm_bindgen(js_name = getResult)]
422    pub fn get_result(&self) -> JsValue {
423        let inner = self.inner.borrow();
424        if let Some(ref cols) = self.aggregate_columns {
425            projected_rows_to_js_array(inner.result(), cols)
426        } else if let Some(ref cols) = self.projected_columns {
427            projected_rows_to_js_array(inner.result(), cols)
428        } else {
429            rows_to_js_array(inner.result(), &self.schema)
430        }
431    }
432
433    /// Returns the current result as a binary buffer for zero-copy access.
434    #[wasm_bindgen(js_name = getResultBinary)]
435    pub fn get_result_binary(&self) -> BinaryResult {
436        let inner = self.inner.borrow();
437        let rows = inner.result();
438        let mut encoder = BinaryEncoder::new(self.binary_layout.clone(), rows.len());
439        encoder.encode_rows(rows);
440        BinaryResult::new(encoder.finish())
441    }
442
443    /// Returns the schema layout for decoding binary results.
444    #[wasm_bindgen(js_name = getSchemaLayout)]
445    pub fn get_schema_layout(&self) -> SchemaLayout {
446        self.binary_layout.clone()
447    }
448
449    /// Returns the number of rows in the result.
450    #[wasm_bindgen(getter)]
451    pub fn length(&self) -> usize {
452        self.inner.borrow().len()
453    }
454
455    /// Returns whether the result is empty.
456    #[wasm_bindgen(js_name = isEmpty)]
457    pub fn is_empty(&self) -> bool {
458        self.inner.borrow().is_empty()
459    }
460
461    /// Returns the number of active subscriptions.
462    #[wasm_bindgen(js_name = subscriptionCount)]
463    pub fn subscription_count(&self) -> usize {
464        self.inner.borrow().subscription_count()
465    }
466}
467
468/// JavaScript-friendly changes stream.
469///
470/// This provides the `changes()` API that yields the complete result set
471/// whenever data changes. The callback receives the full current data,
472/// not incremental changes - perfect for React's setState pattern.
473#[wasm_bindgen]
474pub struct JsChangesStream {
475    inner: Rc<RefCell<ReQueryObservable>>,
476    schema: Table,
477    /// Optional projected column names. If Some, only these columns are returned.
478    projected_columns: Option<Vec<String>>,
479    /// Pre-computed binary layout for getResultBinary().
480    binary_layout: SchemaLayout,
481}
482
483impl JsChangesStream {
484    pub(crate) fn from_observable(observable: JsObservableQuery) -> Self {
485        Self {
486            inner: observable.inner(),
487            schema: observable.schema().clone(),
488            projected_columns: observable.projected_columns().cloned(),
489            binary_layout: observable.binary_layout.clone(),
490        }
491    }
492}
493
494#[wasm_bindgen]
495impl JsChangesStream {
496    /// Subscribes to the changes stream.
497    ///
498    /// The callback receives the complete current result set as a JavaScript array.
499    /// It is called immediately with the initial data, and again whenever data changes.
500    /// Perfect for React: `stream.subscribe(data => setUsers(data))`
501    ///
502    /// Returns an unsubscribe function.
503    pub fn subscribe(&self, callback: js_sys::Function) -> js_sys::Function {
504        let schema = self.schema.clone();
505        let inner = self.inner.clone();
506        let projected_columns = self.projected_columns.clone();
507
508        // Emit initial value immediately
509        let initial_data = if let Some(ref cols) = projected_columns {
510            projected_rows_to_js_array(inner.borrow().result(), cols)
511        } else {
512            rows_to_js_array(inner.borrow().result(), &schema)
513        };
514        callback.call1(&JsValue::NULL, &initial_data).ok();
515
516        // Subscribe to subsequent changes
517        let schema_clone = schema.clone();
518        let projected_columns_clone = projected_columns.clone();
519        let sub_id = inner.borrow_mut().subscribe(move |rows| {
520            let current_data = if let Some(ref cols) = projected_columns_clone {
521                projected_rows_to_js_array(rows, cols)
522            } else {
523                rows_to_js_array(rows, &schema_clone)
524            };
525            callback.call1(&JsValue::NULL, &current_data).ok();
526        });
527
528        // Create unsubscribe function
529        let unsubscribe = Closure::once_into_js(move || {
530            inner.borrow_mut().unsubscribe(sub_id);
531        });
532
533        unsubscribe.unchecked_into()
534    }
535
536    /// Returns the current result.
537    #[wasm_bindgen(js_name = getResult)]
538    pub fn get_result(&self) -> JsValue {
539        let inner = self.inner.borrow();
540        if let Some(ref cols) = self.projected_columns {
541            projected_rows_to_js_array(inner.result(), cols)
542        } else {
543            rows_to_js_array(inner.result(), &self.schema)
544        }
545    }
546
547    /// Returns the current result as a binary buffer for zero-copy access.
548    #[wasm_bindgen(js_name = getResultBinary)]
549    pub fn get_result_binary(&self) -> BinaryResult {
550        let inner = self.inner.borrow();
551        let rows = inner.result();
552        let mut encoder = BinaryEncoder::new(self.binary_layout.clone(), rows.len());
553        encoder.encode_rows(rows);
554        BinaryResult::new(encoder.finish())
555    }
556
557    /// Returns the schema layout for decoding binary results.
558    #[wasm_bindgen(js_name = getSchemaLayout)]
559    pub fn get_schema_layout(&self) -> SchemaLayout {
560        self.binary_layout.clone()
561    }
562}
563
564/// Converts rows to a JavaScript array.
565fn rows_to_js_array(rows: &[Rc<Row>], schema: &Table) -> JsValue {
566    let arr = js_sys::Array::new_with_length(rows.len() as u32);
567    for (i, row) in rows.iter().enumerate() {
568        arr.set(i as u32, row_to_js(row, schema));
569    }
570    arr.into()
571}
572
573/// Converts projected rows to a JavaScript array.
574/// Only includes the specified columns in the output.
575fn projected_rows_to_js_array(rows: &[Rc<Row>], column_names: &[String]) -> JsValue {
576    let arr = js_sys::Array::new_with_length(rows.len() as u32);
577    for (i, row) in rows.iter().enumerate() {
578        let obj = js_sys::Object::new();
579        for (col_idx, col_name) in column_names.iter().enumerate() {
580            if let Some(value) = row.get(col_idx) {
581                let js_val = value_to_js(value);
582                js_sys::Reflect::set(&obj, &JsValue::from_str(col_name), &js_val).ok();
583            }
584        }
585        arr.set(i as u32, obj.into());
586    }
587    arr.into()
588}
589
590#[cfg(test)]
591mod tests {
592    use super::*;
593    use cynos_core::schema::TableBuilder;
594    use cynos_core::{DataType, Value};
595    use wasm_bindgen_test::*;
596
597    wasm_bindgen_test_configure!(run_in_browser);
598
599    fn test_schema() -> Table {
600        TableBuilder::new("users")
601            .unwrap()
602            .add_column("id", DataType::Int64)
603            .unwrap()
604            .add_column("name", DataType::String)
605            .unwrap()
606            .add_column("age", DataType::Int32)
607            .unwrap()
608            .add_primary_key(&["id"], false)
609            .unwrap()
610            .build()
611            .unwrap()
612    }
613
614    fn make_row(id: u64, name: &str, age: i32) -> Row {
615        Row::new(
616            id,
617            alloc::vec![
618                Value::Int64(id as i64),
619                Value::String(name.into()),
620                Value::Int32(age),
621            ],
622        )
623    }
624
625    #[wasm_bindgen_test]
626    fn test_query_registry_new() {
627        let registry = QueryRegistry::new();
628        assert_eq!(registry.query_count(), 0);
629    }
630
631    #[wasm_bindgen_test]
632    fn test_rows_to_js_array() {
633        let schema = test_schema();
634        let rows: Vec<Rc<Row>> = alloc::vec![
635            Rc::new(make_row(1, "Alice", 25)),
636            Rc::new(make_row(2, "Bob", 30)),
637        ];
638
639        let js = rows_to_js_array(&rows, &schema);
640        let arr = js_sys::Array::from(&js);
641        assert_eq!(arr.length(), 2);
642    }
643}