1use crate::binary_protocol::{SchemaLayout, SchemaLayoutCache};
7use crate::convert::{js_array_to_rows, js_to_value, joined_rows_to_js_array, projected_rows_to_js_array, rows_to_js_array};
8use crate::expr::{Expr, ExprInner};
9use crate::query_engine::{compile_plan, execute_physical_plan, execute_plan, explain_plan};
10use crate::reactive_bridge::{JsChangesStream, JsObservableQuery, QueryRegistry, ReQueryObservable};
11use cynos_storage::TableCache;
12use crate::JsSortOrder;
13use alloc::boxed::Box;
14use alloc::rc::Rc;
15use alloc::string::{String, ToString};
16use alloc::vec::Vec;
17use cynos_core::schema::Table;
18use cynos_core::{reserve_row_ids, DataType, Row, Value};
19use cynos_incremental::Delta;
20use cynos_query::ast::{AggregateFunc, SortOrder};
21use cynos_query::plan_cache::{compute_plan_fingerprint, PlanCache};
22use cynos_query::planner::LogicalPlan;
23use cynos_reactive::TableId;
24use core::cell::RefCell;
25use wasm_bindgen::prelude::*;
26
27#[wasm_bindgen]
29pub struct SelectBuilder {
30 cache: Rc<RefCell<TableCache>>,
31 query_registry: Rc<RefCell<QueryRegistry>>,
32 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
33 schema_layout_cache: Rc<RefCell<SchemaLayoutCache>>,
34 plan_cache: Rc<RefCell<PlanCache>>,
35 columns: JsValue,
36 from_table: Option<String>,
37 where_clause: Option<Expr>,
38 order_by: Vec<(String, SortOrder)>,
39 limit_val: Option<usize>,
40 offset_val: Option<usize>,
41 joins: Vec<JoinClause>,
42 group_by_cols: Vec<String>,
43 aggregates: Vec<(AggregateFunc, Option<String>)>, }
45
46#[derive(Clone, Debug)]
47#[allow(dead_code)]
48struct JoinClause {
49 table: String, alias: Option<String>, condition: Expr,
52 join_type: JoinType,
53}
54
55impl JoinClause {
56 fn reference_name(&self) -> &str {
58 self.alias.as_deref().unwrap_or(&self.table)
59 }
60}
61
/// Kind of join requested for a [`JoinClause`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[allow(dead_code)]
enum JoinType {
    /// Inner join.
    Inner,
    /// Left join.
    Left,
    /// Right join.
    Right,
}
69
70impl SelectBuilder {
71 pub(crate) fn new(
72 cache: Rc<RefCell<TableCache>>,
73 query_registry: Rc<RefCell<QueryRegistry>>,
74 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
75 schema_layout_cache: Rc<RefCell<SchemaLayoutCache>>,
76 plan_cache: Rc<RefCell<PlanCache>>,
77 columns: JsValue,
78 ) -> Self {
79 Self {
80 cache,
81 query_registry,
82 table_id_map,
83 schema_layout_cache,
84 plan_cache,
85 columns,
86 from_table: None,
87 where_clause: None,
88 order_by: Vec::new(),
89 limit_val: None,
90 offset_val: None,
91 joins: Vec::new(),
92 group_by_cols: Vec::new(),
93 aggregates: Vec::new(),
94 }
95 }
96
97 fn get_schema(&self) -> Option<Table> {
98 self.from_table.as_ref().and_then(|name| {
99 self.cache
100 .borrow()
101 .get_table(name)
102 .map(|s| s.schema().clone())
103 })
104 }
105
106 fn get_column_info_any_table(&self, col_name: &str) -> Option<(String, usize, DataType)> {
109 if let Some(dot_pos) = col_name.find('.') {
111 let table_part = &col_name[..dot_pos];
112 let col_part = &col_name[dot_pos + 1..];
113
114 if let Some(main_table) = &self.from_table {
116 if main_table == table_part {
117 if let Some(schema) = self.get_schema() {
118 if let Some(col) = schema.get_column(col_part) {
119 return Some((table_part.to_string(), col.index(), col.data_type()));
120 }
121 }
122 }
123 }
124
125 for join in &self.joins {
127 let ref_name = join.reference_name();
128 if ref_name == table_part {
129 if let Some(store) = self.cache.borrow().get_table(&join.table) {
131 if let Some(col) = store.schema().get_column(col_part) {
132 return Some((ref_name.to_string(), col.index(), col.data_type()));
134 }
135 }
136 }
137 }
138
139 if let Some(info) = self.cache
141 .borrow()
142 .get_table(table_part)
143 .and_then(|store| {
144 store.schema().get_column(col_part).map(|c| (table_part.to_string(), c.index(), c.data_type()))
145 })
146 {
147 return Some(info);
148 }
149 }
150
151 if let Some(table_name) = &self.from_table {
153 if let Some(schema) = self.get_schema() {
154 if let Some(col) = schema.get_column(col_name) {
155 return Some((table_name.clone(), col.index(), col.data_type()));
156 }
157 }
158 }
159
160 for join in &self.joins {
162 if let Some(info) = self.cache
163 .borrow()
164 .get_table(&join.table)
165 .and_then(|store| {
166 store.schema().get_column(col_name).map(|c| (join.reference_name().to_string(), c.index(), c.data_type()))
167 })
168 {
169 return Some(info);
170 }
171 }
172
173 None
174 }
175
176 fn parse_columns(&self) -> Option<Vec<String>> {
179 if self.columns.is_undefined() || self.columns.is_null() {
180 return None;
181 }
182
183 if let Some(arr) = self.columns.dyn_ref::<js_sys::Array>() {
184 if arr.length() == 0 {
185 return None; }
187
188 let first = arr.get(0);
191 if let Some(inner_arr) = first.dyn_ref::<js_sys::Array>() {
192 let cols: Vec<String> = inner_arr.iter().filter_map(|v| v.as_string()).collect();
194 if cols.is_empty() {
195 return None;
196 } else if cols.len() == 1 && cols[0] == "*" {
197 return None; } else {
199 return Some(cols);
200 }
201 }
202
203 let cols: Vec<String> = arr.iter().filter_map(|v| v.as_string()).collect();
205 if cols.is_empty() {
206 None
207 } else if cols.len() == 1 && cols[0] == "*" {
208 None } else {
210 Some(cols)
211 }
212 } else if let Some(s) = self.columns.as_string() {
213 if s == "*" {
214 None } else {
216 Some(alloc::vec![s])
217 }
218 } else {
219 None
220 }
221 }
222
223 fn build_logical_plan(&self, table_name: &str) -> LogicalPlan {
225 let mut plan = LogicalPlan::Scan {
227 table: table_name.to_string(),
228 };
229
230 let mut table_offsets: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
233
234 if let Some(schema) = self.get_schema() {
236 table_offsets.insert(table_name.to_string(), 0);
237 let mut current_offset = schema.columns().len();
238
239 for join in &self.joins {
241 let right_plan = LogicalPlan::Scan {
242 table: join.table.clone(),
243 };
244
245 let ref_name = join.reference_name().to_string();
247 table_offsets.insert(ref_name.clone(), current_offset);
248
249 let get_col_info = |name: &str| self.get_column_info_for_join_with_offsets_alias(name, join, &table_offsets);
251 let ast_condition = join.condition.to_ast_with_table(&get_col_info);
252
253 plan = match join.join_type {
254 JoinType::Inner => LogicalPlan::inner_join(plan, right_plan, ast_condition),
255 JoinType::Left => LogicalPlan::left_join(plan, right_plan, ast_condition),
256 JoinType::Right => {
257 LogicalPlan::left_join(right_plan, plan, ast_condition)
259 }
260 };
261
262 if let Some(store) = self.cache.borrow().get_table(&join.table) {
264 current_offset += store.schema().columns().len();
265 }
266 }
267 } else {
268 for join in &self.joins {
270 let right_plan = LogicalPlan::Scan {
271 table: join.table.clone(),
272 };
273
274 let get_col_info = |name: &str| self.get_column_info_for_join(name, &join.table);
275 let ast_condition = join.condition.to_ast_with_table(&get_col_info);
276
277 plan = match join.join_type {
278 JoinType::Inner => LogicalPlan::inner_join(plan, right_plan, ast_condition),
279 JoinType::Left => LogicalPlan::left_join(plan, right_plan, ast_condition),
280 JoinType::Right => {
281 LogicalPlan::left_join(right_plan, plan, ast_condition)
282 }
283 };
284 }
285 }
286
287 if let Some(ref predicate) = self.where_clause {
289 let get_col_info = |name: &str| {
292 self.get_column_info_any_table(name)
293 };
294 let ast_predicate = predicate.to_ast_with_table(&get_col_info);
295 plan = LogicalPlan::Filter {
296 input: Box::new(plan),
297 predicate: ast_predicate,
298 };
299 }
300
301 if !self.group_by_cols.is_empty() || !self.aggregates.is_empty() {
303 let group_by_exprs: Vec<_> = self.group_by_cols.iter().filter_map(|col| {
304 self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
305 let col_name = if let Some(dot_pos) = col.find('.') {
306 &col[dot_pos + 1..]
307 } else {
308 col.as_str()
309 };
310 cynos_query::ast::Expr::column(&tbl, col_name, idx)
311 })
312 }).collect();
313
314 let agg_exprs: Vec<_> = self.aggregates.iter().filter_map(|(func, col_opt)| {
315 if let Some(col) = col_opt {
316 self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
317 let col_name = if let Some(dot_pos) = col.find('.') {
318 &col[dot_pos + 1..]
319 } else {
320 col.as_str()
321 };
322 (*func, cynos_query::ast::Expr::column(&tbl, col_name, idx))
323 })
324 } else {
325 Some((*func, cynos_query::ast::Expr::literal(cynos_core::Value::Int64(1))))
327 }
328 }).collect();
329
330 plan = LogicalPlan::aggregate(plan, group_by_exprs, agg_exprs);
331 }
332
333 if !self.order_by.is_empty() {
335 let order_exprs: Vec<_> = self.order_by.iter().filter_map(|(col, order)| {
336 self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
338 let col_name = if let Some(dot_pos) = col.find('.') {
340 &col[dot_pos + 1..]
341 } else {
342 col.as_str()
343 };
344 (cynos_query::ast::Expr::column(&tbl, col_name, idx), *order)
345 })
346 }).collect();
347 plan = LogicalPlan::Sort {
348 input: Box::new(plan),
349 order_by: order_exprs,
350 };
351 }
352
353 if self.limit_val.is_some() || self.offset_val.is_some() {
356 plan = LogicalPlan::Limit {
357 input: Box::new(plan),
358 limit: self.limit_val.unwrap_or(1_000_000_000), offset: self.offset_val.unwrap_or(0),
360 };
361 }
362
363 if let Some(cols) = self.parse_columns() {
365 let project_exprs: Vec<_> = cols
366 .iter()
367 .filter_map(|col| {
368 self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
370 let col_name = if let Some(dot_pos) = col.find('.') {
372 &col[dot_pos + 1..]
373 } else {
374 col.as_str()
375 };
376 cynos_query::ast::Expr::column(&tbl, col_name, idx)
377 })
378 })
379 .collect();
380
381 if !project_exprs.is_empty() {
382 plan = LogicalPlan::Project {
383 input: Box::new(plan),
384 columns: project_exprs,
385 };
386 }
387 }
388
389 plan
390 }
391
392 fn get_column_info_for_projection(&self, col_name: &str) -> Option<(String, usize, DataType)> {
397 let (target_table, target_col) = if let Some(dot_pos) = col_name.find('.') {
399 (Some(&col_name[..dot_pos]), &col_name[dot_pos + 1..])
400 } else {
401 (None, col_name)
402 };
403
404 if let Some(main_table) = &self.from_table {
406 if let Some(schema) = self.get_schema() {
407 if target_table.is_none() || target_table == Some(main_table.as_str()) {
408 if let Some(col) = schema.get_column(target_col) {
409 return Some((main_table.clone(), col.index(), col.data_type()));
411 }
412 }
413 }
414 }
415
416 for join in &self.joins {
418 if let Some(store) = self.cache.borrow().get_table(&join.table) {
419 let schema = store.schema();
420 let ref_name = join.reference_name();
421 if target_table.is_none() || target_table == Some(ref_name) || target_table == Some(join.table.as_str()) {
423 if let Some(col) = schema.get_column(target_col) {
424 return Some((ref_name.to_string(), col.index(), col.data_type()));
426 }
427 }
428 }
429 }
430
431 None
432 }
433
434 fn create_projection_layout(&self, column_names: &[String]) -> crate::binary_protocol::SchemaLayout {
436 use crate::binary_protocol::{BinaryDataType, ColumnLayout, SchemaLayout};
437
438 let mut name_counts: hashbrown::HashMap<&str, usize> = hashbrown::HashMap::new();
440 for col_name in column_names {
441 let simple_name = if let Some(dot_pos) = col_name.find('.') {
442 &col_name[dot_pos + 1..]
443 } else {
444 col_name.as_str()
445 };
446 *name_counts.entry(simple_name).or_insert(0) += 1;
447 }
448
449 let mut columns: Vec<ColumnLayout> = Vec::new();
450 let mut offset = 0usize;
451
452 for name in column_names {
453 if let Some((_, _, data_type)) = self.get_column_info_any_table(name) {
455 let binary_type = BinaryDataType::from(data_type);
456 let fixed_size = binary_type.fixed_size();
457
458 let final_name = if let Some(dot_pos) = name.find('.') {
460 let simple_name = &name[dot_pos + 1..];
461 if name_counts.get(simple_name).copied().unwrap_or(0) > 1 {
462 name.clone()
464 } else {
465 simple_name.to_string()
467 }
468 } else {
469 name.clone()
470 };
471
472 columns.push(ColumnLayout {
473 name: final_name,
474 data_type: binary_type,
475 fixed_size,
476 is_nullable: true, offset,
478 });
479 offset += fixed_size;
480 }
481 }
482
483 let null_mask_size = (columns.len() + 7) / 8;
484 let data_size: usize = columns.iter().map(|c| c.fixed_size).sum();
485 let row_stride = null_mask_size + data_size;
486
487 SchemaLayout::new(columns, row_stride, null_mask_size)
488 }
489
490 fn get_column_info_for_join(&self, col_name: &str, join_table: &str) -> Option<(String, usize, DataType)> {
499 if let Some(dot_pos) = col_name.find('.') {
501 let table_part = &col_name[..dot_pos];
502 let col_part = &col_name[dot_pos + 1..];
503
504 if let Some(info) = self.cache
506 .borrow()
507 .get_table(table_part)
508 .and_then(|store| {
509 store.schema().get_column(col_part).map(|c| (table_part.to_string(), c.index(), c.data_type()))
510 })
511 {
512 return Some(info);
513 }
514
515 if let Some(table_name) = &self.from_table {
517 if table_name == table_part {
518 if let Some(schema) = self.get_schema() {
519 if let Some(col) = schema.get_column(col_part) {
520 return Some((table_name.clone(), col.index(), col.data_type()));
521 }
522 }
523 }
524 }
525 }
526
527 if let Some(info) = self.cache
529 .borrow()
530 .get_table(join_table)
531 .and_then(|store| {
532 store.schema().get_column(col_name).map(|c| (join_table.to_string(), c.index(), c.data_type()))
533 })
534 {
535 return Some(info);
536 }
537
538 if let Some(table_name) = &self.from_table {
540 if let Some(schema) = self.get_schema() {
541 if let Some(col) = schema.get_column(col_name) {
542 return Some((table_name.clone(), col.index(), col.data_type()));
543 }
544 }
545 }
546
547 None
548 }
549
550 fn get_column_info_for_join_with_offsets_alias(
554 &self,
555 col_name: &str,
556 current_join: &JoinClause,
557 table_offsets: &hashbrown::HashMap<String, usize>,
558 ) -> Option<(String, usize, DataType)> {
559 let current_ref_name = current_join.reference_name();
560
561 if let Some(dot_pos) = col_name.find('.') {
563 let table_part = &col_name[..dot_pos];
564 let col_part = &col_name[dot_pos + 1..];
565
566 if table_part == current_ref_name {
568 if let Some(store) = self.cache.borrow().get_table(¤t_join.table) {
569 if let Some(col) = store.schema().get_column(col_part) {
570 return Some((current_join.table.clone(), col.index(), col.data_type()));
573 }
574 }
575 }
576
577 if let Some(main_table) = &self.from_table {
579 if table_part == main_table {
580 if let Some(schema) = self.get_schema() {
581 if let Some(col) = schema.get_column(col_part) {
582 let offset = table_offsets.get(main_table).copied().unwrap_or(0);
583 return Some((table_part.to_string(), offset + col.index(), col.data_type()));
584 }
585 }
586 }
587 }
588
589 for join in &self.joins {
591 let ref_name = join.reference_name();
592 if table_part == ref_name && ref_name != current_ref_name {
593 if let Some(store) = self.cache.borrow().get_table(&join.table) {
594 if let Some(col) = store.schema().get_column(col_part) {
595 let offset = table_offsets.get(ref_name).copied().unwrap_or(0);
596 return Some((table_part.to_string(), offset + col.index(), col.data_type()));
597 }
598 }
599 }
600 }
601
602 if let Some(store) = self.cache.borrow().get_table(table_part) {
604 if let Some(col) = store.schema().get_column(col_part) {
605 let idx = if table_part == ¤t_join.table {
606 col.index()
607 } else {
608 let offset = table_offsets.get(table_part).copied().unwrap_or(0);
609 offset + col.index()
610 };
611 return Some((table_part.to_string(), idx, col.data_type()));
612 }
613 }
614 }
615
616 if let Some(store) = self.cache.borrow().get_table(¤t_join.table) {
618 if let Some(col) = store.schema().get_column(col_name) {
619 return Some((current_ref_name.to_string(), col.index(), col.data_type()));
621 }
622 }
623
624 if let Some(table_name) = &self.from_table {
626 let offset = table_offsets.get(table_name).copied().unwrap_or(0);
627 if let Some(schema) = self.get_schema() {
628 if let Some(col) = schema.get_column(col_name) {
629 return Some((table_name.clone(), offset + col.index(), col.data_type()));
630 }
631 }
632 }
633
634 None
635 }
636}
637
638#[wasm_bindgen]
639impl SelectBuilder {
640 pub fn from(mut self, table: &str) -> Self {
642 self.from_table = Some(table.to_string());
643 self
644 }
645
646 #[wasm_bindgen(js_name = "where")]
648 pub fn where_(mut self, predicate: &Expr) -> Self {
649 self.where_clause = Some(predicate.clone());
650 self
651 }
652
653 #[wasm_bindgen(js_name = orderBy)]
655 pub fn order_by(mut self, column: &str, order: JsSortOrder) -> Self {
656 self.order_by.push((column.to_string(), order.into()));
657 self
658 }
659
660 pub fn limit(mut self, n: usize) -> Self {
662 self.limit_val = Some(n);
663 self
664 }
665
666 pub fn offset(mut self, n: usize) -> Self {
668 self.offset_val = Some(n);
669 self
670 }
671
/// Splits a join table specification into `(table, alias)`.
///
/// Accepts either `"table"` or `"table AS alias"`; the `as` keyword is
/// matched case-insensitively and must be surrounded by single spaces. Both
/// parts are trimmed. Without an ` as ` separator the alias is `None`.
fn parse_table_spec(table_spec: &str) -> (String, Option<String>) {
    // ASCII-only lowercasing keeps every byte offset identical to
    // `table_spec`, so the position found here is valid in the original
    // string. Full Unicode lowercasing can change the byte length of
    // non-ASCII names and would make the slices below wrong or panic.
    let lower = table_spec.to_ascii_lowercase();
    match lower.find(" as ") {
        Some(pos) => {
            let table = table_spec[..pos].trim().to_string();
            let alias = table_spec[pos + 4..].trim().to_string();
            (table, Some(alias))
        }
        None => (table_spec.trim().to_string(), None),
    }
}
685
686 #[wasm_bindgen(js_name = innerJoin)]
688 pub fn inner_join(mut self, table: &str, condition: &Expr) -> Self {
689 let (table_name, alias) = Self::parse_table_spec(table);
690 self.joins.push(JoinClause {
691 table: table_name,
692 alias,
693 condition: condition.clone(),
694 join_type: JoinType::Inner,
695 });
696 self
697 }
698
699 #[wasm_bindgen(js_name = leftJoin)]
701 pub fn left_join(mut self, table: &str, condition: &Expr) -> Self {
702 let (table_name, alias) = Self::parse_table_spec(table);
703 self.joins.push(JoinClause {
704 table: table_name,
705 alias,
706 condition: condition.clone(),
707 join_type: JoinType::Left,
708 });
709 self
710 }
711
712 #[wasm_bindgen(js_name = groupBy)]
714 pub fn group_by(mut self, columns: &JsValue) -> Self {
715 if let Some(arr) = columns.dyn_ref::<js_sys::Array>() {
716 self.group_by_cols = arr.iter().filter_map(|v| v.as_string()).collect();
717 } else if let Some(s) = columns.as_string() {
718 self.group_by_cols = alloc::vec![s];
719 }
720 self
721 }
722
723 #[wasm_bindgen(js_name = count)]
725 pub fn count(mut self) -> Self {
726 self.aggregates.push((AggregateFunc::Count, None));
727 self
728 }
729
730 #[wasm_bindgen(js_name = countCol)]
732 pub fn count_col(mut self, column: &str) -> Self {
733 self.aggregates.push((AggregateFunc::Count, Some(column.to_string())));
734 self
735 }
736
737 #[wasm_bindgen(js_name = sum)]
739 pub fn sum(mut self, column: &str) -> Self {
740 self.aggregates.push((AggregateFunc::Sum, Some(column.to_string())));
741 self
742 }
743
744 #[wasm_bindgen(js_name = avg)]
746 pub fn avg(mut self, column: &str) -> Self {
747 self.aggregates.push((AggregateFunc::Avg, Some(column.to_string())));
748 self
749 }
750
751 #[wasm_bindgen(js_name = min)]
753 pub fn min(mut self, column: &str) -> Self {
754 self.aggregates.push((AggregateFunc::Min, Some(column.to_string())));
755 self
756 }
757
758 #[wasm_bindgen(js_name = max)]
760 pub fn max(mut self, column: &str) -> Self {
761 self.aggregates.push((AggregateFunc::Max, Some(column.to_string())));
762 self
763 }
764
765 #[wasm_bindgen(js_name = stddev)]
767 pub fn stddev(mut self, column: &str) -> Self {
768 self.aggregates.push((AggregateFunc::StdDev, Some(column.to_string())));
769 self
770 }
771
772 #[wasm_bindgen(js_name = geomean)]
774 pub fn geomean(mut self, column: &str) -> Self {
775 self.aggregates.push((AggregateFunc::GeoMean, Some(column.to_string())));
776 self
777 }
778
779 #[wasm_bindgen(js_name = distinct)]
781 pub fn distinct(mut self, column: &str) -> Self {
782 self.aggregates.push((AggregateFunc::Distinct, Some(column.to_string())));
783 self
784 }
785
786 pub async fn exec(&self) -> Result<JsValue, JsValue> {
788 let table_name = self
789 .from_table
790 .as_ref()
791 .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
792
793 let cache = self.cache.borrow();
794 let store = cache
795 .get_table(table_name)
796 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
797
798 let schema = store.schema().clone();
799
800 let plan = self.build_logical_plan(table_name);
803
804 let rows = execute_plan(&cache, table_name, plan)
806 .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?;
807
808 if !self.aggregates.is_empty() || !self.group_by_cols.is_empty() {
810 Ok(self.aggregate_rows_to_js_array(&rows))
812 } else if let Some(cols) = self.parse_columns() {
813 Ok(projected_rows_to_js_array(&rows, &cols))
816 } else if !self.joins.is_empty() {
817 let mut schemas: Vec<&Table> = Vec::with_capacity(1 + self.joins.len());
819 schemas.push(store.schema());
820 for join in &self.joins {
821 let join_store = cache.get_table(&join.table).ok_or_else(|| {
822 JsValue::from_str(&alloc::format!("Join table not found: {}", join.table))
823 })?;
824 schemas.push(join_store.schema());
825 }
826 Ok(joined_rows_to_js_array(&rows, &schemas))
827 } else {
828 Ok(rows_to_js_array(&rows, &schema))
829 }
830 }
831
832 fn build_aggregate_column_names(&self) -> Vec<String> {
835 let mut col_names: Vec<String> = Vec::new();
836 for col in &self.group_by_cols {
837 let simple_name = if let Some(dot_pos) = col.find('.') {
839 &col[dot_pos + 1..]
840 } else {
841 col.as_str()
842 };
843 col_names.push(simple_name.to_string());
844 }
845 for (func, col_opt) in &self.aggregates {
846 let func_name = match func {
847 AggregateFunc::Count => "count",
848 AggregateFunc::Sum => "sum",
849 AggregateFunc::Avg => "avg",
850 AggregateFunc::Min => "min",
851 AggregateFunc::Max => "max",
852 AggregateFunc::Distinct => "distinct",
853 AggregateFunc::StdDev => "stddev",
854 AggregateFunc::GeoMean => "geomean",
855 };
856 let col_name = if let Some(col) = col_opt {
857 let simple_name = if let Some(dot_pos) = col.find('.') {
858 &col[dot_pos + 1..]
859 } else {
860 col.as_str()
861 };
862 alloc::format!("{}_{}", func_name, simple_name)
863 } else {
864 func_name.to_string() };
866 col_names.push(col_name);
867 }
868 col_names
869 }
870
871 fn aggregate_rows_to_js_array(&self, rows: &[Rc<Row>]) -> JsValue {
873 let result = js_sys::Array::new();
874 let col_names = self.build_aggregate_column_names();
875
876 for row in rows {
877 let obj = js_sys::Object::new();
878 for (i, name) in col_names.iter().enumerate() {
879 if let Some(value) = row.get(i) {
880 let js_val = crate::convert::value_to_js(value);
881 let _ = js_sys::Reflect::set(&obj, &JsValue::from_str(name), &js_val);
882 }
883 }
884 result.push(&obj);
885 }
886
887 result.into()
888 }
889
890 pub fn explain(&self) -> Result<JsValue, JsValue> {
897 let table_name = self
898 .from_table
899 .as_ref()
900 .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
901
902 let cache = self.cache.borrow();
903 let _ = cache
904 .get_table(table_name)
905 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
906
907 let plan = self.build_logical_plan(table_name);
909
910 let result = explain_plan(&cache, table_name, plan);
912
913 let obj = js_sys::Object::new();
915 js_sys::Reflect::set(&obj, &"logical".into(), &result.logical_plan.into())?;
916 js_sys::Reflect::set(&obj, &"optimized".into(), &result.optimized_plan.into())?;
917 js_sys::Reflect::set(&obj, &"physical".into(), &result.physical_plan.into())?;
918
919 Ok(obj.into())
920 }
921
922 pub fn observe(&self) -> Result<JsObservableQuery, JsValue> {
925 let table_name = self
926 .from_table
927 .as_ref()
928 .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
929
930 let table_id = self
931 .table_id_map
932 .borrow()
933 .get(table_name)
934 .copied()
935 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table ID not found: {}", table_name)))?;
936
937 let cache_ref = self.cache.clone();
938 let cache = cache_ref.borrow();
939 let store = cache
940 .get_table(table_name)
941 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
942
943 let schema = store.schema().clone();
944
945 let binary_layout = if self.joins.is_empty() {
947 if let Some(cols) = self.parse_columns() {
948 SchemaLayout::from_projection(&schema, &cols)
949 } else {
950 SchemaLayout::from_schema(&schema)
951 }
952 } else {
953 let mut schemas: Vec<&Table> = Vec::with_capacity(1 + self.joins.len());
954 schemas.push(store.schema());
955 for join in &self.joins {
956 let join_store = cache.get_table(&join.table).ok_or_else(|| {
957 JsValue::from_str(&alloc::format!("Join table not found: {}", join.table))
958 })?;
959 schemas.push(join_store.schema());
960 }
961 SchemaLayout::from_schemas(&schemas)
962 };
963
964 let logical_plan = self.build_logical_plan(table_name);
966 let physical_plan = compile_plan(&cache, table_name, logical_plan.clone());
967
968 let initial_rows = execute_physical_plan(&cache, &physical_plan)
970 .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?;
971
972 drop(cache); let observable = ReQueryObservable::new(
976 physical_plan,
977 cache_ref.clone(),
978 initial_rows,
979 );
980 let observable_rc = Rc::new(RefCell::new(observable));
981
982 self.query_registry
984 .borrow_mut()
985 .register(observable_rc.clone(), table_id);
986
987 if !self.aggregates.is_empty() || !self.group_by_cols.is_empty() {
989 let aggregate_cols = self.build_aggregate_column_names();
991 Ok(JsObservableQuery::new_with_aggregates(observable_rc, schema, aggregate_cols, binary_layout))
992 } else if let Some(cols) = self.parse_columns() {
993 Ok(JsObservableQuery::new_with_projection(observable_rc, schema, cols, binary_layout))
994 } else {
995 Ok(JsObservableQuery::new(observable_rc, schema, binary_layout))
996 }
997 }
998
999 #[allow(dead_code)]
1002 fn observe_with_joins(
1003 &self,
1004 _left_table_id: TableId,
1005 _left_schema: &Table,
1006 _cache: &TableCache,
1007 ) -> Result<JsObservableQuery, JsValue> {
1008 Err(JsValue::from_str("Join queries with observe() not yet supported in re-query mode"))
1010 }
1011
1012 #[allow(dead_code)]
1015 fn extract_join_keys(&self, condition: &Expr) -> Result<(String, String), JsValue> {
1016 match condition.inner() {
1017 ExprInner::Comparison { column, op, value } => {
1018 use crate::expr::ComparisonOp;
1019 if *op != ComparisonOp::Eq {
1020 return Err(JsValue::from_str("Join condition must be an equality"));
1021 }
1022
1023 let left_key = column.name();
1025
1026 let right_key = if let Some(s) = value.as_string() {
1029 s
1030 } else {
1031 return Err(JsValue::from_str("Join condition right side must be a column name"));
1032 };
1033
1034 Ok((left_key, right_key))
1035 }
1036 _ => Err(JsValue::from_str("Join condition must be a column equality")),
1037 }
1038 }
1039
1040 pub fn changes(&self) -> Result<JsChangesStream, JsValue> {
1042 let observable = self.observe()?;
1043 Ok(JsChangesStream::from_observable(observable))
1044 }
1045
1046 #[wasm_bindgen(js_name = getSchemaLayout)]
1049 pub fn get_schema_layout(&self) -> Result<crate::binary_protocol::SchemaLayout, JsValue> {
1050 let table_name = self
1051 .from_table
1052 .as_ref()
1053 .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
1054
1055 let cache = self.cache.borrow();
1056 let store = cache
1057 .get_table(table_name)
1058 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
1059
1060 let schema = store.schema();
1061
1062 let layout = if let Some(cols) = self.parse_columns() {
1066 self.create_projection_layout(&cols)
1069 } else {
1070 self.schema_layout_cache
1071 .borrow_mut()
1072 .get_or_create_full(table_name, schema)
1073 .clone()
1074 };
1075
1076 Ok(layout)
1077 }
1078
1079 #[wasm_bindgen(js_name = execBinary)]
1082 pub async fn exec_binary(&self) -> Result<crate::binary_protocol::BinaryResult, JsValue> {
1083 let table_name = self
1084 .from_table
1085 .as_ref()
1086 .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
1087
1088 let cache = self.cache.borrow();
1089 let store = cache
1090 .get_table(table_name)
1091 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
1092
1093 let schema = store.schema();
1094
1095 let plan = self.build_logical_plan(table_name);
1097
1098 let fingerprint = compute_plan_fingerprint(&plan);
1100
1101 let rows = {
1103 let mut plan_cache = self.plan_cache.borrow_mut();
1104 let physical_plan = plan_cache.get_or_insert_with(fingerprint, || {
1105 compile_plan(&cache, table_name, plan)
1106 });
1107
1108 execute_physical_plan(&cache, physical_plan)
1110 .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?
1111 };
1112
1113 let layout = if let Some(cols) = self.parse_columns() {
1117 self.create_projection_layout(&cols)
1120 } else {
1121 self.schema_layout_cache
1123 .borrow_mut()
1124 .get_or_create_full(table_name, schema)
1125 .clone()
1126 };
1127
1128 let mut encoder = crate::binary_protocol::BinaryEncoder::new(layout, rows.len());
1130 encoder.encode_rows(&rows);
1131 let buffer = encoder.finish();
1132
1133 Ok(crate::binary_protocol::BinaryResult::new(buffer))
1134 }
1135}
1136
1137#[wasm_bindgen]
1139pub struct InsertBuilder {
1140 cache: Rc<RefCell<TableCache>>,
1141 query_registry: Rc<RefCell<QueryRegistry>>,
1142 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1143 table_name: String,
1144 values_data: Option<JsValue>,
1145}
1146
1147impl InsertBuilder {
1148 pub(crate) fn new(
1149 cache: Rc<RefCell<TableCache>>,
1150 query_registry: Rc<RefCell<QueryRegistry>>,
1151 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1152 table: &str,
1153 ) -> Self {
1154 Self {
1155 cache,
1156 query_registry,
1157 table_id_map,
1158 table_name: table.to_string(),
1159 values_data: None,
1160 }
1161 }
1162}
1163
1164#[wasm_bindgen]
1165impl InsertBuilder {
1166 pub fn values(mut self, data: &JsValue) -> Self {
1168 self.values_data = Some(data.clone());
1169 self
1170 }
1171
1172 pub async fn exec(&self) -> Result<JsValue, JsValue> {
1174 let values = self
1175 .values_data
1176 .as_ref()
1177 .ok_or_else(|| JsValue::from_str("No values specified"))?;
1178
1179 let mut cache = self.cache.borrow_mut();
1180 let store = cache
1181 .get_table_mut(&self.table_name)
1182 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1183
1184 let schema = store.schema().clone();
1185
1186 let arr = js_sys::Array::from(values);
1188 let row_count = arr.length() as u64;
1189
1190 let start_row_id = reserve_row_ids(row_count);
1192
1193 let rows = js_array_to_rows(values, &schema, start_row_id)?;
1195 let row_count = rows.len();
1196
1197 let _deltas: Vec<Delta<Row>> = rows.iter().map(|r| Delta::insert(r.clone())).collect();
1199
1200 let mut inserted_ids = hashbrown::HashSet::new();
1202 for row in rows {
1203 inserted_ids.insert(row.id());
1204 store
1205 .insert(row)
1206 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1207 }
1208
1209 if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1211 drop(cache); self.query_registry
1213 .borrow_mut()
1214 .on_table_change(table_id, &inserted_ids);
1215 }
1216
1217 Ok(JsValue::from_f64(row_count as f64))
1218 }
1219}
1220
1221#[wasm_bindgen]
1223pub struct UpdateBuilder {
1224 cache: Rc<RefCell<TableCache>>,
1225 query_registry: Rc<RefCell<QueryRegistry>>,
1226 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1227 table_name: String,
1228 set_values: Vec<(String, JsValue)>,
1229 where_clause: Option<Expr>,
1230}
1231
1232impl UpdateBuilder {
1233 pub(crate) fn new(
1234 cache: Rc<RefCell<TableCache>>,
1235 query_registry: Rc<RefCell<QueryRegistry>>,
1236 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1237 table: &str,
1238 ) -> Self {
1239 Self {
1240 cache,
1241 query_registry,
1242 table_id_map,
1243 table_name: table.to_string(),
1244 set_values: Vec::new(),
1245 where_clause: None,
1246 }
1247 }
1248}
1249
1250#[wasm_bindgen]
1251impl UpdateBuilder {
1252 pub fn set(mut self, column_or_obj: &JsValue, value: Option<JsValue>) -> Self {
1257 if let Some(val) = value {
1258 if let Some(col_name) = column_or_obj.as_string() {
1260 self.set_values.push((col_name, val));
1261 }
1262 } else if let Some(obj) = column_or_obj.dyn_ref::<js_sys::Object>() {
1263 let keys = js_sys::Object::keys(obj);
1265 for key in keys.iter() {
1266 if let Some(col_name) = key.as_string() {
1267 let value = js_sys::Reflect::get(obj, &key).unwrap_or(JsValue::NULL);
1268 self.set_values.push((col_name, value));
1269 }
1270 }
1271 }
1272 self
1273 }
1274
1275 #[wasm_bindgen(js_name = "where")]
1277 pub fn where_(mut self, predicate: &Expr) -> Self {
1278 self.where_clause = Some(predicate.clone());
1279 self
1280 }
1281
1282 pub async fn exec(&self) -> Result<JsValue, JsValue> {
1284 let mut cache = self.cache.borrow_mut();
1285 let store = cache
1286 .get_table_mut(&self.table_name)
1287 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1288
1289 let schema = store.schema().clone();
1290
1291 let rows_to_update: Vec<Row> = store
1293 .scan()
1294 .filter(|row| {
1295 if let Some(ref pred) = self.where_clause {
1296 evaluate_predicate(pred, &**row, &schema)
1297 } else {
1298 true
1299 }
1300 })
1301 .map(|rc| (*rc).clone())
1302 .collect();
1303
1304 let mut deltas = Vec::new();
1305 let mut update_count = 0;
1306 let mut updated_ids = hashbrown::HashSet::new();
1307
1308 for old_row in rows_to_update {
1309 let mut new_values = old_row.values().to_vec();
1311
1312 for (col_name, js_val) in &self.set_values {
1313 if let Some(col) = schema.get_column(col_name) {
1314 let idx = col.index();
1315 let value = js_to_value(js_val, col.data_type())?;
1316 if idx < new_values.len() {
1317 new_values[idx] = value;
1318 }
1319 }
1320 }
1321
1322 let new_version = old_row.version().wrapping_add(1);
1324 let new_row = Row::new_with_version(old_row.id(), new_version, new_values);
1325
1326 deltas.push(Delta::delete(old_row.clone()));
1328 deltas.push(Delta::insert(new_row.clone()));
1329
1330 updated_ids.insert(old_row.id());
1332
1333 store
1335 .update(old_row.id(), new_row)
1336 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1337
1338 update_count += 1;
1339 }
1340
1341 if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1343 drop(cache);
1344 self.query_registry
1345 .borrow_mut()
1346 .on_table_change(table_id, &updated_ids);
1347 }
1348
1349 Ok(JsValue::from_f64(update_count as f64))
1350 }
1351}
1352
1353#[wasm_bindgen]
1355pub struct DeleteBuilder {
1356 cache: Rc<RefCell<TableCache>>,
1357 query_registry: Rc<RefCell<QueryRegistry>>,
1358 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1359 table_name: String,
1360 where_clause: Option<Expr>,
1361}
1362
1363impl DeleteBuilder {
1364 pub(crate) fn new(
1365 cache: Rc<RefCell<TableCache>>,
1366 query_registry: Rc<RefCell<QueryRegistry>>,
1367 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1368 table: &str,
1369 ) -> Self {
1370 Self {
1371 cache,
1372 query_registry,
1373 table_id_map,
1374 table_name: table.to_string(),
1375 where_clause: None,
1376 }
1377 }
1378}
1379
1380#[wasm_bindgen]
1381impl DeleteBuilder {
1382 #[wasm_bindgen(js_name = "where")]
1384 pub fn where_(mut self, predicate: &Expr) -> Self {
1385 self.where_clause = Some(predicate.clone());
1386 self
1387 }
1388
1389 pub async fn exec(&self) -> Result<JsValue, JsValue> {
1391 let mut cache = self.cache.borrow_mut();
1392 let store = cache
1393 .get_table_mut(&self.table_name)
1394 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1395
1396 let schema = store.schema().clone();
1397
1398 let rows_to_delete: Vec<Row> = store
1400 .scan()
1401 .filter(|row| {
1402 if let Some(ref pred) = self.where_clause {
1403 evaluate_predicate(pred, &**row, &schema)
1404 } else {
1405 true
1406 }
1407 })
1408 .map(|rc| (*rc).clone())
1409 .collect();
1410
1411 let _deltas: Vec<Delta<Row>> = rows_to_delete
1412 .iter()
1413 .map(|r| Delta::delete(r.clone()))
1414 .collect();
1415
1416 let delete_count = rows_to_delete.len();
1417
1418 let mut deleted_ids = hashbrown::HashSet::new();
1420 for row in rows_to_delete {
1421 deleted_ids.insert(row.id());
1422 store
1423 .delete(row.id())
1424 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1425 }
1426
1427 if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1429 drop(cache);
1430 self.query_registry
1431 .borrow_mut()
1432 .on_table_change(table_id, &deleted_ids);
1433 }
1434
1435 Ok(JsValue::from_f64(delete_count as f64))
1436 }
1437}
1438
1439pub(crate) fn evaluate_predicate(predicate: &Expr, row: &Row, schema: &Table) -> bool {
1441 match predicate.inner() {
1442 ExprInner::Comparison { column, op, value } => {
1443 let col = schema.get_column(&column.name());
1444 if col.is_none() {
1445 return false;
1446 }
1447 let col = col.unwrap();
1448 let idx = col.index();
1449
1450 let row_val = match row.get(idx) {
1451 Some(v) => v,
1452 None => return false,
1453 };
1454
1455 let cmp_val = match js_to_value(value, col.data_type()) {
1456 Ok(v) => v,
1457 Err(_) => return false,
1458 };
1459
1460 use crate::expr::ComparisonOp;
1461 match op {
1462 ComparisonOp::Eq => row_val == &cmp_val,
1463 ComparisonOp::Ne => row_val != &cmp_val,
1464 ComparisonOp::Gt => row_val > &cmp_val,
1465 ComparisonOp::Gte => row_val >= &cmp_val,
1466 ComparisonOp::Lt => row_val < &cmp_val,
1467 ComparisonOp::Lte => row_val <= &cmp_val,
1468 }
1469 }
1470 ExprInner::Between { column, low, high } => {
1471 let col = schema.get_column(&column.name());
1472 if col.is_none() {
1473 return false;
1474 }
1475 let col = col.unwrap();
1476 let idx = col.index();
1477
1478 let row_val = match row.get(idx) {
1479 Some(v) => v,
1480 None => return false,
1481 };
1482
1483 let low_val = match js_to_value(low, col.data_type()) {
1484 Ok(v) => v,
1485 Err(_) => return false,
1486 };
1487 let high_val = match js_to_value(high, col.data_type()) {
1488 Ok(v) => v,
1489 Err(_) => return false,
1490 };
1491
1492 row_val >= &low_val && row_val <= &high_val
1493 }
1494 ExprInner::InList { column, values } => {
1495 let col = schema.get_column(&column.name());
1496 if col.is_none() {
1497 return false;
1498 }
1499 let col = col.unwrap();
1500 let idx = col.index();
1501
1502 let row_val = match row.get(idx) {
1503 Some(v) => v,
1504 None => return false,
1505 };
1506
1507 let arr = js_sys::Array::from(values);
1508 arr.iter().any(|v| {
1509 if let Ok(cmp_val) = js_to_value(&v, col.data_type()) {
1510 row_val == &cmp_val
1511 } else {
1512 false
1513 }
1514 })
1515 }
1516 ExprInner::Like { column, pattern } => {
1517 let col = schema.get_column(&column.name());
1518 if col.is_none() {
1519 return false;
1520 }
1521 let col = col.unwrap();
1522 let idx = col.index();
1523
1524 let row_val = match row.get(idx) {
1525 Some(Value::String(s)) => s,
1526 _ => return false,
1527 };
1528
1529 like_match(row_val, pattern)
1531 }
1532 ExprInner::IsNull { column } => {
1533 let col = schema.get_column(&column.name());
1534 if col.is_none() {
1535 return false;
1536 }
1537 let idx = col.unwrap().index();
1538
1539 match row.get(idx) {
1540 Some(Value::Null) | None => true,
1541 _ => false,
1542 }
1543 }
1544 ExprInner::IsNotNull { column } => {
1545 let col = schema.get_column(&column.name());
1546 if col.is_none() {
1547 return false;
1548 }
1549 let idx = col.unwrap().index();
1550
1551 match row.get(idx) {
1552 Some(Value::Null) | None => false,
1553 _ => true,
1554 }
1555 }
1556 ExprInner::And { left, right } => {
1557 evaluate_predicate(left, row, schema) && evaluate_predicate(right, row, schema)
1558 }
1559 ExprInner::Or { left, right } => {
1560 evaluate_predicate(left, row, schema) || evaluate_predicate(right, row, schema)
1561 }
1562 ExprInner::Not { inner } => !evaluate_predicate(inner, row, schema),
1563 ExprInner::True => true,
1564 _ => true, }
1566}
1567
/// Matches `s` against a SQL `LIKE`-style `pattern`.
///
/// * `%` matches any run of characters, including an empty one.
/// * `_` matches exactly one character.
/// * Every other character matches itself, case-sensitively.
///
/// The whole of `s` must be consumed for a match. There is no escape syntax,
/// so `%` and `_` in the pattern are always wildcards.
///
/// The match walks both strings once and, on a mismatch, lets the most recent
/// `%` absorb one more character. This takes `O(len(s) * len(pattern))` time
/// in the worst case and allocates nothing, whereas re-collecting the
/// remaining text and pattern at every `%` and recursing can take exponential
/// time on patterns with several `%`.
fn like_match(s: &str, pattern: &str) -> bool {
    let mut text = s.chars();
    let mut pat = pattern.chars();

    // Restart point for the most recent `%`: the pattern just after it and
    // the text position up to which that `%` has absorbed characters.
    let mut retry_pat = pattern.chars();
    let mut retry_text = s.chars();
    let mut seen_percent = false;

    loop {
        let mut next_pat = pat.clone();
        let mut next_text = text.clone();

        let advanced = match next_pat.next() {
            Some('%') => {
                if next_pat.as_str().is_empty() {
                    // A trailing `%` swallows whatever text is left.
                    return true;
                }
                // Start with `%` matching nothing; widen it on mismatch.
                retry_pat = next_pat.clone();
                retry_text = text.clone();
                seen_percent = true;
                pat = next_pat;
                true
            }
            Some(expected) => match next_text.next() {
                Some(actual) if expected == '_' || expected == actual => {
                    pat = next_pat;
                    text = next_text;
                    true
                }
                _ => false,
            },
            None => {
                if next_text.next().is_none() {
                    return true;
                }
                // Pattern exhausted but text remains.
                false
            }
        };

        if !advanced {
            // Let the last `%` absorb one more character and retry from
            // there; with no `%` or no text left to absorb, the match fails.
            if !seen_percent || retry_text.next().is_none() {
                return false;
            }
            pat = retry_pat.clone();
            text = retry_text.clone();
        }
    }
}
1608
#[cfg(test)]
mod tests {
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Asserts that `like_match(input, pattern)` equals `expected` for every
    /// `(input, pattern, expected)` case.
    fn check(cases: &[(&str, &str, bool)]) {
        for &(input, pattern, expected) in cases {
            assert_eq!(
                like_match(input, pattern),
                expected,
                "like_match({:?}, {:?})",
                input,
                pattern
            );
        }
    }

    /// Patterns without wildcards must match the whole string exactly.
    #[wasm_bindgen_test]
    fn test_like_match_exact() {
        check(&[("hello", "hello", true), ("hello", "world", false)]);
    }

    /// `%` matches any run of characters.
    #[wasm_bindgen_test]
    fn test_like_match_percent() {
        check(&[
            ("hello", "%", true),
            ("hello", "h%", true),
            ("hello", "%o", true),
            ("hello", "h%o", true),
            ("hello", "%ell%", true),
            ("hello", "x%", false),
        ]);
    }

    /// `_` matches exactly one character.
    #[wasm_bindgen_test]
    fn test_like_match_underscore() {
        check(&[
            ("hello", "_ello", true),
            ("hello", "h_llo", true),
            ("hello", "hell_", true),
            ("hello", "_____", true),
            ("hello", "______", false),
        ]);
    }

    /// `%` and `_` can be mixed with literal text.
    #[wasm_bindgen_test]
    fn test_like_match_combined() {
        check(&[
            ("hello", "h%_o", true),
            ("hello world", "hello%", true),
            ("hello world", "%world", true),
        ]);
    }
}