1use crate::convert::{row_to_js, value_to_js};
12use crate::binary_protocol::{BinaryEncoder, BinaryResult, SchemaLayout};
13use crate::query_engine::execute_physical_plan;
14use cynos_storage::TableCache;
15use alloc::string::String;
16use alloc::boxed::Box;
17use alloc::rc::Rc;
18use alloc::vec::Vec;
19use cynos_core::schema::Table;
20use cynos_core::Row;
21use cynos_query::planner::PhysicalPlan;
22use cynos_reactive::TableId;
23use core::cell::RefCell;
24use hashbrown::{HashMap, HashSet};
25use wasm_bindgen::prelude::*;
26
/// A live query: holds a physical plan, the table cache it runs against, the
/// most recently stored result set, and the callbacks to notify when that
/// result set is replaced.
pub struct ReQueryObservable {
    /// Plan that is re-executed on every change notification.
    physical_plan: PhysicalPlan,
    /// Shared table cache the plan is executed against.
    cache: Rc<RefCell<TableCache>>,
    /// Result set stored by the constructor or by the last effective `on_change`.
    result: Vec<Rc<Row>>,
    /// Registered callbacks, each paired with the id returned by `subscribe`.
    subscriptions: Vec<(usize, Box<dyn Fn(&[Rc<Row>]) + 'static>)>,
    /// Id that the next call to `subscribe` will hand out.
    next_sub_id: usize,
    /// `true` when the first row of the initial result reported `is_dummy()`.
    is_join_query: bool,
}
44
45impl ReQueryObservable {
46 pub fn new(
48 physical_plan: PhysicalPlan,
49 cache: Rc<RefCell<TableCache>>,
50 initial_result: Vec<Rc<Row>>,
51 ) -> Self {
52 let is_join_query = initial_result.first().map(|r| r.is_dummy()).unwrap_or(false);
54 Self {
55 physical_plan,
56 cache,
57 result: initial_result,
58 subscriptions: Vec::new(),
59 next_sub_id: 0,
60 is_join_query,
61 }
62 }
63
64 pub fn result(&self) -> &[Rc<Row>] {
66 &self.result
67 }
68
69 pub fn len(&self) -> usize {
71 self.result.len()
72 }
73
74 pub fn is_empty(&self) -> bool {
76 self.result.is_empty()
77 }
78
79 pub fn subscribe<F: Fn(&[Rc<Row>]) + 'static>(&mut self, callback: F) -> usize {
81 let id = self.next_sub_id;
82 self.next_sub_id += 1;
83 self.subscriptions.push((id, Box::new(callback)));
84 id
85 }
86
87 pub fn unsubscribe(&mut self, id: usize) -> bool {
89 let len_before = self.subscriptions.len();
90 self.subscriptions.retain(|(sub_id, _)| *sub_id != id);
91 self.subscriptions.len() < len_before
92 }
93
94 pub fn subscription_count(&self) -> usize {
96 self.subscriptions.len()
97 }
98
99 pub fn on_change(&mut self, changed_ids: &HashSet<u64>) {
106 if self.subscriptions.is_empty() {
108 return;
109 }
110
111 let cache = self.cache.borrow();
113
114 match execute_physical_plan(&cache, &self.physical_plan) {
115 Ok(new_result) => {
116 if !Self::results_equal(&self.result, &new_result, changed_ids, self.is_join_query) {
118 self.result = new_result;
119 for (_, callback) in &self.subscriptions {
121 callback(&self.result);
122 }
123 }
124 }
125 Err(_) => {
126 }
128 }
129 }
130
131 fn results_equal(old: &[Rc<Row>], new: &[Rc<Row>], changed_ids: &HashSet<u64>, is_join_query: bool) -> bool {
135 use cynos_core::DUMMY_ROW_ID;
136
137 if old.len() != new.len() {
139 return false;
140 }
141
142 if old.is_empty() {
144 return true;
145 }
146
147 let is_aggregate_result = old.first().map(|r| r.id() == DUMMY_ROW_ID).unwrap_or(false);
149
150 if is_join_query || is_aggregate_result {
151 for (old_row, new_row) in old.iter().zip(new.iter()) {
154 if old_row.version() != new_row.version() {
155 return false;
156 }
157 }
158 } else {
159 let ids_match = old.iter().zip(new.iter()).all(|(a, b)| a.id() == b.id());
162 if !ids_match {
163 return false;
164 }
165
166 for (old_row, new_row) in old.iter().zip(new.iter()) {
168 if changed_ids.contains(&old_row.id()) {
169 if old_row.version() != new_row.version() {
170 return false;
171 }
172 }
173 }
174 }
175
176 true
177 }
178}
179
/// Maps table ids to the queries registered for them and batches change
/// notifications until a flush delivers them.
pub struct QueryRegistry {
    /// Queries registered per table id.
    queries: HashMap<TableId, Vec<Rc<RefCell<ReQueryObservable>>>>,
    /// Changed row ids accumulated per table since the last flush.
    pending_changes: Rc<RefCell<HashMap<TableId, HashSet<u64>>>>,
    /// `true` from the moment a flush is requested until a flush runs.
    flush_scheduled: Rc<RefCell<bool>>,
    /// Handle installed by `set_self_ref`; the wasm32 deferred flush clones it
    /// to reach the registry later.
    /// NOTE(review): if this is the registry's own `Rc`, it is a strong
    /// self-reference and the registry is never dropped — confirm that is intended.
    self_ref: Option<Rc<RefCell<QueryRegistry>>>,
}
192
193impl QueryRegistry {
194 pub fn new() -> Self {
196 Self {
197 queries: HashMap::new(),
198 pending_changes: Rc::new(RefCell::new(HashMap::new())),
199 flush_scheduled: Rc::new(RefCell::new(false)),
200 self_ref: None,
201 }
202 }
203
204 pub fn set_self_ref(&mut self, self_ref: Rc<RefCell<QueryRegistry>>) {
207 self.self_ref = Some(self_ref);
208 }
209
210 pub fn register(&mut self, query: Rc<RefCell<ReQueryObservable>>, table_id: TableId) {
212 self.queries
213 .entry(table_id)
214 .or_insert_with(Vec::new)
215 .push(query);
216 }
217
218 pub fn on_table_change(&mut self, table_id: TableId, changed_ids: &HashSet<u64>) {
221 {
223 let mut pending = self.pending_changes.borrow_mut();
224 pending
225 .entry(table_id)
226 .or_insert_with(HashSet::new)
227 .extend(changed_ids.iter().copied());
228 }
229
230 let mut scheduled = self.flush_scheduled.borrow_mut();
232 if !*scheduled {
233 *scheduled = true;
234 drop(scheduled);
235 self.schedule_flush();
236 }
237 }
238
239 fn schedule_flush(&self) {
241 #[cfg(target_arch = "wasm32")]
242 {
243 if let Some(ref self_ref) = self.self_ref {
244 let self_ref_clone = self_ref.clone();
245 let pending_changes = self.pending_changes.clone();
246 let flush_scheduled = self.flush_scheduled.clone();
247
248 let closure = Closure::once(Box::new(move |_: JsValue| {
250 *flush_scheduled.borrow_mut() = false;
251 let changes: HashMap<TableId, HashSet<u64>> =
252 pending_changes.borrow_mut().drain().collect();
253
254 let registry = self_ref_clone.borrow();
255 for (table_id, changed_ids) in changes {
256 if let Some(queries) = registry.queries.get(&table_id) {
257 for query in queries {
258 query.borrow_mut().on_change(&changed_ids);
259 }
260 }
261 }
262 }) as Box<dyn FnOnce(JsValue)>);
263
264 let promise = js_sys::Promise::resolve(&JsValue::UNDEFINED);
265 let _ = promise.then(&closure);
266 closure.forget();
267 }
268 }
269
270 #[cfg(not(target_arch = "wasm32"))]
271 {
272 self.flush_sync();
274 }
275 }
276
277 #[cfg(not(target_arch = "wasm32"))]
279 fn flush_sync(&self) {
280 *self.flush_scheduled.borrow_mut() = false;
281 let changes: HashMap<TableId, HashSet<u64>> =
282 self.pending_changes.borrow_mut().drain().collect();
283
284 for (table_id, changed_ids) in changes {
285 if let Some(queries) = self.queries.get(&table_id) {
286 for query in queries {
287 query.borrow_mut().on_change(&changed_ids);
288 }
289 }
290 }
291 }
292
293 pub fn flush(&self) {
296 *self.flush_scheduled.borrow_mut() = false;
297 let changes: HashMap<TableId, HashSet<u64>> =
298 self.pending_changes.borrow_mut().drain().collect();
299
300 for (table_id, changed_ids) in changes {
301 if let Some(queries) = self.queries.get(&table_id) {
302 for query in queries {
303 query.borrow_mut().on_change(&changed_ids);
304 }
305 }
306 }
307 }
308
309 pub fn query_count(&self) -> usize {
311 self.queries.values().map(|v| v.len()).sum()
312 }
313
314 pub fn has_pending_changes(&self) -> bool {
316 !self.pending_changes.borrow().is_empty()
317 }
318}
319
320impl Default for QueryRegistry {
321 fn default() -> Self {
322 Self::new()
323 }
324}
325
/// JavaScript-facing handle to a shared [`ReQueryObservable`].
#[wasm_bindgen]
pub struct JsObservableQuery {
    /// Observable shared with the registry and with any unsubscribe closures.
    inner: Rc<RefCell<ReQueryObservable>>,
    /// Table schema used to serialize rows when no column list is set.
    schema: Table,
    /// Column names used for serialization when set and no aggregate columns are set.
    projected_columns: Option<Vec<String>>,
    /// Layout handed to the binary encoder and returned by `getSchemaLayout`.
    binary_layout: SchemaLayout,
    /// Column names used for serialization; checked before `projected_columns`.
    aggregate_columns: Option<Vec<String>>,
}
339
340impl JsObservableQuery {
341 pub(crate) fn new(
342 inner: Rc<RefCell<ReQueryObservable>>,
343 schema: Table,
344 binary_layout: SchemaLayout,
345 ) -> Self {
346 Self { inner, schema, projected_columns: None, binary_layout, aggregate_columns: None }
347 }
348
349 pub(crate) fn new_with_projection(
350 inner: Rc<RefCell<ReQueryObservable>>,
351 schema: Table,
352 projected_columns: Vec<String>,
353 binary_layout: SchemaLayout,
354 ) -> Self {
355 Self { inner, schema, projected_columns: Some(projected_columns), binary_layout, aggregate_columns: None }
356 }
357
358 pub(crate) fn new_with_aggregates(
359 inner: Rc<RefCell<ReQueryObservable>>,
360 schema: Table,
361 aggregate_columns: Vec<String>,
362 binary_layout: SchemaLayout,
363 ) -> Self {
364 Self { inner, schema, projected_columns: None, binary_layout, aggregate_columns: Some(aggregate_columns) }
365 }
366
367 pub(crate) fn inner(&self) -> Rc<RefCell<ReQueryObservable>> {
369 self.inner.clone()
370 }
371
372 pub(crate) fn schema(&self) -> &Table {
374 &self.schema
375 }
376
377 pub(crate) fn projected_columns(&self) -> Option<&Vec<String>> {
379 self.projected_columns.as_ref()
380 }
381
382 pub(crate) fn aggregate_columns(&self) -> Option<&Vec<String>> {
384 self.aggregate_columns.as_ref()
385 }
386}
387
388#[wasm_bindgen]
389impl JsObservableQuery {
390 pub fn subscribe(&mut self, callback: js_sys::Function) -> js_sys::Function {
396 let schema = self.schema.clone();
397 let projected_columns = self.projected_columns.clone();
398 let aggregate_columns = self.aggregate_columns.clone();
399
400 let sub_id = self.inner.borrow_mut().subscribe(move |rows| {
401 let current_data = if let Some(ref cols) = aggregate_columns {
402 projected_rows_to_js_array(rows, cols)
403 } else if let Some(ref cols) = projected_columns {
404 projected_rows_to_js_array(rows, cols)
405 } else {
406 rows_to_js_array(rows, &schema)
407 };
408 callback.call1(&JsValue::NULL, ¤t_data).ok();
409 });
410
411 let inner_unsub = self.inner.clone();
413 let unsubscribe = Closure::once_into_js(move || {
414 inner_unsub.borrow_mut().unsubscribe(sub_id);
415 });
416
417 unsubscribe.unchecked_into()
418 }
419
420 #[wasm_bindgen(js_name = getResult)]
422 pub fn get_result(&self) -> JsValue {
423 let inner = self.inner.borrow();
424 if let Some(ref cols) = self.aggregate_columns {
425 projected_rows_to_js_array(inner.result(), cols)
426 } else if let Some(ref cols) = self.projected_columns {
427 projected_rows_to_js_array(inner.result(), cols)
428 } else {
429 rows_to_js_array(inner.result(), &self.schema)
430 }
431 }
432
433 #[wasm_bindgen(js_name = getResultBinary)]
435 pub fn get_result_binary(&self) -> BinaryResult {
436 let inner = self.inner.borrow();
437 let rows = inner.result();
438 let mut encoder = BinaryEncoder::new(self.binary_layout.clone(), rows.len());
439 encoder.encode_rows(rows);
440 BinaryResult::new(encoder.finish())
441 }
442
443 #[wasm_bindgen(js_name = getSchemaLayout)]
445 pub fn get_schema_layout(&self) -> SchemaLayout {
446 self.binary_layout.clone()
447 }
448
449 #[wasm_bindgen(getter)]
451 pub fn length(&self) -> usize {
452 self.inner.borrow().len()
453 }
454
455 #[wasm_bindgen(js_name = isEmpty)]
457 pub fn is_empty(&self) -> bool {
458 self.inner.borrow().is_empty()
459 }
460
461 #[wasm_bindgen(js_name = subscriptionCount)]
463 pub fn subscription_count(&self) -> usize {
464 self.inner.borrow().subscription_count()
465 }
466}
467
/// JavaScript-facing stream over a shared [`ReQueryObservable`] whose
/// `subscribe` emits the current result immediately and again on every change.
#[wasm_bindgen]
pub struct JsChangesStream {
    /// Observable shared with the originating `JsObservableQuery`.
    inner: Rc<RefCell<ReQueryObservable>>,
    /// Table schema used to serialize rows when no column list is set.
    schema: Table,
    /// Column names used for serialization when set.
    projected_columns: Option<Vec<String>>,
    /// Layout handed to the binary encoder and returned by `getSchemaLayout`.
    binary_layout: SchemaLayout,
}
482
483impl JsChangesStream {
484 pub(crate) fn from_observable(observable: JsObservableQuery) -> Self {
485 Self {
486 inner: observable.inner(),
487 schema: observable.schema().clone(),
488 projected_columns: observable.projected_columns().cloned(),
489 binary_layout: observable.binary_layout.clone(),
490 }
491 }
492}
493
494#[wasm_bindgen]
495impl JsChangesStream {
496 pub fn subscribe(&self, callback: js_sys::Function) -> js_sys::Function {
504 let schema = self.schema.clone();
505 let inner = self.inner.clone();
506 let projected_columns = self.projected_columns.clone();
507
508 let initial_data = if let Some(ref cols) = projected_columns {
510 projected_rows_to_js_array(inner.borrow().result(), cols)
511 } else {
512 rows_to_js_array(inner.borrow().result(), &schema)
513 };
514 callback.call1(&JsValue::NULL, &initial_data).ok();
515
516 let schema_clone = schema.clone();
518 let projected_columns_clone = projected_columns.clone();
519 let sub_id = inner.borrow_mut().subscribe(move |rows| {
520 let current_data = if let Some(ref cols) = projected_columns_clone {
521 projected_rows_to_js_array(rows, cols)
522 } else {
523 rows_to_js_array(rows, &schema_clone)
524 };
525 callback.call1(&JsValue::NULL, ¤t_data).ok();
526 });
527
528 let unsubscribe = Closure::once_into_js(move || {
530 inner.borrow_mut().unsubscribe(sub_id);
531 });
532
533 unsubscribe.unchecked_into()
534 }
535
536 #[wasm_bindgen(js_name = getResult)]
538 pub fn get_result(&self) -> JsValue {
539 let inner = self.inner.borrow();
540 if let Some(ref cols) = self.projected_columns {
541 projected_rows_to_js_array(inner.result(), cols)
542 } else {
543 rows_to_js_array(inner.result(), &self.schema)
544 }
545 }
546
547 #[wasm_bindgen(js_name = getResultBinary)]
549 pub fn get_result_binary(&self) -> BinaryResult {
550 let inner = self.inner.borrow();
551 let rows = inner.result();
552 let mut encoder = BinaryEncoder::new(self.binary_layout.clone(), rows.len());
553 encoder.encode_rows(rows);
554 BinaryResult::new(encoder.finish())
555 }
556
557 #[wasm_bindgen(js_name = getSchemaLayout)]
559 pub fn get_schema_layout(&self) -> SchemaLayout {
560 self.binary_layout.clone()
561 }
562}
563
564fn rows_to_js_array(rows: &[Rc<Row>], schema: &Table) -> JsValue {
566 let arr = js_sys::Array::new_with_length(rows.len() as u32);
567 for (i, row) in rows.iter().enumerate() {
568 arr.set(i as u32, row_to_js(row, schema));
569 }
570 arr.into()
571}
572
573fn projected_rows_to_js_array(rows: &[Rc<Row>], column_names: &[String]) -> JsValue {
576 let arr = js_sys::Array::new_with_length(rows.len() as u32);
577 for (i, row) in rows.iter().enumerate() {
578 let obj = js_sys::Object::new();
579 for (col_idx, col_name) in column_names.iter().enumerate() {
580 if let Some(value) = row.get(col_idx) {
581 let js_val = value_to_js(value);
582 js_sys::Reflect::set(&obj, &JsValue::from_str(col_name), &js_val).ok();
583 }
584 }
585 arr.set(i as u32, obj.into());
586 }
587 arr.into()
588}
589
#[cfg(test)]
mod tests {
    use super::*;
    use cynos_core::schema::TableBuilder;
    use cynos_core::{DataType, Value};
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Builds a `users` table with `id`, `name` and `age` columns and a
    /// primary key on `id`.
    fn test_schema() -> Table {
        TableBuilder::new("users")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("name", DataType::String)
            .unwrap()
            .add_column("age", DataType::Int32)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Builds a row whose values line up with the columns of `test_schema`.
    fn make_row(id: u64, name: &str, age: i32) -> Row {
        let values = alloc::vec![
            Value::Int64(id as i64),
            Value::String(name.into()),
            Value::Int32(age),
        ];
        Row::new(id, values)
    }

    /// A freshly created registry holds no queries.
    #[wasm_bindgen_test]
    fn test_query_registry_new() {
        assert_eq!(QueryRegistry::new().query_count(), 0);
    }

    /// Serializing two rows yields a JS array of length two.
    #[wasm_bindgen_test]
    fn test_rows_to_js_array() {
        let rows: Vec<Rc<Row>> = [(1u64, "Alice", 25i32), (2, "Bob", 30)]
            .iter()
            .map(|&(id, name, age)| Rc::new(make_row(id, name, age)))
            .collect();

        let serialized = rows_to_js_array(&rows, &test_schema());
        let arr = js_sys::Array::from(&serialized);
        assert_eq!(arr.length(), 2);
    }
}