1use crate::storage::{MemoryEngine, Row, StorageEngine, RowId};
2use crate::types::{DbValue, DbResult};
3use crate::index::btree::BTreeIndex;
4use crate::index::manager::IndexMeta;
5use std::sync::{Arc, RwLock};
6
/// Sort direction used when ordering query results.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq`: the enum has no
/// payload, so equality is total and the value can be used as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Order {
    /// Keep the natural (ascending) comparison result.
    Asc,
    /// Reverse the ascending comparison result.
    Desc,
}
13
/// The kind of join a [`JoinCondition`] performs.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq`: the enum has no
/// payload, so equality is total and the value can be used as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinType {
    /// Emit only row pairs whose join fields match.
    Inner,
    /// Like `Inner`, but a left row without a match is still emitted with the
    /// right table's columns set to null.
    Left,
    /// Every right row is emitted; unmatched right rows get a null left side.
    Right,
    /// Combination of `Left` and `Right`: unmatched rows of either side are kept.
    Full,
}
22
23#[derive(Debug, Clone)]
25pub struct JoinCondition {
26 pub join_type: JoinType,
27 pub right_table: String,
28 pub left_field: String, pub right_field: String, }
31
32#[derive(Debug, Clone)]
34pub enum FilterExpr {
35 Eq { field: String, value: DbValue },
36 Ne { field: String, value: DbValue },
37 Lt { field: String, value: DbValue },
38 Le { field: String, value: DbValue },
39 Gt { field: String, value: DbValue },
40 Ge { field: String, value: DbValue },
41 In { field: String, values: Vec<DbValue> },
42 Contains { field: String, value: String },
43 And(Box<FilterExpr>, Box<FilterExpr>),
44 Or(Box<FilterExpr>, Box<FilterExpr>),
45 Not(Box<FilterExpr>),
46}
47
/// Aggregate functions supported by the query builder.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq`: the enum has no
/// payload, so equality is total and the value can be used as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregateFunction {
    /// Number of rows (or of non-null values when a column is given).
    Count,
    /// Sum of a column.
    Sum,
    /// Average of a column.
    Avg,
    /// Largest value of a column.
    Max,
    /// Smallest value of a column.
    Min,
}
57
58#[derive(Debug, Clone)]
60pub struct AggregateExpr {
61 pub func: AggregateFunction,
62 pub column: Option<String>, pub alias: Option<String>, }
65
66#[derive(Debug, Clone)]
68pub enum HavingExpr {
69 Eq { value: DbValue },
70 Ne { value: DbValue },
71 Lt { value: DbValue },
72 Le { value: DbValue },
73 Gt { value: DbValue },
74 Ge { value: DbValue },
75}
76
77fn evaluate_filter(expr: &FilterExpr, row: &Row) -> bool {
79 match expr {
80 FilterExpr::Eq { field, value } => {
81 row.get(field).map(|v| v == value).unwrap_or(false)
82 }
83 FilterExpr::Ne { field, value } => {
84 row.get(field).map(|v| v != value).unwrap_or(true)
85 }
86 FilterExpr::Lt { field, value } => compare_field(row, field, value, |a, b| a < b),
87 FilterExpr::Le { field, value } => compare_field(row, field, value, |a, b| a <= b),
88 FilterExpr::Gt { field, value } => compare_field(row, field, value, |a, b| a > b),
89 FilterExpr::Ge { field, value } => compare_field(row, field, value, |a, b| a >= b),
90 FilterExpr::In { field, values } => {
91 row.get(field).map(|v| values.contains(v)).unwrap_or(false)
92 }
93 FilterExpr::Contains { field, value } => {
94 row.get(field)
95 .and_then(|v| v.as_text())
96 .map(|s| s.contains(value))
97 .unwrap_or(false)
98 }
99 FilterExpr::And(left, right) => {
100 evaluate_filter(left, row) && evaluate_filter(right, row)
101 }
102 FilterExpr::Or(left, right) => {
103 evaluate_filter(left, row) || evaluate_filter(right, row)
104 }
105 FilterExpr::Not(inner) => {
106 !evaluate_filter(inner, row)
107 }
108 }
109}
110
111fn compare_field<F>(row: &Row, field: &str, value: &DbValue, cmp: F) -> bool
113where
114 F: Fn(f64, f64) -> bool,
115{
116 let row_val = match row.get(field) {
117 Some(v) => v,
118 None => return false,
119 };
120
121 let row_num = match row_val {
122 DbValue::Integer(i) => *i as f64,
123 DbValue::Real(r) => *r,
124 _ => return false,
125 };
126
127 let cmp_num = match value {
128 DbValue::Integer(i) => *i as f64,
129 DbValue::Real(r) => *r,
130 _ => return false,
131 };
132
133 cmp(row_num, cmp_num)
134}
135
136pub struct QueryBuilder {
138 table: String,
139 joins: Vec<JoinCondition>, selected_columns: Vec<String>, filters: Vec<FilterExpr>,
142 order_by: Option<(String, Order)>,
143 limit: Option<usize>,
144 offset: Option<usize>,
145 engine: Arc<RwLock<MemoryEngine>>,
146 aggregate: Option<AggregateExpr>, group_by: Vec<String>, having: Option<HavingExpr>, }
151
152impl QueryBuilder {
153 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
154 QueryBuilder {
155 table,
156 joins: Vec::new(),
157 selected_columns: Vec::new(),
158 filters: Vec::new(),
159 order_by: None,
160 limit: None,
161 offset: None,
162 engine,
163 aggregate: None,
164 group_by: Vec::new(),
165 having: None,
166 }
167 }
168
169 fn find_best_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a IndexMeta, &'a BTreeIndex, Vec<DbValue>)> {
172 let eq_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| matches!(f, FilterExpr::Eq { .. })).collect();
174
175 let range_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| {
177 matches!(f, FilterExpr::Gt { .. } | FilterExpr::Ge { .. } | FilterExpr::Lt { .. } | FilterExpr::Le { .. })
178 }).collect();
179
180 if !eq_filters.is_empty() {
182 let eq_columns: Vec<&str> = eq_filters.iter().map(|f| {
183 if let FilterExpr::Eq { field, .. } = f {
184 field.as_str()
185 } else {
186 ""
187 }
188 }).collect();
189
190 if let Some((meta, index)) = engine.find_best_index(&self.table, &eq_columns) {
191 let mut key_values = Vec::new();
193 for col in &meta.columns {
194 let value = eq_filters.iter()
195 .find_map(|f| {
196 if let FilterExpr::Eq { field, value } = f {
197 if field == col { Some(value) } else { None }
198 } else { None }
199 });
200 if let Some(v) = value {
201 key_values.push(v.clone());
202 } else {
203 break;
205 }
206 }
207 if !key_values.is_empty() {
208 return Some((meta, index, key_values));
209 }
210 }
211 }
212
213 if let Some(_filter) = range_filters.first() {
215 }
217
218 None
219 }
220
221 fn find_best_single_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a String, &'a BTreeIndex)> {
223 for filter in &self.filters {
225 if let FilterExpr::Eq { field, .. } = filter {
226 if let Some(index) = engine.get_index(&self.table, field) {
227 return Some((field, index));
228 }
229 }
230 }
231 for filter in &self.filters {
233 if let FilterExpr::Gt { field, .. }
234 | FilterExpr::Ge { field, .. }
235 | FilterExpr::Lt { field, .. }
236 | FilterExpr::Le { field, .. } = filter {
237 if let Some(index) = engine.get_index(&self.table, field) {
238 return Some((field, index));
239 }
240 }
241 }
242 None
243 }
244
245 fn execute_with_index(&self, engine: &MemoryEngine, _field: &str, index: &BTreeIndex) -> DbResult<Vec<Row>> {
247 let row_ids = self.get_matching_row_ids(index);
249
250 let mut results = Vec::new();
252 for row_id in row_ids {
253 if let Some(row) = engine.get(&self.table, row_id)? {
254 if self.matches_all_filters(&row) {
256 results.push(row.clone());
257 }
258 }
259 }
260
261 if let Some((field, order)) = &self.order_by {
263 results.sort_by(|a, b| {
264 self.compare_rows(a, b, field, *order)
265 });
266 }
267
268 let start = self.offset.unwrap_or(0);
270 let end = start + self.limit.unwrap_or(results.len());
271
272 Ok(results.into_iter().skip(start).take(end - start).collect())
273 }
274
275 fn execute_with_composite_index(&self, engine: &MemoryEngine, _meta: &IndexMeta, index: &BTreeIndex, key_values: &[DbValue]) -> DbResult<Vec<Row>> {
277 let row_ids = index.search_composite(key_values);
279
280 let mut results = Vec::new();
282 for row_id in row_ids {
283 if let Some(row) = engine.get(&self.table, row_id)? {
284 if self.matches_all_filters(&row) {
286 results.push(row.clone());
287 }
288 }
289 }
290
291 if let Some((field, order)) = &self.order_by {
293 results.sort_by(|a, b| {
294 self.compare_rows(a, b, field, *order)
295 });
296 }
297
298 let start = self.offset.unwrap_or(0);
300 let end = start + self.limit.unwrap_or(results.len());
301
302 Ok(results.into_iter().skip(start).take(end - start).collect())
303 }
304
305 fn get_matching_row_ids(&self, index: &BTreeIndex) -> Vec<RowId> {
307 let mut row_ids = Vec::new();
308
309 let mut eq_value: Option<&DbValue> = None;
311 let mut gt_value: Option<&DbValue> = None;
312 let mut ge_value: Option<&DbValue> = None;
313 let mut lt_value: Option<&DbValue> = None;
314 let mut le_value: Option<&DbValue> = None;
315
316 for filter in &self.filters {
317 match filter {
318 FilterExpr::Eq { field, value } if eq_value.is_none() => {
319 eq_value = Some(value);
320 }
321 FilterExpr::Gt { field: _, value } => {
322 if gt_value.is_none() || value > gt_value.unwrap() {
323 gt_value = Some(value);
324 }
325 }
326 FilterExpr::Ge { field: _, value } => {
327 if ge_value.is_none() || value > ge_value.unwrap() {
328 ge_value = Some(value);
329 }
330 }
331 FilterExpr::Lt { field: _, value } => {
332 if lt_value.is_none() || value < lt_value.unwrap() {
333 lt_value = Some(value);
334 }
335 }
336 FilterExpr::Le { field: _, value } => {
337 if le_value.is_none() || value < le_value.unwrap() {
338 le_value = Some(value);
339 }
340 }
341 _ => {}
342 }
343 }
344
345 if let Some(eq) = eq_value {
347 row_ids.extend(index.search(eq));
348 } else {
349 let range_start = gt_value.or(ge_value);
351 let range_end = lt_value.or(le_value);
352
353 if let Some(start) = range_start {
354 if let Some(end) = range_end {
356 row_ids.extend(index.range(start, end));
357 } else {
358 row_ids.extend(index.range_from(start));
359 }
360 } else if let Some(_end) = range_end {
361 } else {
366 }
369 }
370
371 row_ids
372 }
373
374 fn matches_all_filters(&self, row: &Row) -> bool {
376 self.filters.iter().all(|expr| evaluate_filter(expr, row))
377 }
378
379 fn compare_rows(&self, a: &Row, b: &Row, field: &str, order: Order) -> std::cmp::Ordering {
381 let a_val = a.get(field);
382 let b_val = b.get(field);
383
384 let cmp = match (a_val, b_val) {
385 (Some(DbValue::Integer(a)), Some(DbValue::Integer(b))) => a.partial_cmp(b),
386 (Some(DbValue::Real(a)), Some(DbValue::Real(b))) => a.partial_cmp(b),
387 (Some(DbValue::Integer(a)), Some(DbValue::Real(b))) => (*a as f64).partial_cmp(b),
388 (Some(DbValue::Real(a)), Some(DbValue::Integer(b))) => a.partial_cmp(&(*b as f64)),
389 (Some(DbValue::Text(a)), Some(DbValue::Text(b))) => Some(a.cmp(b)),
390 _ => Some(std::cmp::Ordering::Equal),
391 };
392
393 match order {
394 Order::Asc => cmp.unwrap_or(std::cmp::Ordering::Equal),
395 Order::Desc => cmp.unwrap_or(std::cmp::Ordering::Equal).reverse(),
396 }
397 }
398
399 fn prefix_row(&self, row: &Row, table: &str) -> Row {
403 row.iter()
404 .map(|(k, v)| (format!("{}.{}", table, k), v.clone()))
405 .collect::<Row>()
406 }
407
408 fn create_null_row(&self, engine: &MemoryEngine, table: &str) -> DbResult<Row> {
410 let schema = engine.get_schema(table)?;
411 let mut row = Row::new();
412 for column in &schema.columns {
413 row.insert(format!("{}.{}", table, column.name), DbValue::Null);
414 }
415 Ok(row)
416 }
417
418 fn match_join_condition(&self, left: &Row, right: &Row, join: &JoinCondition) -> bool {
420 let left_val = left.get(&join.left_field);
421 let right_val = right.get(&join.right_field);
422 left_val == right_val
423 }
424
425 fn extract_column_name(field: &str) -> &str {
429 if let Some(pos) = field.rfind('.') {
430 &field[pos + 1..]
431 } else {
432 field
433 }
434 }
435
436 fn optimize_join_order(&self, engine: &MemoryEngine) -> DbResult<Vec<JoinCondition>> {
439 let mut tables: Vec<(String, usize)> = Vec::new();
441
442 let main_count = engine.get_row_count(&self.table)?;
444 tables.push((self.table.clone(), main_count));
445
446 for join in &self.joins {
448 if !tables.iter().any(|(name, _)| name == &join.right_table) {
449 let count = engine.get_row_count(&join.right_table)?;
450 tables.push((join.right_table.clone(), count));
451 }
452 }
453
454 tables.sort_by_key(|(_, count)| *count);
456
457 let smallest_table = tables.first()
459 .map(|(name, _)| name.clone());
460
461 let mut optimized_joins = self.joins.clone();
463
464 if let Some(smallest) = smallest_table {
465 if let Some(min_idx) = optimized_joins.iter().position(|j| j.right_table == smallest) {
467 optimized_joins.swap(0, min_idx);
468 }
469 }
470
471 Ok(optimized_joins)
472 }
473
474 fn execute_join(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
476 if self.joins.is_empty() {
477 return self.execute_simple_scan(engine);
478 }
479
480 let has_right_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Right));
482 let has_full_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Full));
483
484 let mut results: Vec<Row> = Vec::new();
486
487 if has_right_join || has_full_join {
488 self.execute_right_or_full_join(engine, &mut results)?;
490 } else {
491 let optimized_joins = self.optimize_join_order(engine)?;
493
494 let main_table_rows = engine.scan(&self.table)?;
496
497 for (_row_id, main_row) in main_table_rows {
498 let main_prefixed = self.prefix_row(&main_row, &self.table);
499
500 self.process_joins_with_order(engine, main_prefixed, 0, &mut results, &optimized_joins)?;
502 }
503 }
504
505 let mut filtered: Vec<Row> = results
507 .into_iter()
508 .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
509 .collect();
510
511 if !self.selected_columns.is_empty() {
513 filtered = filtered
514 .into_iter()
515 .map(|row| {
516 let mut projected = Row::new();
517 for col in &self.selected_columns {
518 if let Some(value) = row.get(col) {
519 projected.insert(col.clone(), value.clone());
520 }
521 }
522 projected
523 })
524 .collect();
525 }
526
527 if let Some((ref field, order)) = self.order_by {
529 filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
530 }
531
532 let start = self.offset.unwrap_or(0);
534 let end = start + self.limit.unwrap_or(filtered.len());
535
536 Ok(filtered.into_iter().skip(start).take(end - start).collect())
537 }
538
539 fn execute_right_or_full_join(&self, engine: &MemoryEngine, results: &mut Vec<Row>) -> DbResult<()> {
541 let right_or_full_join_index = self.joins.iter().position(|j| {
543 matches!(j.join_type, JoinType::Right | JoinType::Full)
544 });
545
546 if let Some(join_idx) = right_or_full_join_index {
547 let join = &self.joins[join_idx];
548 let is_full = matches!(join.join_type, JoinType::Full);
549
550 let right_table_rows = engine.scan(&join.right_table)?;
552
553 let left_table_rows = if join_idx == 0 {
555 engine.scan(&self.table)?
556 } else {
557 Vec::new()
560 };
561
562 let mut matched_left_row_ids: std::collections::HashSet<RowId> = std::collections::HashSet::new();
564
565 for (_right_id, right_row) in right_table_rows {
567 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
568
569 let mut has_match = false;
571
572 for (left_id, left_row) in &left_table_rows {
573 let left_prefixed = self.prefix_row(&left_row, &self.table);
574
575 if self.match_join_condition(&left_prefixed, &right_prefixed, join) {
576 has_match = true;
577 matched_left_row_ids.insert(*left_id);
578 let mut merged = left_prefixed.clone();
579 merged.extend(right_prefixed.clone());
580
581 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
583 }
584 }
585
586 if !has_match {
588 let null_row = self.create_null_row(engine, &self.table)?;
589 let mut merged = null_row;
590 merged.extend(right_prefixed);
591
592 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
594 }
595 }
596
597 if is_full {
599 for (left_id, left_row) in &left_table_rows {
600 if !matched_left_row_ids.contains(left_id) {
601 let left_prefixed = self.prefix_row(&left_row, &self.table);
602 let null_row = self.create_null_row(engine, &join.right_table)?;
603 let mut merged = left_prefixed;
604 merged.extend(null_row);
605
606 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
608 }
609 }
610 }
611 }
612
613 Ok(())
614 }
615
616 fn process_joins_right_full(
618 &self,
619 engine: &MemoryEngine,
620 current_row: Row,
621 join_index: usize,
622 results: &mut Vec<Row>,
623 is_full_context: bool,
624 ) -> DbResult<()> {
625 if join_index >= self.joins.len() {
626 results.push(current_row);
627 return Ok(());
628 }
629
630 let join = &self.joins[join_index];
631
632 if is_full_context && matches!(join.join_type, JoinType::Full) {
634 }
637
638 let right_column = Self::extract_column_name(&join.right_field);
640 if let Some(index) = engine.get_index(&join.right_table, right_column) {
641 self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
643 } else {
644 self.process_join_scan_impl(engine, current_row, join_index, results, join)
646 }
647 }
648
649 fn process_joins_with_order(
651 &self,
652 engine: &MemoryEngine,
653 current_row: Row,
654 join_index: usize,
655 results: &mut Vec<Row>,
656 optimized_joins: &[JoinCondition],
657 ) -> DbResult<()> {
658 if join_index >= optimized_joins.len() {
659 results.push(current_row);
661 return Ok(());
662 }
663
664 let join = &optimized_joins[join_index];
665
666 if matches!(join.join_type, JoinType::Right) {
668 return self.process_joins_with_order(engine, current_row, join_index + 1, results, optimized_joins);
669 }
670
671 let right_column = Self::extract_column_name(&join.right_field);
673 if let Some(index) = engine.get_index(&join.right_table, right_column) {
674 self.process_join_with_index_impl_optimized(engine, current_row, join_index, results, join, right_column, index, optimized_joins)
675 } else {
676 self.process_join_scan_impl_optimized(engine, current_row, join_index, results, join, optimized_joins)
677 }
678 }
679
680 fn process_join_scan_impl_optimized(
682 &self,
683 engine: &MemoryEngine,
684 current_row: Row,
685 join_index: usize,
686 results: &mut Vec<Row>,
687 join: &JoinCondition,
688 optimized_joins: &[JoinCondition],
689 ) -> DbResult<()> {
690 let right_rows = engine.scan(&join.right_table)?;
691
692 let mut has_match = false;
693
694 for (_right_id, right_row) in right_rows {
695 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
696
697 if self.match_join_condition(¤t_row, &right_prefixed, join) {
699 has_match = true;
700 let mut merged = current_row.clone();
702 merged.extend(right_prefixed);
703
704 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
706 }
707 }
708
709 if !has_match && matches!(join.join_type, JoinType::Left) {
711 let null_row = self.create_null_row(engine, &join.right_table)?;
712 let mut merged = current_row.clone();
713 merged.extend(null_row);
714
715 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
717 }
718
719 Ok(())
720 }
721
722 fn process_join_with_index_impl_optimized(
724 &self,
725 engine: &MemoryEngine,
726 current_row: Row,
727 join_index: usize,
728 results: &mut Vec<Row>,
729 join: &JoinCondition,
730 _right_column: &str,
731 index: &BTreeIndex,
732 optimized_joins: &[JoinCondition],
733 ) -> DbResult<()> {
734 let join_value = match current_row.get(&join.left_field) {
736 Some(v) => v,
737 None => return Ok(()), };
739
740 let matching_row_ids = index.search(join_value);
742
743 let mut has_match = false;
744
745 for row_id in matching_row_ids {
746 has_match = true;
747 if let Some(right_row) = engine.get(&join.right_table, row_id)? {
748 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
749 let mut merged = current_row.clone();
750 merged.extend(right_prefixed);
751
752 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
754 }
755 }
756
757 if !has_match && matches!(join.join_type, JoinType::Left) {
759 let null_row = self.create_null_row(engine, &join.right_table)?;
760 let mut merged = current_row.clone();
761 merged.extend(null_row);
762
763 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
765 }
766
767 Ok(())
768 }
769
770 fn process_joins(
772 &self,
773 engine: &MemoryEngine,
774 current_row: Row,
775 join_index: usize,
776 results: &mut Vec<Row>,
777 ) -> DbResult<()> {
778 if join_index >= self.joins.len() {
779 results.push(current_row);
781 return Ok(());
782 }
783
784 let join = &self.joins[join_index];
785
786 if matches!(join.join_type, JoinType::Right) {
789 return self.process_joins(engine, current_row, join_index + 1, results);
792 }
793
794 let right_column = Self::extract_column_name(&join.right_field);
796 if let Some(index) = engine.get_index(&join.right_table, right_column) {
797 self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
799 } else {
800 self.process_join_scan_impl(engine, current_row, join_index, results, join)
802 }
803 }
804
805 fn process_join_scan_impl(
807 &self,
808 engine: &MemoryEngine,
809 current_row: Row,
810 join_index: usize,
811 results: &mut Vec<Row>,
812 join: &JoinCondition,
813 ) -> DbResult<()> {
814 let right_rows = engine.scan(&join.right_table)?;
815
816 let mut has_match = false;
817
818 for (_right_id, right_row) in right_rows {
819 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
820
821 if self.match_join_condition(¤t_row, &right_prefixed, join) {
823 has_match = true;
824 let mut merged = current_row.clone();
826 merged.extend(right_prefixed);
827
828 self.process_joins(engine, merged, join_index + 1, results)?;
830 }
831 }
832
833 if !has_match && matches!(join.join_type, JoinType::Left) {
835 let null_row = self.create_null_row(engine, &join.right_table)?;
836 let mut merged = current_row.clone();
837 merged.extend(null_row);
838
839 self.process_joins(engine, merged, join_index + 1, results)?;
841 }
842
843 Ok(())
844 }
845
846 fn process_join_with_index_impl(
848 &self,
849 engine: &MemoryEngine,
850 current_row: Row,
851 join_index: usize,
852 results: &mut Vec<Row>,
853 join: &JoinCondition,
854 _right_column: &str,
855 index: &BTreeIndex,
856 ) -> DbResult<()> {
857 let join_value = match current_row.get(&join.left_field) {
859 Some(v) => v,
860 None => return Ok(()), };
862
863 let matching_row_ids = index.search(join_value);
865
866 let mut has_match = false;
867
868 for row_id in matching_row_ids {
869 has_match = true;
870 if let Some(right_row) = engine.get(&join.right_table, row_id)? {
871 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
872 let mut merged = current_row.clone();
873 for (k, v) in right_prefixed {
874 merged.insert(k, v);
875 }
876
877 self.process_joins(engine, merged, join_index + 1, results)?;
879 }
880 }
881
882 if !has_match && matches!(join.join_type, JoinType::Left) {
884 let null_row = self.create_null_row(engine, &join.right_table)?;
885 let mut merged = current_row.clone();
886 for (k, v) in null_row {
887 merged.insert(k, v);
888 }
889
890 self.process_joins(engine, merged, join_index + 1, results)?;
892 }
893
894 Ok(())
895 }
896
897 fn execute_simple_scan(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
899 let rows = engine.scan(&self.table)?;
900
901 let mut filtered: Vec<Row> = rows
902 .into_iter()
903 .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
904 .map(|(_, row)| row.clone())
905 .collect();
906
907 if !self.selected_columns.is_empty() {
909 filtered = filtered
910 .into_iter()
911 .map(|row| {
912 let mut projected = Row::new();
913 for col in &self.selected_columns {
914 if let Some(value) = row.get(col) {
915 projected.insert(col.clone(), value.clone());
916 }
917 }
918 projected
919 })
920 .collect();
921 }
922
923 if let Some((ref field, order)) = self.order_by {
925 filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
926 }
927
928 let start = self.offset.unwrap_or(0);
930 let end = start + self.limit.unwrap_or(filtered.len());
931
932 Ok(filtered.into_iter().skip(start).take(end - start).collect())
933 }
934
935 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
937 self.filters.push(FilterExpr::Eq {
938 field: field.to_string(),
939 value,
940 });
941 self
942 }
943
944 pub fn ne(mut self, field: &str, value: DbValue) -> Self {
945 self.filters.push(FilterExpr::Ne {
946 field: field.to_string(),
947 value,
948 });
949 self
950 }
951
952 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
953 self.filters.push(FilterExpr::Lt {
954 field: field.to_string(),
955 value,
956 });
957 self
958 }
959
960 pub fn le(mut self, field: &str, value: DbValue) -> Self {
961 self.filters.push(FilterExpr::Le {
962 field: field.to_string(),
963 value,
964 });
965 self
966 }
967
968 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
969 self.filters.push(FilterExpr::Gt {
970 field: field.to_string(),
971 value,
972 });
973 self
974 }
975
976 pub fn ge(mut self, field: &str, value: DbValue) -> Self {
977 self.filters.push(FilterExpr::Ge {
978 field: field.to_string(),
979 value,
980 });
981 self
982 }
983
984 pub fn in_list(mut self, field: &str, values: Vec<DbValue>) -> Self {
985 self.filters.push(FilterExpr::In {
986 field: field.to_string(),
987 values,
988 });
989 self
990 }
991
992 pub fn contains(mut self, field: &str, value: &str) -> Self {
993 self.filters.push(FilterExpr::Contains {
994 field: field.to_string(),
995 value: value.to_string(),
996 });
997 self
998 }
999
1000 pub fn and(mut self, other: QueryBuilder) -> Self {
1002 for filter in other.filters {
1004 self.filters.push(filter);
1005 }
1006 self
1007 }
1008
1009 pub fn order_by(mut self, field: &str, order: Order) -> Self {
1011 self.order_by = Some((field.to_string(), order));
1012 self
1013 }
1014
1015 pub fn limit(mut self, limit: usize) -> Self {
1017 self.limit = Some(limit);
1018 self
1019 }
1020
1021 pub fn offset(mut self, offset: usize) -> Self {
1022 self.offset = Some(offset);
1023 self
1024 }
1025
1026 pub fn inner_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1028 self.joins.push(JoinCondition {
1029 join_type: JoinType::Inner,
1030 right_table: right_table.to_string(),
1031 left_field: left_field.to_string(),
1032 right_field: right_field.to_string(),
1033 });
1034 self
1035 }
1036
1037 pub fn left_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1038 self.joins.push(JoinCondition {
1039 join_type: JoinType::Left,
1040 right_table: right_table.to_string(),
1041 left_field: left_field.to_string(),
1042 right_field: right_field.to_string(),
1043 });
1044 self
1045 }
1046
1047 pub fn right_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1048 self.joins.push(JoinCondition {
1049 join_type: JoinType::Right,
1050 right_table: right_table.to_string(),
1051 left_field: left_field.to_string(),
1052 right_field: right_field.to_string(),
1053 });
1054 self
1055 }
1056
1057 pub fn full_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1058 self.joins.push(JoinCondition {
1059 join_type: JoinType::Full,
1060 right_table: right_table.to_string(),
1061 left_field: left_field.to_string(),
1062 right_field: right_field.to_string(),
1063 });
1064 self
1065 }
1066
1067 pub fn select(mut self, columns: &[&str]) -> Self {
1069 self.selected_columns = columns.iter().map(|s| s.to_string()).collect();
1070 self
1071 }
1072
1073 pub fn count(mut self) -> Self {
1077 self.aggregate = Some(AggregateExpr {
1078 func: AggregateFunction::Count,
1079 column: None, alias: None,
1081 });
1082 self
1083 }
1084
1085 pub fn count_column(mut self, column: &str) -> Self {
1087 self.aggregate = Some(AggregateExpr {
1088 func: AggregateFunction::Count,
1089 column: Some(column.to_string()),
1090 alias: None,
1091 });
1092 self
1093 }
1094
1095 pub fn sum(mut self, column: &str) -> Self {
1097 self.aggregate = Some(AggregateExpr {
1098 func: AggregateFunction::Sum,
1099 column: Some(column.to_string()),
1100 alias: None,
1101 });
1102 self
1103 }
1104
1105 pub fn avg(mut self, column: &str) -> Self {
1107 self.aggregate = Some(AggregateExpr {
1108 func: AggregateFunction::Avg,
1109 column: Some(column.to_string()),
1110 alias: None,
1111 });
1112 self
1113 }
1114
1115 pub fn max(mut self, column: &str) -> Self {
1117 self.aggregate = Some(AggregateExpr {
1118 func: AggregateFunction::Max,
1119 column: Some(column.to_string()),
1120 alias: None,
1121 });
1122 self
1123 }
1124
1125 pub fn min(mut self, column: &str) -> Self {
1127 self.aggregate = Some(AggregateExpr {
1128 func: AggregateFunction::Min,
1129 column: Some(column.to_string()),
1130 alias: None,
1131 });
1132 self
1133 }
1134
1135 pub fn alias(mut self, name: &str) -> Self {
1137 if let Some(ref mut agg) = self.aggregate {
1138 agg.alias = Some(name.to_string());
1139 }
1140 self
1141 }
1142
1143 pub fn group_by(mut self, columns: &[&str]) -> Self {
1147 self.group_by = columns.iter().map(|s| s.to_string()).collect();
1148 self
1149 }
1150
1151 pub fn having_eq(mut self, value: DbValue) -> Self {
1153 self.having = Some(HavingExpr::Eq { value });
1154 self
1155 }
1156
1157 pub fn having_ne(mut self, value: DbValue) -> Self {
1159 self.having = Some(HavingExpr::Ne { value });
1160 self
1161 }
1162
1163 pub fn having_gt(mut self, value: DbValue) -> Self {
1165 self.having = Some(HavingExpr::Gt { value });
1166 self
1167 }
1168
1169 pub fn having_ge(mut self, value: DbValue) -> Self {
1171 self.having = Some(HavingExpr::Ge { value });
1172 self
1173 }
1174
1175 pub fn having_lt(mut self, value: DbValue) -> Self {
1177 self.having = Some(HavingExpr::Lt { value });
1178 self
1179 }
1180
1181 pub fn having_le(mut self, value: DbValue) -> Self {
1183 self.having = Some(HavingExpr::Le { value });
1184 self
1185 }
1186
1187 pub fn execute(self) -> DbResult<Vec<Row>> {
1189 let engine = self.engine.read().unwrap();
1190
1191 if self.aggregate.is_some() || !self.group_by.is_empty() {
1193 return self.execute_aggregate(&engine);
1194 }
1195
1196 if !self.joins.is_empty() {
1198 return self.execute_join(&engine);
1199 }
1200
1201 if let Some((meta, index, key_values)) = self.find_best_index(&engine) {
1203 return self.execute_with_composite_index(&engine, meta, index, &key_values);
1205 }
1206
1207 if let Some((field, index)) = self.find_best_single_index(&engine) {
1209 return self.execute_with_index(&engine, field, index);
1211 }
1212
1213 let rows = engine.scan(&self.table)?;
1215
1216 let mut filtered: Vec<Row> = rows
1218 .into_iter()
1219 .filter(|(_, row)| {
1220 self.filters.iter().all(|expr| evaluate_filter(expr, row))
1221 })
1222 .map(|(_, row)| row.clone())
1223 .collect();
1224
1225 if let Some((ref field, order)) = self.order_by {
1227 filtered.sort_by(|a, b| {
1228 self.compare_rows(a, b, field, order)
1229 });
1230 }
1231
1232 let start = self.offset.unwrap_or(0);
1234 let end = start + self.limit.unwrap_or(filtered.len());
1235
1236 Ok(filtered.into_iter().skip(start).take(end - start).collect())
1237 }
1238
1239 fn execute_aggregate(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
1241 let rows: Vec<Row> = if !self.joins.is_empty() {
1243 let join_rows = self.execute_join(engine)?;
1245 join_rows.into_iter()
1247 .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1248 .collect()
1249 } else {
1250 let scanned = engine.scan(&self.table)?;
1252 scanned.into_iter()
1253 .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1254 .map(|(_, row)| row.clone())
1255 .collect()
1256 };
1257
1258 if !self.group_by.is_empty() {
1260 self.execute_grouped_aggregate(rows)
1261 } else {
1262 self.execute_simple_aggregate(rows)
1264 }
1265 }
1266
1267 fn execute_simple_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1269 let agg = match &self.aggregate {
1270 Some(a) => a,
1271 None => {
1272 return Ok(rows);
1274 }
1275 };
1276
1277 let result_value = match agg.func {
1278 AggregateFunction::Count => {
1279 let count = match &agg.column {
1280 Some(col) => {
1281 rows.iter()
1283 .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1284 .count() as i64
1285 }
1286 None => {
1287 rows.len() as i64
1289 }
1290 };
1291 DbValue::Integer(count)
1292 }
1293 AggregateFunction::Sum => {
1294 match &agg.column {
1295 Some(col) => self.sum_column(&rows, col),
1296 None => DbValue::Integer(0),
1297 }
1298 }
1299 AggregateFunction::Avg => {
1300 match &agg.column {
1301 Some(col) => self.avg_column(&rows, col),
1302 None => DbValue::Null,
1303 }
1304 }
1305 AggregateFunction::Max => {
1306 match &agg.column {
1307 Some(col) => self.max_column(&rows, col),
1308 None => DbValue::Null,
1309 }
1310 }
1311 AggregateFunction::Min => {
1312 match &agg.column {
1313 Some(col) => self.min_column(&rows, col),
1314 None => DbValue::Null,
1315 }
1316 }
1317 };
1318
1319 let mut result_row = Row::new();
1321 let column_name = match &agg.alias {
1322 Some(alias) => alias.clone(),
1323 None => {
1324 let func_name = match agg.func {
1325 AggregateFunction::Count => "COUNT",
1326 AggregateFunction::Sum => "SUM",
1327 AggregateFunction::Avg => "AVG",
1328 AggregateFunction::Max => "MAX",
1329 AggregateFunction::Min => "MIN",
1330 };
1331 match &agg.column {
1332 Some(col) => format!("{}({})", func_name, col),
1333 None => "COUNT(*)".to_string(),
1334 }
1335 }
1336 };
1337
1338 let db_value = match result_value {
1340 DbValue::Integer(i) => DbValue::Integer(i),
1341 DbValue::Real(r) => DbValue::Real(r),
1342 DbValue::Null => DbValue::Null,
1343 _ => DbValue::Null,
1344 };
1345 result_row.insert(column_name, db_value);
1346
1347 Ok(vec![result_row])
1348 }
1349
1350 fn execute_grouped_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1352 let agg = match &self.aggregate {
1353 Some(a) => a,
1354 None => {
1355 return self.execute_group_by_only(rows);
1357 }
1358 };
1359
1360 use std::collections::HashMap;
1362 let mut groups: HashMap<Vec<DbValue>, Vec<Row>> = HashMap::new();
1363
1364 for row in rows {
1365 let mut key = Vec::new();
1367 for col in &self.group_by {
1368 if let Some(val) = row.get(col) {
1369 key.push(val.clone());
1370 } else {
1371 key.push(DbValue::Null);
1372 }
1373 }
1374
1375 groups.entry(key).or_insert_with(Vec::new).push(row);
1376 }
1377
1378 let mut results = Vec::new();
1380 for (key, group_rows) in groups {
1381 let agg_value = match agg.func {
1383 AggregateFunction::Count => {
1384 let count = match &agg.column {
1385 Some(col) => {
1386 group_rows.iter()
1387 .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1388 .count() as i64
1389 }
1390 None => group_rows.len() as i64,
1391 };
1392 DbValue::Integer(count)
1393 }
1394 AggregateFunction::Sum => {
1395 match &agg.column {
1396 Some(col) => self.sum_column(&group_rows, col),
1397 None => DbValue::Integer(0),
1398 }
1399 }
1400 AggregateFunction::Avg => {
1401 match &agg.column {
1402 Some(col) => self.avg_column(&group_rows, col),
1403 None => DbValue::Null,
1404 }
1405 }
1406 AggregateFunction::Max => {
1407 match &agg.column {
1408 Some(col) => self.max_column(&group_rows, col),
1409 None => DbValue::Null,
1410 }
1411 }
1412 AggregateFunction::Min => {
1413 match &agg.column {
1414 Some(col) => self.min_column(&group_rows, col),
1415 None => DbValue::Null,
1416 }
1417 }
1418 };
1419
1420 if let Some(ref having) = self.having {
1422 if !self.evaluate_having(&agg_value, having) {
1423 continue;
1424 }
1425 }
1426
1427 let mut result_row = Row::new();
1429
1430 for (i, col) in self.group_by.iter().enumerate() {
1432 result_row.insert(col.clone(), key[i].clone());
1433 }
1434
1435 let column_name = match &agg.alias {
1437 Some(alias) => alias.clone(),
1438 None => {
1439 let func_name = match agg.func {
1440 AggregateFunction::Count => "COUNT",
1441 AggregateFunction::Sum => "SUM",
1442 AggregateFunction::Avg => "AVG",
1443 AggregateFunction::Max => "MAX",
1444 AggregateFunction::Min => "MIN",
1445 };
1446 match &agg.column {
1447 Some(col) => format!("{}({})", func_name, col),
1448 None => "COUNT(*)".to_string(),
1449 }
1450 }
1451 };
1452
1453 let db_value = match agg_value {
1454 DbValue::Integer(i) => DbValue::Integer(i),
1455 DbValue::Real(r) => DbValue::Real(r),
1456 DbValue::Null => DbValue::Null,
1457 _ => DbValue::Null,
1458 };
1459 result_row.insert(column_name, db_value);
1460
1461 results.push(result_row);
1462 }
1463
1464 Ok(results)
1465 }
1466
1467 fn execute_group_by_only(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1469 use std::collections::HashSet;
1470
1471 let mut seen: HashSet<Vec<DbValue>> = HashSet::new();
1472 let mut results = Vec::new();
1473
1474 for row in rows {
1475 let mut key = Vec::new();
1476 for col in &self.group_by {
1477 if let Some(val) = row.get(col) {
1478 key.push(val.clone());
1479 } else {
1480 key.push(DbValue::Null);
1481 }
1482 }
1483
1484 if !seen.contains(&key) {
1485 seen.insert(key.clone());
1486
1487 let mut result_row = Row::new();
1488 for (i, col) in self.group_by.iter().enumerate() {
1489 result_row.insert(col.clone(), key[i].clone());
1490 }
1491 results.push(result_row);
1492 }
1493 }
1494
1495 Ok(results)
1496 }
1497
1498 fn sum_column(&self, rows: &[Row], column: &str) -> DbValue {
1500 let mut sum: f64 = 0.0;
1501 let mut has_value = false;
1502
1503 for row in rows {
1504 if let Some(val) = row.get(column) {
1505 match val {
1506 DbValue::Integer(i) => {
1507 sum += *i as f64;
1508 has_value = true;
1509 }
1510 DbValue::Real(r) => {
1511 sum += *r;
1512 has_value = true;
1513 }
1514 _ => {}
1515 }
1516 }
1517 }
1518
1519 if has_value {
1520 if rows.iter().filter_map(|r| r.get(column)).all(|v| matches!(v, DbValue::Integer(_))) {
1522 DbValue::Integer(sum as i64)
1523 } else {
1524 DbValue::Real(sum)
1525 }
1526 } else {
1527 DbValue::Null
1528 }
1529 }
1530
1531 fn avg_column(&self, rows: &[Row], column: &str) -> DbValue {
1533 let mut sum: f64 = 0.0;
1534 let mut count: i64 = 0;
1535
1536 for row in rows {
1537 if let Some(val) = row.get(column) {
1538 match val {
1539 DbValue::Integer(i) => {
1540 sum += *i as f64;
1541 count += 1;
1542 }
1543 DbValue::Real(r) => {
1544 sum += *r;
1545 count += 1;
1546 }
1547 _ => {}
1548 }
1549 }
1550 }
1551
1552 if count > 0 {
1553 DbValue::Real(sum / count as f64)
1554 } else {
1555 DbValue::Null
1556 }
1557 }
1558
1559 fn max_column(&self, rows: &[Row], column: &str) -> DbValue {
1561 let mut max_val: Option<DbValue> = None;
1562
1563 for row in rows {
1564 if let Some(val) = row.get(column) {
1565 if !val.is_null() {
1566 match &max_val {
1567 None => max_val = Some(val.clone()),
1568 Some(current) => {
1569 if self.compare_values(val, current) > 0 {
1570 max_val = Some(val.clone());
1571 }
1572 }
1573 }
1574 }
1575 }
1576 }
1577
1578 max_val.unwrap_or(DbValue::Null)
1579 }
1580
1581 fn min_column(&self, rows: &[Row], column: &str) -> DbValue {
1583 let mut min_val: Option<DbValue> = None;
1584
1585 for row in rows {
1586 if let Some(val) = row.get(column) {
1587 if !val.is_null() {
1588 match &min_val {
1589 None => min_val = Some(val.clone()),
1590 Some(current) => {
1591 if self.compare_values(val, current) < 0 {
1592 min_val = Some(val.clone());
1593 }
1594 }
1595 }
1596 }
1597 }
1598 }
1599
1600 min_val.unwrap_or(DbValue::Null)
1601 }
1602
1603 fn compare_values(&self, a: &DbValue, b: &DbValue) -> i32 {
1605 match (a, b) {
1606 (DbValue::Integer(a), DbValue::Integer(b)) => a.cmp(b) as i32,
1607 (DbValue::Integer(a), DbValue::Real(b)) => (*a as f64).partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1608 (DbValue::Real(a), DbValue::Integer(b)) => a.partial_cmp(&(*b as f64)).map(|o| o as i32).unwrap_or(0),
1609 (DbValue::Real(a), DbValue::Real(b)) => a.partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1610 (DbValue::Text(a), DbValue::Text(b)) => a.cmp(b) as i32,
1611 _ => 0,
1612 }
1613 }
1614
1615 fn evaluate_having(&self, agg_value: &DbValue, having: &HavingExpr) -> bool {
1617 match having {
1618 HavingExpr::Eq { value } => agg_value == value,
1619 HavingExpr::Ne { value } => agg_value != value,
1620 HavingExpr::Gt { value } => self.compare_values(agg_value, value) > 0,
1621 HavingExpr::Ge { value } => self.compare_values(agg_value, value) >= 0,
1622 HavingExpr::Lt { value } => self.compare_values(agg_value, value) < 0,
1623 HavingExpr::Le { value } => self.compare_values(agg_value, value) <= 0,
1624 }
1625 }
1626}
1627
1628pub struct UpdateBuilder {
1630 table: String,
1631 filters: Vec<FilterExpr>,
1632 values: Vec<(String, DbValue)>,
1633 engine: Arc<RwLock<MemoryEngine>>,
1634}
1635
1636impl UpdateBuilder {
1637 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
1638 UpdateBuilder {
1639 table,
1640 filters: Vec::new(),
1641 values: Vec::new(),
1642 engine,
1643 }
1644 }
1645
1646 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1647 self.filters.push(FilterExpr::Eq {
1648 field: field.to_string(),
1649 value,
1650 });
1651 self
1652 }
1653
1654 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1655 self.filters.push(FilterExpr::Lt {
1656 field: field.to_string(),
1657 value,
1658 });
1659 self
1660 }
1661
1662 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1663 self.filters.push(FilterExpr::Gt {
1664 field: field.to_string(),
1665 value,
1666 });
1667 self
1668 }
1669
1670 pub fn set(mut self, field: &str, value: DbValue) -> Self {
1671 self.values.push((field.to_string(), value));
1672 self
1673 }
1674
1675 pub fn execute(self) -> DbResult<usize> {
1677 let mut engine = self.engine.write().unwrap();
1678 let rows = engine.scan(&self.table)?;
1679
1680 let mut to_update = Vec::new();
1682 for (row_id, row) in rows {
1683 let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
1684 if matches {
1685 to_update.push(row_id);
1686 }
1687 }
1688
1689 let mut count = 0;
1691 for row_id in to_update {
1692 let mut update_row: Row = self.values.iter().cloned().collect::<Row>();
1693 if let Some(existing) = engine.get(&self.table, row_id)? {
1695 for (key, value) in existing.iter() {
1696 if !update_row.contains_key(key) {
1697 update_row.insert(key.clone(), value.clone());
1698 }
1699 }
1700 }
1701 engine.update(&self.table, row_id, update_row)?;
1702 count += 1;
1703 }
1704
1705 Ok(count)
1706 }
1707}
1708
1709pub struct DeleteBuilder {
1711 table: String,
1712 filters: Vec<FilterExpr>,
1713 engine: Arc<RwLock<MemoryEngine>>,
1714}
1715
1716impl DeleteBuilder {
1717 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
1718 DeleteBuilder {
1719 table,
1720 filters: Vec::new(),
1721 engine,
1722 }
1723 }
1724
1725 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1726 self.filters.push(FilterExpr::Eq {
1727 field: field.to_string(),
1728 value,
1729 });
1730 self
1731 }
1732
1733 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1734 self.filters.push(FilterExpr::Lt {
1735 field: field.to_string(),
1736 value,
1737 });
1738 self
1739 }
1740
1741 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1742 self.filters.push(FilterExpr::Gt {
1743 field: field.to_string(),
1744 value,
1745 });
1746 self
1747 }
1748
1749 pub fn execute(self) -> DbResult<usize> {
1751 let mut engine = self.engine.write().unwrap();
1752 let rows = engine.scan(&self.table)?;
1753
1754 let mut to_delete = Vec::new();
1756 for (row_id, row) in rows {
1757 let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
1758 if matches {
1759 to_delete.push(row_id);
1760 }
1761 }
1762
1763 let mut count = 0;
1765 for row_id in to_delete {
1766 engine.delete(&self.table, row_id)?;
1767 count += 1;
1768 }
1769
1770 Ok(count)
1771 }
1772}