// cynos_database/query_builder.rs

//! Query builders for SELECT, INSERT, UPDATE, DELETE operations.
//!
//! This module provides fluent API builders for constructing and executing
//! database queries.

6use crate::binary_protocol::{SchemaLayout, SchemaLayoutCache};
7use crate::convert::{js_array_to_rows, js_to_value, joined_rows_to_js_array, projected_rows_to_js_array, rows_to_js_array};
8use crate::expr::{Expr, ExprInner};
9use crate::query_engine::{compile_plan, execute_physical_plan, execute_plan, explain_plan};
10use crate::reactive_bridge::{JsChangesStream, JsObservableQuery, QueryRegistry, ReQueryObservable};
11use cynos_storage::TableCache;
12use crate::JsSortOrder;
13use alloc::boxed::Box;
14use alloc::rc::Rc;
15use alloc::string::{String, ToString};
16use alloc::vec::Vec;
17use cynos_core::schema::Table;
18use cynos_core::{reserve_row_ids, DataType, Row, Value};
19use cynos_incremental::Delta;
20use cynos_query::ast::{AggregateFunc, SortOrder};
21use cynos_query::plan_cache::{compute_plan_fingerprint, PlanCache};
22use cynos_query::planner::LogicalPlan;
23use cynos_reactive::TableId;
24use core::cell::RefCell;
25use wasm_bindgen::prelude::*;
26
27/// SELECT query builder.
28#[wasm_bindgen]
29pub struct SelectBuilder {
30    cache: Rc<RefCell<TableCache>>,
31    query_registry: Rc<RefCell<QueryRegistry>>,
32    table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
33    schema_layout_cache: Rc<RefCell<SchemaLayoutCache>>,
34    plan_cache: Rc<RefCell<PlanCache>>,
35    columns: JsValue,
36    from_table: Option<String>,
37    where_clause: Option<Expr>,
38    order_by: Vec<(String, SortOrder)>,
39    limit_val: Option<usize>,
40    offset_val: Option<usize>,
41    joins: Vec<JoinClause>,
42    group_by_cols: Vec<String>,
43    aggregates: Vec<(AggregateFunc, Option<String>)>, // (func, column_name or None for COUNT(*))
44}
45
46#[derive(Clone, Debug)]
47#[allow(dead_code)]
48struct JoinClause {
49    table: String,       // The actual table name (for schema lookup)
50    alias: Option<String>, // Optional alias (for column reference)
51    condition: Expr,
52    join_type: JoinType,
53}
54
55impl JoinClause {
56    /// Returns the name to use for column references (alias if present, otherwise table name)
57    fn reference_name(&self) -> &str {
58        self.alias.as_deref().unwrap_or(&self.table)
59    }
60}
61
/// Kind of JOIN carried by a [`JoinClause`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[allow(dead_code)]
enum JoinType {
    Inner,
    Left,
    Right,
}

70impl SelectBuilder {
71    pub(crate) fn new(
72        cache: Rc<RefCell<TableCache>>,
73        query_registry: Rc<RefCell<QueryRegistry>>,
74        table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
75        schema_layout_cache: Rc<RefCell<SchemaLayoutCache>>,
76        plan_cache: Rc<RefCell<PlanCache>>,
77        columns: JsValue,
78    ) -> Self {
79        Self {
80            cache,
81            query_registry,
82            table_id_map,
83            schema_layout_cache,
84            plan_cache,
85            columns,
86            from_table: None,
87            where_clause: None,
88            order_by: Vec::new(),
89            limit_val: None,
90            offset_val: None,
91            joins: Vec::new(),
92            group_by_cols: Vec::new(),
93            aggregates: Vec::new(),
94        }
95    }
96
97    fn get_schema(&self) -> Option<Table> {
98        self.from_table.as_ref().and_then(|name| {
99            self.cache
100                .borrow()
101                .get_table(name)
102                .map(|s| s.schema().clone())
103        })
104    }
105
106    /// Gets column info for any table (main table or joined tables).
107    /// Supports qualified column names like `users.name` and table aliases.
108    fn get_column_info_any_table(&self, col_name: &str) -> Option<(String, usize, DataType)> {
109        // Check if column name is qualified (contains '.')
110        if let Some(dot_pos) = col_name.find('.') {
111            let table_part = &col_name[..dot_pos];
112            let col_part = &col_name[dot_pos + 1..];
113
114            // First check if table_part matches the main table
115            if let Some(main_table) = &self.from_table {
116                if main_table == table_part {
117                    if let Some(schema) = self.get_schema() {
118                        if let Some(col) = schema.get_column(col_part) {
119                            return Some((table_part.to_string(), col.index(), col.data_type()));
120                        }
121                    }
122                }
123            }
124
125            // Check if table_part matches any join's alias or table name
126            for join in &self.joins {
127                let ref_name = join.reference_name();
128                if ref_name == table_part {
129                    // Use the actual table name for schema lookup
130                    if let Some(store) = self.cache.borrow().get_table(&join.table) {
131                        if let Some(col) = store.schema().get_column(col_part) {
132                            // Return the reference name (alias if present) for consistency
133                            return Some((ref_name.to_string(), col.index(), col.data_type()));
134                        }
135                    }
136                }
137            }
138
139            // Try direct table lookup (for cases without alias)
140            if let Some(info) = self.cache
141                .borrow()
142                .get_table(table_part)
143                .and_then(|store| {
144                    store.schema().get_column(col_part).map(|c| (table_part.to_string(), c.index(), c.data_type()))
145                })
146            {
147                return Some(info);
148            }
149        }
150
151        // Try the main table for unqualified column names
152        if let Some(table_name) = &self.from_table {
153            if let Some(schema) = self.get_schema() {
154                if let Some(col) = schema.get_column(col_name) {
155                    return Some((table_name.clone(), col.index(), col.data_type()));
156                }
157            }
158        }
159
160        // Try all joined tables
161        for join in &self.joins {
162            if let Some(info) = self.cache
163                .borrow()
164                .get_table(&join.table)
165                .and_then(|store| {
166                    store.schema().get_column(col_name).map(|c| (join.reference_name().to_string(), c.index(), c.data_type()))
167                })
168            {
169                return Some(info);
170            }
171        }
172
173        None
174    }
175
176    /// Parses the columns JsValue into a list of column names.
177    /// Returns None if selecting all columns (empty array, undefined, or contains "*").
178    fn parse_columns(&self) -> Option<Vec<String>> {
179        if self.columns.is_undefined() || self.columns.is_null() {
180            return None;
181        }
182
183        if let Some(arr) = self.columns.dyn_ref::<js_sys::Array>() {
184            if arr.length() == 0 {
185                return None; // Empty array means select all
186            }
187
188            // Check if first element is an array (nested array case from variadic)
189            // e.g., select(['name', 'age']) becomes [['name', 'age']] with variadic
190            let first = arr.get(0);
191            if let Some(inner_arr) = first.dyn_ref::<js_sys::Array>() {
192                // Handle nested array: [['name', 'age']]
193                let cols: Vec<String> = inner_arr.iter().filter_map(|v| v.as_string()).collect();
194                if cols.is_empty() {
195                    return None;
196                } else if cols.len() == 1 && cols[0] == "*" {
197                    return None; // ["*"] means select all
198                } else {
199                    return Some(cols);
200                }
201            }
202
203            // Handle flat array: ['name', 'age'] (variadic args)
204            let cols: Vec<String> = arr.iter().filter_map(|v| v.as_string()).collect();
205            if cols.is_empty() {
206                None
207            } else if cols.len() == 1 && cols[0] == "*" {
208                None // ["*"] means select all
209            } else {
210                Some(cols)
211            }
212        } else if let Some(s) = self.columns.as_string() {
213            if s == "*" {
214                None // "*" means select all
215            } else {
216                Some(alloc::vec![s])
217            }
218        } else {
219            None
220        }
221    }
222
223    /// Builds a LogicalPlan from the query builder state.
224    fn build_logical_plan(&self, table_name: &str) -> LogicalPlan {
225        // Start with a table scan
226        let mut plan = LogicalPlan::Scan {
227            table: table_name.to_string(),
228        };
229
230        // Track column offsets for each table in the JOIN
231        // Key: reference name (alias if present, otherwise table name), Value: starting column offset
232        let mut table_offsets: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
233
234        // Add main table offset
235        if let Some(schema) = self.get_schema() {
236            table_offsets.insert(table_name.to_string(), 0);
237            let mut current_offset = schema.columns().len();
238
239            // Add JOINs if any
240            for join in &self.joins {
241                let right_plan = LogicalPlan::Scan {
242                    table: join.table.clone(),
243                };
244
245                // Record offset using reference name (alias if present)
246                let ref_name = join.reference_name().to_string();
247                table_offsets.insert(ref_name.clone(), current_offset);
248
249                // Convert join condition to AST with correct offsets
250                let get_col_info = |name: &str| self.get_column_info_for_join_with_offsets_alias(name, join, &table_offsets);
251                let ast_condition = join.condition.to_ast_with_table(&get_col_info);
252
253                plan = match join.join_type {
254                    JoinType::Inner => LogicalPlan::inner_join(plan, right_plan, ast_condition),
255                    JoinType::Left => LogicalPlan::left_join(plan, right_plan, ast_condition),
256                    JoinType::Right => {
257                        // Right join is left join with swapped operands
258                        LogicalPlan::left_join(right_plan, plan, ast_condition)
259                    }
260                };
261
262                // Update offset for next join
263                if let Some(store) = self.cache.borrow().get_table(&join.table) {
264                    current_offset += store.schema().columns().len();
265                }
266            }
267        } else {
268            // Fallback: no schema available, use old logic
269            for join in &self.joins {
270                let right_plan = LogicalPlan::Scan {
271                    table: join.table.clone(),
272                };
273
274                let get_col_info = |name: &str| self.get_column_info_for_join(name, &join.table);
275                let ast_condition = join.condition.to_ast_with_table(&get_col_info);
276
277                plan = match join.join_type {
278                    JoinType::Inner => LogicalPlan::inner_join(plan, right_plan, ast_condition),
279                    JoinType::Left => LogicalPlan::left_join(plan, right_plan, ast_condition),
280                    JoinType::Right => {
281                        LogicalPlan::left_join(right_plan, plan, ast_condition)
282                    }
283                };
284            }
285        }
286
287        // Add filter if WHERE clause exists
288        if let Some(ref predicate) = self.where_clause {
289            // Use get_column_info_any_table to get the correct table-relative index
290            // The optimizer may push this filter to a single table, so we need table-relative indices
291            let get_col_info = |name: &str| {
292                self.get_column_info_any_table(name)
293            };
294            let ast_predicate = predicate.to_ast_with_table(&get_col_info);
295            plan = LogicalPlan::Filter {
296                input: Box::new(plan),
297                predicate: ast_predicate,
298            };
299        }
300
301        // Add aggregate if GROUP BY or aggregate functions are specified
302        if !self.group_by_cols.is_empty() || !self.aggregates.is_empty() {
303            let group_by_exprs: Vec<_> = self.group_by_cols.iter().filter_map(|col| {
304                self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
305                    let col_name = if let Some(dot_pos) = col.find('.') {
306                        &col[dot_pos + 1..]
307                    } else {
308                        col.as_str()
309                    };
310                    cynos_query::ast::Expr::column(&tbl, col_name, idx)
311                })
312            }).collect();
313
314            let agg_exprs: Vec<_> = self.aggregates.iter().filter_map(|(func, col_opt)| {
315                if let Some(col) = col_opt {
316                    self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
317                        let col_name = if let Some(dot_pos) = col.find('.') {
318                            &col[dot_pos + 1..]
319                        } else {
320                            col.as_str()
321                        };
322                        (*func, cynos_query::ast::Expr::column(&tbl, col_name, idx))
323                    })
324                } else {
325                    // COUNT(*) - use a dummy column expression
326                    Some((*func, cynos_query::ast::Expr::literal(cynos_core::Value::Int64(1))))
327                }
328            }).collect();
329
330            plan = LogicalPlan::aggregate(plan, group_by_exprs, agg_exprs);
331        }
332
333        // Add ORDER BY if specified
334        if !self.order_by.is_empty() {
335            let order_exprs: Vec<_> = self.order_by.iter().filter_map(|(col, order)| {
336                // Use get_column_info_for_projection to correctly handle JOIN queries
337                self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
338                    // Extract just the column name if qualified
339                    let col_name = if let Some(dot_pos) = col.find('.') {
340                        &col[dot_pos + 1..]
341                    } else {
342                        col.as_str()
343                    };
344                    (cynos_query::ast::Expr::column(&tbl, col_name, idx), *order)
345                })
346            }).collect();
347            plan = LogicalPlan::Sort {
348                input: Box::new(plan),
349                order_by: order_exprs,
350            };
351        }
352
353        // Add LIMIT/OFFSET if specified
354        // Use a very large limit if only offset is specified
355        if self.limit_val.is_some() || self.offset_val.is_some() {
356            plan = LogicalPlan::Limit {
357                input: Box::new(plan),
358                limit: self.limit_val.unwrap_or(1_000_000_000), // Large but safe limit
359                offset: self.offset_val.unwrap_or(0),
360            };
361        }
362
363        // Add projection if specific columns are selected
364        if let Some(cols) = self.parse_columns() {
365            let project_exprs: Vec<_> = cols
366                .iter()
367                .filter_map(|col| {
368                    // Use get_column_info_for_projection to get the correct index for JOIN queries
369                    self.get_column_info_for_projection(col).map(|(tbl, idx, _)| {
370                        // Extract just the column name if qualified
371                        let col_name = if let Some(dot_pos) = col.find('.') {
372                            &col[dot_pos + 1..]
373                        } else {
374                            col.as_str()
375                        };
376                        cynos_query::ast::Expr::column(&tbl, col_name, idx)
377                    })
378                })
379                .collect();
380
381            if !project_exprs.is_empty() {
382                plan = LogicalPlan::Project {
383                    input: Box::new(plan),
384                    columns: project_exprs,
385                };
386            }
387        }
388
389        plan
390    }
391
392    /// Gets column info for projection, calculating the correct index for JOIN queries.
393    /// For JOIN queries, returns the table-relative index (not the absolute offset).
394    /// The absolute index will be computed at runtime based on actual table order.
395    /// Supports table aliases.
396    fn get_column_info_for_projection(&self, col_name: &str) -> Option<(String, usize, DataType)> {
397        // Check if column name is qualified (contains '.')
398        let (target_table, target_col) = if let Some(dot_pos) = col_name.find('.') {
399            (Some(&col_name[..dot_pos]), &col_name[dot_pos + 1..])
400        } else {
401            (None, col_name)
402        };
403
404        // First check the main table
405        if let Some(main_table) = &self.from_table {
406            if let Some(schema) = self.get_schema() {
407                if target_table.is_none() || target_table == Some(main_table.as_str()) {
408                    if let Some(col) = schema.get_column(target_col) {
409                        // Return table-relative index, not absolute offset
410                        return Some((main_table.clone(), col.index(), col.data_type()));
411                    }
412                }
413            }
414        }
415
416        // Then check joined tables in order
417        for join in &self.joins {
418            if let Some(store) = self.cache.borrow().get_table(&join.table) {
419                let schema = store.schema();
420                let ref_name = join.reference_name();
421                // Match against both the reference name (alias) and the actual table name
422                if target_table.is_none() || target_table == Some(ref_name) || target_table == Some(join.table.as_str()) {
423                    if let Some(col) = schema.get_column(target_col) {
424                        // Return table-relative index, not absolute offset
425                        return Some((ref_name.to_string(), col.index(), col.data_type()));
426                    }
427                }
428            }
429        }
430
431        None
432    }
433
434    /// Creates a SchemaLayout for projected columns, supporting multi-table column references.
435    fn create_projection_layout(&self, column_names: &[String]) -> crate::binary_protocol::SchemaLayout {
436        use crate::binary_protocol::{BinaryDataType, ColumnLayout, SchemaLayout};
437
438        // Extract just the column part from qualified names and count occurrences
439        let mut name_counts: hashbrown::HashMap<&str, usize> = hashbrown::HashMap::new();
440        for col_name in column_names {
441            let simple_name = if let Some(dot_pos) = col_name.find('.') {
442                &col_name[dot_pos + 1..]
443            } else {
444                col_name.as_str()
445            };
446            *name_counts.entry(simple_name).or_insert(0) += 1;
447        }
448
449        let mut columns: Vec<ColumnLayout> = Vec::new();
450        let mut offset = 0usize;
451
452        for name in column_names {
453            // Look up column info from any table
454            if let Some((_, _, data_type)) = self.get_column_info_any_table(name) {
455                let binary_type = BinaryDataType::from(data_type);
456                let fixed_size = binary_type.fixed_size();
457
458                // Determine the final column name - use simple name when unique
459                let final_name = if let Some(dot_pos) = name.find('.') {
460                    let simple_name = &name[dot_pos + 1..];
461                    if name_counts.get(simple_name).copied().unwrap_or(0) > 1 {
462                        // Duplicate - keep qualified name
463                        name.clone()
464                    } else {
465                        // Unique - use simple name
466                        simple_name.to_string()
467                    }
468                } else {
469                    name.clone()
470                };
471
472                columns.push(ColumnLayout {
473                    name: final_name,
474                    data_type: binary_type,
475                    fixed_size,
476                    is_nullable: true, // Conservative: assume nullable
477                    offset,
478                });
479                offset += fixed_size;
480            }
481        }
482
483        let null_mask_size = (columns.len() + 7) / 8;
484        let data_size: usize = columns.iter().map(|c| c.fixed_size).sum();
485        let row_stride = null_mask_size + data_size;
486
487        SchemaLayout::new(columns, row_stride, null_mask_size)
488    }
489
490    /// Gets column info for JOIN conditions, checking both the main table and the join table.
491    /// Returns (table_name, column_index, data_type).
492    ///
493    /// For JOIN conditions like `col('dept_id').eq('id')`:
494    /// - The left column (dept_id) should come from the main table
495    /// - The right column (id) should come from the join table
496    ///
497    /// Also supports qualified column names like `col('orders.year')`.
498    fn get_column_info_for_join(&self, col_name: &str, join_table: &str) -> Option<(String, usize, DataType)> {
499        // Check if column name is qualified (contains '.')
500        if let Some(dot_pos) = col_name.find('.') {
501            let table_part = &col_name[..dot_pos];
502            let col_part = &col_name[dot_pos + 1..];
503
504            // Try to find the table and column
505            if let Some(info) = self.cache
506                .borrow()
507                .get_table(table_part)
508                .and_then(|store| {
509                    store.schema().get_column(col_part).map(|c| (table_part.to_string(), c.index(), c.data_type()))
510                })
511            {
512                return Some(info);
513            }
514
515            // Also check if it matches the main table
516            if let Some(table_name) = &self.from_table {
517                if table_name == table_part {
518                    if let Some(schema) = self.get_schema() {
519                        if let Some(col) = schema.get_column(col_part) {
520                            return Some((table_name.clone(), col.index(), col.data_type()));
521                        }
522                    }
523                }
524            }
525        }
526
527        // First try the join table (for the right side of JOIN conditions)
528        if let Some(info) = self.cache
529            .borrow()
530            .get_table(join_table)
531            .and_then(|store| {
532                store.schema().get_column(col_name).map(|c| (join_table.to_string(), c.index(), c.data_type()))
533            })
534        {
535            return Some(info);
536        }
537
538        // Then try the main table
539        if let Some(table_name) = &self.from_table {
540            if let Some(schema) = self.get_schema() {
541                if let Some(col) = schema.get_column(col_name) {
542                    return Some((table_name.clone(), col.index(), col.data_type()));
543                }
544            }
545        }
546
547        None
548    }
549
550    /// Gets column info for JOIN conditions with pre-computed table offsets and alias support.
551    /// This is used for multi-table JOINs where column indices need to account for
552    /// the combined row structure and table aliases.
553    fn get_column_info_for_join_with_offsets_alias(
554        &self,
555        col_name: &str,
556        current_join: &JoinClause,
557        table_offsets: &hashbrown::HashMap<String, usize>,
558    ) -> Option<(String, usize, DataType)> {
559        let current_ref_name = current_join.reference_name();
560
561        // Check if column name is qualified (contains '.')
562        if let Some(dot_pos) = col_name.find('.') {
563            let table_part = &col_name[..dot_pos];
564            let col_part = &col_name[dot_pos + 1..];
565
566            // Check if table_part matches the current join's reference name (alias or table)
567            if table_part == current_ref_name {
568                if let Some(store) = self.cache.borrow().get_table(&current_join.table) {
569                    if let Some(col) = store.schema().get_column(col_part) {
570                        // Current join table uses original index (no offset)
571                        // Return actual table name for Relation compatibility
572                        return Some((current_join.table.clone(), col.index(), col.data_type()));
573                    }
574                }
575            }
576
577            // Check if table_part matches the main table
578            if let Some(main_table) = &self.from_table {
579                if table_part == main_table {
580                    if let Some(schema) = self.get_schema() {
581                        if let Some(col) = schema.get_column(col_part) {
582                            let offset = table_offsets.get(main_table).copied().unwrap_or(0);
583                            return Some((table_part.to_string(), offset + col.index(), col.data_type()));
584                        }
585                    }
586                }
587            }
588
589            // Check if table_part matches any other join's reference name
590            for join in &self.joins {
591                let ref_name = join.reference_name();
592                if table_part == ref_name && ref_name != current_ref_name {
593                    if let Some(store) = self.cache.borrow().get_table(&join.table) {
594                        if let Some(col) = store.schema().get_column(col_part) {
595                            let offset = table_offsets.get(ref_name).copied().unwrap_or(0);
596                            return Some((table_part.to_string(), offset + col.index(), col.data_type()));
597                        }
598                    }
599                }
600            }
601
602            // Try direct table lookup (for cases without alias)
603            if let Some(store) = self.cache.borrow().get_table(table_part) {
604                if let Some(col) = store.schema().get_column(col_part) {
605                    let idx = if table_part == &current_join.table {
606                        col.index()
607                    } else {
608                        let offset = table_offsets.get(table_part).copied().unwrap_or(0);
609                        offset + col.index()
610                    };
611                    return Some((table_part.to_string(), idx, col.data_type()));
612                }
613            }
614        }
615
616        // For unqualified column names, try current join table first
617        if let Some(store) = self.cache.borrow().get_table(&current_join.table) {
618            if let Some(col) = store.schema().get_column(col_name) {
619                // Current join table uses original index (no offset)
620                return Some((current_ref_name.to_string(), col.index(), col.data_type()));
621            }
622        }
623
624        // Then try the main table
625        if let Some(table_name) = &self.from_table {
626            let offset = table_offsets.get(table_name).copied().unwrap_or(0);
627            if let Some(schema) = self.get_schema() {
628                if let Some(col) = schema.get_column(col_name) {
629                    return Some((table_name.clone(), offset + col.index(), col.data_type()));
630                }
631            }
632        }
633
634        None
635    }
636}
637
638#[wasm_bindgen]
639impl SelectBuilder {
640    /// Sets the FROM table.
641    pub fn from(mut self, table: &str) -> Self {
642        self.from_table = Some(table.to_string());
643        self
644    }
645
646    /// Sets the WHERE clause.
647    #[wasm_bindgen(js_name = "where")]
648    pub fn where_(mut self, predicate: &Expr) -> Self {
649        self.where_clause = Some(predicate.clone());
650        self
651    }
652
653    /// Adds an ORDER BY clause.
654    #[wasm_bindgen(js_name = orderBy)]
655    pub fn order_by(mut self, column: &str, order: JsSortOrder) -> Self {
656        self.order_by.push((column.to_string(), order.into()));
657        self
658    }
659
660    /// Sets the LIMIT.
661    pub fn limit(mut self, n: usize) -> Self {
662        self.limit_val = Some(n);
663        self
664    }
665
666    /// Sets the OFFSET.
667    pub fn offset(mut self, n: usize) -> Self {
668        self.offset_val = Some(n);
669        self
670    }
671
672    /// Parses a table specification that may include an alias.
673    /// Supports formats: "table_name" or "table_name as alias"
674    fn parse_table_spec(table_spec: &str) -> (String, Option<String>) {
675        // Check for " as " (case insensitive)
676        let lower = table_spec.to_lowercase();
677        if let Some(pos) = lower.find(" as ") {
678            let table = table_spec[..pos].trim().to_string();
679            let alias = table_spec[pos + 4..].trim().to_string();
680            (table, Some(alias))
681        } else {
682            (table_spec.trim().to_string(), None)
683        }
684    }
685
686    /// Adds an INNER JOIN.
687    #[wasm_bindgen(js_name = innerJoin)]
688    pub fn inner_join(mut self, table: &str, condition: &Expr) -> Self {
689        let (table_name, alias) = Self::parse_table_spec(table);
690        self.joins.push(JoinClause {
691            table: table_name,
692            alias,
693            condition: condition.clone(),
694            join_type: JoinType::Inner,
695        });
696        self
697    }
698
699    /// Adds a LEFT JOIN.
700    #[wasm_bindgen(js_name = leftJoin)]
701    pub fn left_join(mut self, table: &str, condition: &Expr) -> Self {
702        let (table_name, alias) = Self::parse_table_spec(table);
703        self.joins.push(JoinClause {
704            table: table_name,
705            alias,
706            condition: condition.clone(),
707            join_type: JoinType::Left,
708        });
709        self
710    }
711
712    /// Adds a GROUP BY clause.
713    #[wasm_bindgen(js_name = groupBy)]
714    pub fn group_by(mut self, columns: &JsValue) -> Self {
715        if let Some(arr) = columns.dyn_ref::<js_sys::Array>() {
716            self.group_by_cols = arr.iter().filter_map(|v| v.as_string()).collect();
717        } else if let Some(s) = columns.as_string() {
718            self.group_by_cols = alloc::vec![s];
719        }
720        self
721    }
722
723    /// Adds a COUNT(*) aggregate.
724    #[wasm_bindgen(js_name = count)]
725    pub fn count(mut self) -> Self {
726        self.aggregates.push((AggregateFunc::Count, None));
727        self
728    }
729
730    /// Adds a COUNT(column) aggregate.
731    #[wasm_bindgen(js_name = countCol)]
732    pub fn count_col(mut self, column: &str) -> Self {
733        self.aggregates.push((AggregateFunc::Count, Some(column.to_string())));
734        self
735    }
736
737    /// Adds a SUM(column) aggregate.
738    #[wasm_bindgen(js_name = sum)]
739    pub fn sum(mut self, column: &str) -> Self {
740        self.aggregates.push((AggregateFunc::Sum, Some(column.to_string())));
741        self
742    }
743
744    /// Adds an AVG(column) aggregate.
745    #[wasm_bindgen(js_name = avg)]
746    pub fn avg(mut self, column: &str) -> Self {
747        self.aggregates.push((AggregateFunc::Avg, Some(column.to_string())));
748        self
749    }
750
751    /// Adds a MIN(column) aggregate.
752    #[wasm_bindgen(js_name = min)]
753    pub fn min(mut self, column: &str) -> Self {
754        self.aggregates.push((AggregateFunc::Min, Some(column.to_string())));
755        self
756    }
757
758    /// Adds a MAX(column) aggregate.
759    #[wasm_bindgen(js_name = max)]
760    pub fn max(mut self, column: &str) -> Self {
761        self.aggregates.push((AggregateFunc::Max, Some(column.to_string())));
762        self
763    }
764
765    /// Adds a STDDEV(column) aggregate.
766    #[wasm_bindgen(js_name = stddev)]
767    pub fn stddev(mut self, column: &str) -> Self {
768        self.aggregates.push((AggregateFunc::StdDev, Some(column.to_string())));
769        self
770    }
771
772    /// Adds a GEOMEAN(column) aggregate.
773    #[wasm_bindgen(js_name = geomean)]
774    pub fn geomean(mut self, column: &str) -> Self {
775        self.aggregates.push((AggregateFunc::GeoMean, Some(column.to_string())));
776        self
777    }
778
779    /// Adds a DISTINCT(column) aggregate (returns count of distinct values).
780    #[wasm_bindgen(js_name = distinct)]
781    pub fn distinct(mut self, column: &str) -> Self {
782        self.aggregates.push((AggregateFunc::Distinct, Some(column.to_string())));
783        self
784    }
785
786    /// Executes the query and returns results.
787    pub async fn exec(&self) -> Result<JsValue, JsValue> {
788        let table_name = self
789            .from_table
790            .as_ref()
791            .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
792
793        let cache = self.cache.borrow();
794        let store = cache
795            .get_table(table_name)
796            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
797
798        let schema = store.schema().clone();
799
800        // Build logical plan using query engine
801        // ORDER BY, LIMIT, and OFFSET are now handled in the logical plan
802        let plan = self.build_logical_plan(table_name);
803
804        // Execute using query engine (with index optimization)
805        let rows = execute_plan(&cache, table_name, plan)
806            .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?;
807
808        // Convert to JS array
809        if !self.aggregates.is_empty() || !self.group_by_cols.is_empty() {
810            // For aggregate queries, build column names from group_by + aggregates
811            Ok(self.aggregate_rows_to_js_array(&rows))
812        } else if let Some(cols) = self.parse_columns() {
813            // When we have projection, the rows contain only the projected columns
814            // in the order specified by the projection
815            Ok(projected_rows_to_js_array(&rows, &cols))
816        } else if !self.joins.is_empty() {
817            // For JOIN queries without projection, collect all schemas and use joined conversion
818            let mut schemas: Vec<&Table> = Vec::with_capacity(1 + self.joins.len());
819            schemas.push(store.schema());
820            for join in &self.joins {
821                let join_store = cache.get_table(&join.table).ok_or_else(|| {
822                    JsValue::from_str(&alloc::format!("Join table not found: {}", join.table))
823                })?;
824                schemas.push(join_store.schema());
825            }
826            Ok(joined_rows_to_js_array(&rows, &schemas))
827        } else {
828            Ok(rows_to_js_array(&rows, &schema))
829        }
830    }
831
832    /// Builds column names for aggregate queries.
833    /// Returns: group_by columns + aggregate function names (e.g., "count", "sum_value")
834    fn build_aggregate_column_names(&self) -> Vec<String> {
835        let mut col_names: Vec<String> = Vec::new();
836        for col in &self.group_by_cols {
837            // Use simple column name (without table prefix)
838            let simple_name = if let Some(dot_pos) = col.find('.') {
839                &col[dot_pos + 1..]
840            } else {
841                col.as_str()
842            };
843            col_names.push(simple_name.to_string());
844        }
845        for (func, col_opt) in &self.aggregates {
846            let func_name = match func {
847                AggregateFunc::Count => "count",
848                AggregateFunc::Sum => "sum",
849                AggregateFunc::Avg => "avg",
850                AggregateFunc::Min => "min",
851                AggregateFunc::Max => "max",
852                AggregateFunc::Distinct => "distinct",
853                AggregateFunc::StdDev => "stddev",
854                AggregateFunc::GeoMean => "geomean",
855            };
856            let col_name = if let Some(col) = col_opt {
857                let simple_name = if let Some(dot_pos) = col.find('.') {
858                    &col[dot_pos + 1..]
859                } else {
860                    col.as_str()
861                };
862                alloc::format!("{}_{}", func_name, simple_name)
863            } else {
864                func_name.to_string() // COUNT(*)
865            };
866            col_names.push(col_name);
867        }
868        col_names
869    }
870
871    /// Converts aggregate result rows to JS array.
872    fn aggregate_rows_to_js_array(&self, rows: &[Rc<Row>]) -> JsValue {
873        let result = js_sys::Array::new();
874        let col_names = self.build_aggregate_column_names();
875
876        for row in rows {
877            let obj = js_sys::Object::new();
878            for (i, name) in col_names.iter().enumerate() {
879                if let Some(value) = row.get(i) {
880                    let js_val = crate::convert::value_to_js(value);
881                    let _ = js_sys::Reflect::set(&obj, &JsValue::from_str(name), &js_val);
882                }
883            }
884            result.push(&obj);
885        }
886
887        result.into()
888    }
889
890    /// Explains the query plan without executing it.
891    ///
892    /// Returns an object with:
893    /// - `logical`: The original logical plan
894    /// - `optimized`: The optimized logical plan (after index selection, etc.)
895    /// - `physical`: The final physical execution plan
896    pub fn explain(&self) -> Result<JsValue, JsValue> {
897        let table_name = self
898            .from_table
899            .as_ref()
900            .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
901
902        let cache = self.cache.borrow();
903        let _ = cache
904            .get_table(table_name)
905            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
906
907        // Build logical plan
908        let plan = self.build_logical_plan(table_name);
909
910        // Get explain result
911        let result = explain_plan(&cache, table_name, plan);
912
913        // Convert to JS object
914        let obj = js_sys::Object::new();
915        js_sys::Reflect::set(&obj, &"logical".into(), &result.logical_plan.into())?;
916        js_sys::Reflect::set(&obj, &"optimized".into(), &result.optimized_plan.into())?;
917        js_sys::Reflect::set(&obj, &"physical".into(), &result.physical_plan.into())?;
918
919        Ok(obj.into())
920    }
921
922    /// Creates an observable query using re-query strategy.
923    /// When data changes, the cached physical plan is re-executed (no optimization overhead).
924    pub fn observe(&self) -> Result<JsObservableQuery, JsValue> {
925        let table_name = self
926            .from_table
927            .as_ref()
928            .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
929
930        let table_id = self
931            .table_id_map
932            .borrow()
933            .get(table_name)
934            .copied()
935            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table ID not found: {}", table_name)))?;
936
937        let cache_ref = self.cache.clone();
938        let cache = cache_ref.borrow();
939        let store = cache
940            .get_table(table_name)
941            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
942
943        let schema = store.schema().clone();
944
945        // Build binary layout: merge main table + all joined table schemas
946        let binary_layout = if self.joins.is_empty() {
947            if let Some(cols) = self.parse_columns() {
948                SchemaLayout::from_projection(&schema, &cols)
949            } else {
950                SchemaLayout::from_schema(&schema)
951            }
952        } else {
953            let mut schemas: Vec<&Table> = Vec::with_capacity(1 + self.joins.len());
954            schemas.push(store.schema());
955            for join in &self.joins {
956                let join_store = cache.get_table(&join.table).ok_or_else(|| {
957                    JsValue::from_str(&alloc::format!("Join table not found: {}", join.table))
958                })?;
959                schemas.push(join_store.schema());
960            }
961            SchemaLayout::from_schemas(&schemas)
962        };
963
964        // Build logical plan and compile to physical plan (cached for re-execution)
965        let logical_plan = self.build_logical_plan(table_name);
966        let physical_plan = compile_plan(&cache, table_name, logical_plan.clone());
967
968        // Get initial result using the compiled physical plan
969        let initial_rows = execute_physical_plan(&cache, &physical_plan)
970            .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?;
971
972        drop(cache); // Release borrow
973
974        // Create re-query observable with cached physical plan
975        let observable = ReQueryObservable::new(
976            physical_plan,
977            cache_ref.clone(),
978            initial_rows,
979        );
980        let observable_rc = Rc::new(RefCell::new(observable));
981
982        // Register with query registry
983        self.query_registry
984            .borrow_mut()
985            .register(observable_rc.clone(), table_id);
986
987        // Return observable with appropriate column info
988        if !self.aggregates.is_empty() || !self.group_by_cols.is_empty() {
989            // For aggregate queries, build column names from group_by + aggregates
990            let aggregate_cols = self.build_aggregate_column_names();
991            Ok(JsObservableQuery::new_with_aggregates(observable_rc, schema, aggregate_cols, binary_layout))
992        } else if let Some(cols) = self.parse_columns() {
993            Ok(JsObservableQuery::new_with_projection(observable_rc, schema, cols, binary_layout))
994        } else {
995            Ok(JsObservableQuery::new(observable_rc, schema, binary_layout))
996        }
997    }
998
999    /// Creates an observable query with join support.
1000    /// Note: Join queries still use the old incremental approach for now.
1001    #[allow(dead_code)]
1002    fn observe_with_joins(
1003        &self,
1004        _left_table_id: TableId,
1005        _left_schema: &Table,
1006        _cache: &TableCache,
1007    ) -> Result<JsObservableQuery, JsValue> {
1008        // TODO: Implement re-query for joins
1009        Err(JsValue::from_str("Join queries with observe() not yet supported in re-query mode"))
1010    }
1011
1012    /// Extracts join key column names from a join condition.
1013    /// Expects condition like: left_table.col = right_table.col
1014    #[allow(dead_code)]
1015    fn extract_join_keys(&self, condition: &Expr) -> Result<(String, String), JsValue> {
1016        match condition.inner() {
1017            ExprInner::Comparison { column, op, value } => {
1018                use crate::expr::ComparisonOp;
1019                if *op != ComparisonOp::Eq {
1020                    return Err(JsValue::from_str("Join condition must be an equality"));
1021                }
1022
1023                // The column is the left key
1024                let left_key = column.name();
1025
1026                // The value should be a column reference (as string for now)
1027                // In a real implementation, we'd have a proper ColumnRef expression
1028                let right_key = if let Some(s) = value.as_string() {
1029                    s
1030                } else {
1031                    return Err(JsValue::from_str("Join condition right side must be a column name"));
1032                };
1033
1034                Ok((left_key, right_key))
1035            }
1036            _ => Err(JsValue::from_str("Join condition must be a column equality")),
1037        }
1038    }
1039
1040    /// Creates a changes stream (initial + incremental).
1041    pub fn changes(&self) -> Result<JsChangesStream, JsValue> {
1042        let observable = self.observe()?;
1043        Ok(JsChangesStream::from_observable(observable))
1044    }
1045
1046    /// Gets the schema layout for binary decoding.
1047    /// The layout can be cached by JS for repeated queries on the same table.
1048    #[wasm_bindgen(js_name = getSchemaLayout)]
1049    pub fn get_schema_layout(&self) -> Result<crate::binary_protocol::SchemaLayout, JsValue> {
1050        let table_name = self
1051            .from_table
1052            .as_ref()
1053            .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
1054
1055        let cache = self.cache.borrow();
1056        let store = cache
1057            .get_table(table_name)
1058            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
1059
1060        let schema = store.schema();
1061
1062        // Create layout based on projection
1063        // - Projection queries: create new each time
1064        // - Full table queries: use cache (get_or_create)
1065        let layout = if let Some(cols) = self.parse_columns() {
1066            // Projection query: create layout from projected columns
1067            // For JOIN queries, we need to look up columns from multiple tables
1068            self.create_projection_layout(&cols)
1069        } else {
1070            self.schema_layout_cache
1071                .borrow_mut()
1072                .get_or_create_full(table_name, schema)
1073                .clone()
1074        };
1075
1076        Ok(layout)
1077    }
1078
1079    /// Executes the query and returns a binary result buffer.
1080    /// Use with getSchemaLayout() for zero-copy decoding in JS.
1081    #[wasm_bindgen(js_name = execBinary)]
1082    pub async fn exec_binary(&self) -> Result<crate::binary_protocol::BinaryResult, JsValue> {
1083        let table_name = self
1084            .from_table
1085            .as_ref()
1086            .ok_or_else(|| JsValue::from_str("FROM table not specified"))?;
1087
1088        let cache = self.cache.borrow();
1089        let store = cache
1090            .get_table(table_name)
1091            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table_name)))?;
1092
1093        let schema = store.schema();
1094
1095        // Build logical plan
1096        let plan = self.build_logical_plan(table_name);
1097
1098        // Compute plan fingerprint for caching
1099        let fingerprint = compute_plan_fingerprint(&plan);
1100
1101        // Get or compile physical plan (cached)
1102        let rows = {
1103            let mut plan_cache = self.plan_cache.borrow_mut();
1104            let physical_plan = plan_cache.get_or_insert_with(fingerprint, || {
1105                compile_plan(&cache, table_name, plan)
1106            });
1107
1108            // Execute the cached physical plan
1109            execute_physical_plan(&cache, physical_plan)
1110                .map_err(|e| JsValue::from_str(&alloc::format!("Query execution error: {:?}", e)))?
1111        };
1112
1113        // Create layout based on projection
1114        // - Projection queries: create new each time (column combinations vary too much)
1115        // - Full table queries: use cache (get_or_create)
1116        let layout = if let Some(cols) = self.parse_columns() {
1117            // Projection query: create layout from projected columns
1118            // For JOIN queries, we need to look up columns from multiple tables
1119            self.create_projection_layout(&cols)
1120        } else {
1121            // Full table query: use cached layout (clone for encoder)
1122            self.schema_layout_cache
1123                .borrow_mut()
1124                .get_or_create_full(table_name, schema)
1125                .clone()
1126        };
1127
1128        // Encode to binary
1129        let mut encoder = crate::binary_protocol::BinaryEncoder::new(layout, rows.len());
1130        encoder.encode_rows(&rows);
1131        let buffer = encoder.finish();
1132
1133        Ok(crate::binary_protocol::BinaryResult::new(buffer))
1134    }
1135}
1136
/// INSERT query builder.
///
/// Built with `InsertBuilder::new`, given rows through `values`, and run
/// with `exec`.
#[wasm_bindgen]
pub struct InsertBuilder {
    /// Shared table storage; `exec` borrows it mutably to insert rows.
    cache: Rc<RefCell<TableCache>>,
    /// Registry that `exec` notifies with the IDs of the rows it inserted.
    query_registry: Rc<RefCell<QueryRegistry>>,
    /// Table name to `TableId` lookup used to address that notification.
    table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
    /// Name of the table rows are inserted into.
    table_name: String,
    /// Raw JS value passed to `values`; `None` until it is called.
    values_data: Option<JsValue>,
}
1146
1147impl InsertBuilder {
1148    pub(crate) fn new(
1149        cache: Rc<RefCell<TableCache>>,
1150        query_registry: Rc<RefCell<QueryRegistry>>,
1151        table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1152        table: &str,
1153    ) -> Self {
1154        Self {
1155            cache,
1156            query_registry,
1157            table_id_map,
1158            table_name: table.to_string(),
1159            values_data: None,
1160        }
1161    }
1162}
1163
1164#[wasm_bindgen]
1165impl InsertBuilder {
1166    /// Sets the values to insert.
1167    pub fn values(mut self, data: &JsValue) -> Self {
1168        self.values_data = Some(data.clone());
1169        self
1170    }
1171
1172    /// Executes the insert operation.
1173    pub async fn exec(&self) -> Result<JsValue, JsValue> {
1174        let values = self
1175            .values_data
1176            .as_ref()
1177            .ok_or_else(|| JsValue::from_str("No values specified"))?;
1178
1179        let mut cache = self.cache.borrow_mut();
1180        let store = cache
1181            .get_table_mut(&self.table_name)
1182            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1183
1184        let schema = store.schema().clone();
1185
1186        // Get the count of rows to insert first
1187        let arr = js_sys::Array::from(values);
1188        let row_count = arr.length() as u64;
1189
1190        // Reserve row IDs for all rows at once to avoid ID conflicts
1191        let start_row_id = reserve_row_ids(row_count);
1192
1193        // Convert JS values to rows
1194        let rows = js_array_to_rows(values, &schema, start_row_id)?;
1195        let row_count = rows.len();
1196
1197        // Build deltas for notification
1198        let _deltas: Vec<Delta<Row>> = rows.iter().map(|r| Delta::insert(r.clone())).collect();
1199
1200        // Insert rows and collect their IDs
1201        let mut inserted_ids = hashbrown::HashSet::new();
1202        for row in rows {
1203            inserted_ids.insert(row.id());
1204            store
1205                .insert(row)
1206                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1207        }
1208
1209        // Notify query registry with changed IDs
1210        if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1211            drop(cache); // Release borrow before notifying
1212            self.query_registry
1213                .borrow_mut()
1214                .on_table_change(table_id, &inserted_ids);
1215        }
1216
1217        Ok(JsValue::from_f64(row_count as f64))
1218    }
1219}
1220
/// UPDATE query builder.
///
/// Built with `UpdateBuilder::new`, configured through `set` and `where`,
/// and run with `exec`.
#[wasm_bindgen]
pub struct UpdateBuilder {
    /// Shared table storage; `exec` borrows it mutably to update rows.
    cache: Rc<RefCell<TableCache>>,
    /// Registry that `exec` notifies with the IDs of the rows it updated.
    query_registry: Rc<RefCell<QueryRegistry>>,
    /// Table name to `TableId` lookup used to address that notification.
    table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
    /// Name of the table being updated.
    table_name: String,
    /// `(column name, raw JS value)` assignments in the order `set` added them.
    set_values: Vec<(String, JsValue)>,
    /// Optional row filter; when `None`, `exec` updates every row.
    where_clause: Option<Expr>,
}
1231
1232impl UpdateBuilder {
1233    pub(crate) fn new(
1234        cache: Rc<RefCell<TableCache>>,
1235        query_registry: Rc<RefCell<QueryRegistry>>,
1236        table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1237        table: &str,
1238    ) -> Self {
1239        Self {
1240            cache,
1241            query_registry,
1242            table_id_map,
1243            table_name: table.to_string(),
1244            set_values: Vec::new(),
1245            where_clause: None,
1246        }
1247    }
1248}
1249
1250#[wasm_bindgen]
1251impl UpdateBuilder {
1252    /// Sets column values.
1253    /// Can be called with either:
1254    /// - An object: set({ column: value, ... })
1255    /// - Two arguments: set(column, value)
1256    pub fn set(mut self, column_or_obj: &JsValue, value: Option<JsValue>) -> Self {
1257        if let Some(val) = value {
1258            // Two-argument form: set(column, value)
1259            if let Some(col_name) = column_or_obj.as_string() {
1260                self.set_values.push((col_name, val));
1261            }
1262        } else if let Some(obj) = column_or_obj.dyn_ref::<js_sys::Object>() {
1263            // Object form: set({ column: value, ... })
1264            let keys = js_sys::Object::keys(obj);
1265            for key in keys.iter() {
1266                if let Some(col_name) = key.as_string() {
1267                    let value = js_sys::Reflect::get(obj, &key).unwrap_or(JsValue::NULL);
1268                    self.set_values.push((col_name, value));
1269                }
1270            }
1271        }
1272        self
1273    }
1274
1275    /// Sets the WHERE clause.
1276    #[wasm_bindgen(js_name = "where")]
1277    pub fn where_(mut self, predicate: &Expr) -> Self {
1278        self.where_clause = Some(predicate.clone());
1279        self
1280    }
1281
1282    /// Executes the update operation.
1283    pub async fn exec(&self) -> Result<JsValue, JsValue> {
1284        let mut cache = self.cache.borrow_mut();
1285        let store = cache
1286            .get_table_mut(&self.table_name)
1287            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1288
1289        let schema = store.schema().clone();
1290
1291        // Find rows to update
1292        let rows_to_update: Vec<Row> = store
1293            .scan()
1294            .filter(|row| {
1295                if let Some(ref pred) = self.where_clause {
1296                    evaluate_predicate(pred, &**row, &schema)
1297                } else {
1298                    true
1299                }
1300            })
1301            .map(|rc| (*rc).clone())
1302            .collect();
1303
1304        let mut deltas = Vec::new();
1305        let mut update_count = 0;
1306        let mut updated_ids = hashbrown::HashSet::new();
1307
1308        for old_row in rows_to_update {
1309            // Create new row with updated values
1310            let mut new_values = old_row.values().to_vec();
1311
1312            for (col_name, js_val) in &self.set_values {
1313                if let Some(col) = schema.get_column(col_name) {
1314                    let idx = col.index();
1315                    let value = js_to_value(js_val, col.data_type())?;
1316                    if idx < new_values.len() {
1317                        new_values[idx] = value;
1318                    }
1319                }
1320            }
1321
1322            // Create new row with incremented version
1323            let new_version = old_row.version().wrapping_add(1);
1324            let new_row = Row::new_with_version(old_row.id(), new_version, new_values);
1325
1326            // Build deltas
1327            deltas.push(Delta::delete(old_row.clone()));
1328            deltas.push(Delta::insert(new_row.clone()));
1329
1330            // Track updated row ID
1331            updated_ids.insert(old_row.id());
1332
1333            // Update in store
1334            store
1335                .update(old_row.id(), new_row)
1336                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1337
1338            update_count += 1;
1339        }
1340
1341        // Notify query registry with changed IDs
1342        if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1343            drop(cache);
1344            self.query_registry
1345                .borrow_mut()
1346                .on_table_change(table_id, &updated_ids);
1347        }
1348
1349        Ok(JsValue::from_f64(update_count as f64))
1350    }
1351}
1352
/// DELETE query builder.
///
/// Built with `DeleteBuilder::new`, optionally filtered through `where`,
/// and run with `exec`.
#[wasm_bindgen]
pub struct DeleteBuilder {
    /// Shared table storage; `exec` borrows it mutably to delete rows.
    cache: Rc<RefCell<TableCache>>,
    /// Registry that `exec` notifies with the IDs of the rows it deleted.
    query_registry: Rc<RefCell<QueryRegistry>>,
    /// Table name to `TableId` lookup used to address that notification.
    table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
    /// Name of the table rows are deleted from.
    table_name: String,
    /// Optional row filter; when `None`, `exec` deletes every row.
    where_clause: Option<Expr>,
}
1362
1363impl DeleteBuilder {
1364    pub(crate) fn new(
1365        cache: Rc<RefCell<TableCache>>,
1366        query_registry: Rc<RefCell<QueryRegistry>>,
1367        table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
1368        table: &str,
1369    ) -> Self {
1370        Self {
1371            cache,
1372            query_registry,
1373            table_id_map,
1374            table_name: table.to_string(),
1375            where_clause: None,
1376        }
1377    }
1378}
1379
1380#[wasm_bindgen]
1381impl DeleteBuilder {
1382    /// Sets the WHERE clause.
1383    #[wasm_bindgen(js_name = "where")]
1384    pub fn where_(mut self, predicate: &Expr) -> Self {
1385        self.where_clause = Some(predicate.clone());
1386        self
1387    }
1388
1389    /// Executes the delete operation.
1390    pub async fn exec(&self) -> Result<JsValue, JsValue> {
1391        let mut cache = self.cache.borrow_mut();
1392        let store = cache
1393            .get_table_mut(&self.table_name)
1394            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", self.table_name)))?;
1395
1396        let schema = store.schema().clone();
1397
1398        // Find rows to delete
1399        let rows_to_delete: Vec<Row> = store
1400            .scan()
1401            .filter(|row| {
1402                if let Some(ref pred) = self.where_clause {
1403                    evaluate_predicate(pred, &**row, &schema)
1404                } else {
1405                    true
1406                }
1407            })
1408            .map(|rc| (*rc).clone())
1409            .collect();
1410
1411        let _deltas: Vec<Delta<Row>> = rows_to_delete
1412            .iter()
1413            .map(|r| Delta::delete(r.clone()))
1414            .collect();
1415
1416        let delete_count = rows_to_delete.len();
1417
1418        // Delete rows and collect their IDs
1419        let mut deleted_ids = hashbrown::HashSet::new();
1420        for row in rows_to_delete {
1421            deleted_ids.insert(row.id());
1422            store
1423                .delete(row.id())
1424                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
1425        }
1426
1427        // Notify query registry with changed IDs
1428        if let Some(table_id) = self.table_id_map.borrow().get(&self.table_name).copied() {
1429            drop(cache);
1430            self.query_registry
1431                .borrow_mut()
1432                .on_table_change(table_id, &deleted_ids);
1433        }
1434
1435        Ok(JsValue::from_f64(delete_count as f64))
1436    }
1437}
1438
1439/// Evaluates a predicate against a row.
1440pub(crate) fn evaluate_predicate(predicate: &Expr, row: &Row, schema: &Table) -> bool {
1441    match predicate.inner() {
1442        ExprInner::Comparison { column, op, value } => {
1443            let col = schema.get_column(&column.name());
1444            if col.is_none() {
1445                return false;
1446            }
1447            let col = col.unwrap();
1448            let idx = col.index();
1449
1450            let row_val = match row.get(idx) {
1451                Some(v) => v,
1452                None => return false,
1453            };
1454
1455            let cmp_val = match js_to_value(value, col.data_type()) {
1456                Ok(v) => v,
1457                Err(_) => return false,
1458            };
1459
1460            use crate::expr::ComparisonOp;
1461            match op {
1462                ComparisonOp::Eq => row_val == &cmp_val,
1463                ComparisonOp::Ne => row_val != &cmp_val,
1464                ComparisonOp::Gt => row_val > &cmp_val,
1465                ComparisonOp::Gte => row_val >= &cmp_val,
1466                ComparisonOp::Lt => row_val < &cmp_val,
1467                ComparisonOp::Lte => row_val <= &cmp_val,
1468            }
1469        }
1470        ExprInner::Between { column, low, high } => {
1471            let col = schema.get_column(&column.name());
1472            if col.is_none() {
1473                return false;
1474            }
1475            let col = col.unwrap();
1476            let idx = col.index();
1477
1478            let row_val = match row.get(idx) {
1479                Some(v) => v,
1480                None => return false,
1481            };
1482
1483            let low_val = match js_to_value(low, col.data_type()) {
1484                Ok(v) => v,
1485                Err(_) => return false,
1486            };
1487            let high_val = match js_to_value(high, col.data_type()) {
1488                Ok(v) => v,
1489                Err(_) => return false,
1490            };
1491
1492            row_val >= &low_val && row_val <= &high_val
1493        }
1494        ExprInner::InList { column, values } => {
1495            let col = schema.get_column(&column.name());
1496            if col.is_none() {
1497                return false;
1498            }
1499            let col = col.unwrap();
1500            let idx = col.index();
1501
1502            let row_val = match row.get(idx) {
1503                Some(v) => v,
1504                None => return false,
1505            };
1506
1507            let arr = js_sys::Array::from(values);
1508            arr.iter().any(|v| {
1509                if let Ok(cmp_val) = js_to_value(&v, col.data_type()) {
1510                    row_val == &cmp_val
1511                } else {
1512                    false
1513                }
1514            })
1515        }
1516        ExprInner::Like { column, pattern } => {
1517            let col = schema.get_column(&column.name());
1518            if col.is_none() {
1519                return false;
1520            }
1521            let col = col.unwrap();
1522            let idx = col.index();
1523
1524            let row_val = match row.get(idx) {
1525                Some(Value::String(s)) => s,
1526                _ => return false,
1527            };
1528
1529            // Simple LIKE implementation (% = any, _ = single char)
1530            like_match(row_val, pattern)
1531        }
1532        ExprInner::IsNull { column } => {
1533            let col = schema.get_column(&column.name());
1534            if col.is_none() {
1535                return false;
1536            }
1537            let idx = col.unwrap().index();
1538
1539            match row.get(idx) {
1540                Some(Value::Null) | None => true,
1541                _ => false,
1542            }
1543        }
1544        ExprInner::IsNotNull { column } => {
1545            let col = schema.get_column(&column.name());
1546            if col.is_none() {
1547                return false;
1548            }
1549            let idx = col.unwrap().index();
1550
1551            match row.get(idx) {
1552                Some(Value::Null) | None => false,
1553                _ => true,
1554            }
1555        }
1556        ExprInner::And { left, right } => {
1557            evaluate_predicate(left, row, schema) && evaluate_predicate(right, row, schema)
1558        }
1559        ExprInner::Or { left, right } => {
1560            evaluate_predicate(left, row, schema) || evaluate_predicate(right, row, schema)
1561        }
1562        ExprInner::Not { inner } => !evaluate_predicate(inner, row, schema),
1563        ExprInner::True => true,
1564        _ => true, // Default to true for unsupported expressions
1565    }
1566}
1567
/// Simple SQL `LIKE` pattern matching.
///
/// Returns `true` when the whole of `s` matches `pattern`, where:
///
/// * `%` matches any sequence of characters, including the empty one;
/// * `_` matches exactly one character;
/// * every other character matches only itself (the comparison is
///   case-sensitive and there is no escape syntax).
///
/// Matching works on Unicode scalar values (`char`s), so `_` consumes one
/// `char` regardless of how many bytes it occupies.
///
/// The implementation is iterative and allocation-free: it walks both strings
/// once and, on a mismatch, backtracks only to the most recent `%`. Worst-case
/// running time is proportional to `s.len() * pattern.len()`.
fn like_match(s: &str, pattern: &str) -> bool {
    let mut text = s.chars();
    let mut pat = pattern.chars();

    // Resume point recorded at the most recent `%`: the pattern position just
    // after that `%`, and the text position up to which the `%` has absorbed
    // characters so far. Earlier `%`s never need revisiting, because a later
    // `%` can absorb anything an earlier one could have.
    let mut resume = None;

    loop {
        // Look one step ahead on cloned cursors so nothing is consumed until
        // we know the step succeeds.
        let mut pat_ahead = pat.clone();
        let mut text_ahead = text.clone();

        match (pat_ahead.next(), text_ahead.next()) {
            (Some('%'), _) => {
                // `%` initially matches the empty sequence; remember where to
                // come back to if the rest of the pattern fails from here.
                resume = Some((pat_ahead.clone(), text.clone()));
                pat = pat_ahead;
                continue;
            }
            (Some(p), Some(c)) if p == '_' || p == c => {
                pat = pat_ahead;
                text = text_ahead;
                continue;
            }
            // Both exhausted together: full match.
            (None, None) => return true,
            // Text exhausted while the pattern still requires a character.
            // Letting an earlier `%` absorb more text only leaves fewer
            // characters for the remainder, so no alignment can succeed.
            (Some(_), None) => return false,
            // Literal mismatch, or pattern exhausted with text left over.
            _ => {}
        }

        match resume.as_mut() {
            Some((after_percent, absorbed_to)) => {
                // Let the last `%` absorb one more character and retry the
                // remainder of the pattern from there.
                if absorbed_to.next().is_none() {
                    return false;
                }
                pat = after_percent.clone();
                text = absorbed_to.clone();
            }
            // No `%` seen yet: nothing can absorb the mismatch.
            None => return false,
        }
    }
}
1608
#[cfg(test)]
mod tests {
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Runs `like_match` over a table of `(text, pattern, expected)` cases and
    /// asserts that each result equals the expected outcome.
    fn check(cases: &[(&str, &str, bool)]) {
        for &(text, pattern, expected) in cases {
            assert_eq!(
                like_match(text, pattern),
                expected,
                "like_match({:?}, {:?})",
                text,
                pattern
            );
        }
    }

    /// A pattern without wildcards must equal the text exactly.
    #[wasm_bindgen_test]
    fn test_like_match_exact() {
        check(&[
            ("hello", "hello", true),
            ("hello", "world", false),
        ]);
    }

    /// `%` matches any run of characters at the start, middle, or end.
    #[wasm_bindgen_test]
    fn test_like_match_percent() {
        check(&[
            ("hello", "%", true),
            ("hello", "h%", true),
            ("hello", "%o", true),
            ("hello", "h%o", true),
            ("hello", "%ell%", true),
            ("hello", "x%", false),
        ]);
    }

    /// `_` matches exactly one character, so the lengths must line up.
    #[wasm_bindgen_test]
    fn test_like_match_underscore() {
        check(&[
            ("hello", "_ello", true),
            ("hello", "h_llo", true),
            ("hello", "hell_", true),
            ("hello", "_____", true),
            ("hello", "______", false),
        ]);
    }

    /// `%` and `_` used together, plus prefix/suffix matching on longer text.
    #[wasm_bindgen_test]
    fn test_like_match_combined() {
        check(&[
            ("hello", "h%_o", true),
            ("hello world", "hello%", true),
            ("hello world", "%world", true),
        ]);
    }
}