1use crate::storage::{MemoryEngine, Row, StorageEngine, RowId};
2use crate::types::{DbValue, DbResult};
3use crate::index::btree::BTreeIndex;
4use crate::index::manager::IndexMeta;
5use std::sync::{Arc, RwLock};
6
7#[derive(Debug, Clone, Copy, PartialEq)]
9pub enum Order {
10 Asc,
11 Desc,
12}
13
14#[derive(Debug, Clone, Copy, PartialEq)]
16pub enum JoinType {
17 Inner, Left, Right, Full, }
22
23#[derive(Debug, Clone)]
25pub struct JoinCondition {
26 pub join_type: JoinType,
27 pub right_table: String,
28 pub left_field: String, pub right_field: String, }
31
32#[derive(Debug, Clone)]
34pub enum FilterExpr {
35 Eq { field: String, value: DbValue },
36 Ne { field: String, value: DbValue },
37 Lt { field: String, value: DbValue },
38 Le { field: String, value: DbValue },
39 Gt { field: String, value: DbValue },
40 Ge { field: String, value: DbValue },
41 In { field: String, values: Vec<DbValue> },
42 Contains { field: String, value: String },
43 IsNull { field: String },
44 IsNotNull { field: String },
45 And(Box<FilterExpr>, Box<FilterExpr>),
46 Or(Box<FilterExpr>, Box<FilterExpr>),
47 Not(Box<FilterExpr>),
48}
49
50#[derive(Debug, Clone, Copy, PartialEq)]
52pub enum AggregateFunction {
53 Count,
54 Sum,
55 Avg,
56 Max,
57 Min,
58}
59
60#[derive(Debug, Clone)]
62pub struct AggregateExpr {
63 pub func: AggregateFunction,
64 pub column: Option<String>, pub alias: Option<String>, }
67
68#[derive(Debug, Clone)]
70pub enum HavingExpr {
71 Eq { value: DbValue },
72 Ne { value: DbValue },
73 Lt { value: DbValue },
74 Le { value: DbValue },
75 Gt { value: DbValue },
76 Ge { value: DbValue },
77}
78
79fn evaluate_filter(expr: &FilterExpr, row: &Row) -> bool {
81 match expr {
82 FilterExpr::Eq { field, value } => {
83 row.get(field).map(|v| v == value).unwrap_or(false)
84 }
85 FilterExpr::Ne { field, value } => {
86 row.get(field).map(|v| v != value).unwrap_or(true)
87 }
88 FilterExpr::Lt { field, value } => compare_field(row, field, value, |a, b| a < b),
89 FilterExpr::Le { field, value } => compare_field(row, field, value, |a, b| a <= b),
90 FilterExpr::Gt { field, value } => compare_field(row, field, value, |a, b| a > b),
91 FilterExpr::Ge { field, value } => compare_field(row, field, value, |a, b| a >= b),
92 FilterExpr::In { field, values } => {
93 row.get(field).map(|v| values.contains(v)).unwrap_or(false)
94 }
95 FilterExpr::Contains { field, value } => {
96 row.get(field)
97 .and_then(|v| v.as_text())
98 .map(|s| s.contains(value))
99 .unwrap_or(false)
100 }
101 FilterExpr::IsNull { field } => {
102 !row.contains_key(field)
103 }
104 FilterExpr::IsNotNull { field } => {
105 row.contains_key(field)
106 }
107 FilterExpr::And(left, right) => {
108 evaluate_filter(left, row) && evaluate_filter(right, row)
109 }
110 FilterExpr::Or(left, right) => {
111 evaluate_filter(left, row) || evaluate_filter(right, row)
112 }
113 FilterExpr::Not(inner) => {
114 !evaluate_filter(inner, row)
115 }
116 }
117}
118
119fn compare_field<F>(row: &Row, field: &str, value: &DbValue, cmp: F) -> bool
121where
122 F: Fn(f64, f64) -> bool,
123{
124 let row_val = match row.get(field) {
125 Some(v) => v,
126 None => return false,
127 };
128
129 let row_num = match row_val {
130 DbValue::Integer(i) => *i as f64,
131 DbValue::Real(r) => *r,
132 _ => return false,
133 };
134
135 let cmp_num = match value {
136 DbValue::Integer(i) => *i as f64,
137 DbValue::Real(r) => *r,
138 _ => return false,
139 };
140
141 cmp(row_num, cmp_num)
142}
143
144pub struct QueryBuilder {
146 table: String,
147 joins: Vec<JoinCondition>, selected_columns: Vec<String>, filters: Vec<FilterExpr>,
150 order_by: Option<(String, Order)>,
151 limit: Option<usize>,
152 offset: Option<usize>,
153 engine: Arc<RwLock<MemoryEngine>>,
154 aggregate: Option<AggregateExpr>, group_by: Vec<String>, having: Option<HavingExpr>, distinct: bool, }
160
161impl QueryBuilder {
162 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
163 QueryBuilder {
164 table,
165 joins: Vec::new(),
166 selected_columns: Vec::new(),
167 filters: Vec::new(),
168 order_by: None,
169 limit: None,
170 offset: None,
171 engine,
172 aggregate: None,
173 group_by: Vec::new(),
174 having: None,
175 distinct: false,
176 }
177 }
178
179 fn find_best_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a IndexMeta, &'a BTreeIndex, Vec<DbValue>)> {
182 let eq_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| matches!(f, FilterExpr::Eq { .. })).collect();
184
185 let range_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| {
187 matches!(f, FilterExpr::Gt { .. } | FilterExpr::Ge { .. } | FilterExpr::Lt { .. } | FilterExpr::Le { .. })
188 }).collect();
189
190 if !eq_filters.is_empty() {
192 let eq_columns: Vec<&str> = eq_filters.iter().map(|f| {
193 if let FilterExpr::Eq { field, .. } = f {
194 field.as_str()
195 } else {
196 ""
197 }
198 }).collect();
199
200 if let Some((meta, index)) = engine.find_best_index(&self.table, &eq_columns) {
201 let mut key_values = Vec::new();
203 for col in &meta.columns {
204 let value = eq_filters.iter()
205 .find_map(|f| {
206 if let FilterExpr::Eq { field, value } = f {
207 if field == col { Some(value) } else { None }
208 } else { None }
209 });
210 if let Some(v) = value {
211 key_values.push(v.clone());
212 } else {
213 break;
215 }
216 }
217 if !key_values.is_empty() {
218 return Some((meta, index, key_values));
219 }
220 }
221 }
222
223 if let Some(_filter) = range_filters.first() {
225 }
227
228 None
229 }
230
231 fn find_best_single_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a String, &'a BTreeIndex)> {
233 for filter in &self.filters {
235 if let FilterExpr::Eq { field, .. } = filter {
236 if let Some(index) = engine.get_index(&self.table, field) {
237 return Some((field, index));
238 }
239 }
240 }
241 for filter in &self.filters {
243 if let FilterExpr::Gt { field, .. }
244 | FilterExpr::Ge { field, .. }
245 | FilterExpr::Lt { field, .. }
246 | FilterExpr::Le { field, .. } = filter {
247 if let Some(index) = engine.get_index(&self.table, field) {
248 return Some((field, index));
249 }
250 }
251 }
252 None
253 }
254
255 fn execute_with_index(&self, engine: &MemoryEngine, _field: &str, index: &BTreeIndex) -> DbResult<Vec<Row>> {
257 let row_ids = self.get_matching_row_ids(index);
259
260 let mut results = Vec::new();
262 for row_id in row_ids {
263 if let Some(row) = engine.get(&self.table, row_id)? {
264 if self.matches_all_filters(&row) {
266 results.push(row.clone());
267 }
268 }
269 }
270
271 if let Some((field, order)) = &self.order_by {
273 results.sort_by(|a, b| {
274 self.compare_rows(a, b, field, *order)
275 });
276 }
277
278 let start = self.offset.unwrap_or(0);
280 let end = start + self.limit.unwrap_or(results.len());
281
282 Ok(results.into_iter().skip(start).take(end - start).collect())
283 }
284
285 fn execute_with_composite_index(&self, engine: &MemoryEngine, _meta: &IndexMeta, index: &BTreeIndex, key_values: &[DbValue]) -> DbResult<Vec<Row>> {
287 let row_ids = index.search_composite(key_values);
289
290 let mut results = Vec::new();
292 for row_id in row_ids {
293 if let Some(row) = engine.get(&self.table, row_id)? {
294 if self.matches_all_filters(&row) {
296 results.push(row.clone());
297 }
298 }
299 }
300
301 if let Some((field, order)) = &self.order_by {
303 results.sort_by(|a, b| {
304 self.compare_rows(a, b, field, *order)
305 });
306 }
307
308 let start = self.offset.unwrap_or(0);
310 let end = start + self.limit.unwrap_or(results.len());
311
312 Ok(results.into_iter().skip(start).take(end - start).collect())
313 }
314
315 fn get_matching_row_ids(&self, index: &BTreeIndex) -> Vec<RowId> {
317 let mut row_ids = Vec::new();
318
319 let mut eq_value: Option<&DbValue> = None;
321 let mut gt_value: Option<&DbValue> = None;
322 let mut ge_value: Option<&DbValue> = None;
323 let mut lt_value: Option<&DbValue> = None;
324 let mut le_value: Option<&DbValue> = None;
325
326 for filter in &self.filters {
327 match filter {
328 FilterExpr::Eq { field, value } if eq_value.is_none() => {
329 eq_value = Some(value);
330 }
331 FilterExpr::Gt { field: _, value } => {
332 if gt_value.is_none() || value > gt_value.unwrap() {
333 gt_value = Some(value);
334 }
335 }
336 FilterExpr::Ge { field: _, value } => {
337 if ge_value.is_none() || value > ge_value.unwrap() {
338 ge_value = Some(value);
339 }
340 }
341 FilterExpr::Lt { field: _, value } => {
342 if lt_value.is_none() || value < lt_value.unwrap() {
343 lt_value = Some(value);
344 }
345 }
346 FilterExpr::Le { field: _, value } => {
347 if le_value.is_none() || value < le_value.unwrap() {
348 le_value = Some(value);
349 }
350 }
351 _ => {}
352 }
353 }
354
355 if let Some(eq) = eq_value {
357 row_ids.extend(index.search(eq));
358 } else {
359 let range_start = gt_value.or(ge_value);
361 let range_end = lt_value.or(le_value);
362
363 if let Some(start) = range_start {
364 if let Some(end) = range_end {
366 row_ids.extend(index.range(start, end));
367 } else {
368 row_ids.extend(index.range_from(start));
369 }
370 } else if let Some(_end) = range_end {
371 } else {
376 }
379 }
380
381 row_ids
382 }
383
384 fn matches_all_filters(&self, row: &Row) -> bool {
386 self.filters.iter().all(|expr| evaluate_filter(expr, row))
387 }
388
389 fn compare_rows(&self, a: &Row, b: &Row, field: &str, order: Order) -> std::cmp::Ordering {
391 let a_val = a.get(field);
392 let b_val = b.get(field);
393
394 let cmp = match (a_val, b_val) {
395 (Some(DbValue::Integer(a)), Some(DbValue::Integer(b))) => a.partial_cmp(b),
396 (Some(DbValue::Real(a)), Some(DbValue::Real(b))) => a.partial_cmp(b),
397 (Some(DbValue::Integer(a)), Some(DbValue::Real(b))) => (*a as f64).partial_cmp(b),
398 (Some(DbValue::Real(a)), Some(DbValue::Integer(b))) => a.partial_cmp(&(*b as f64)),
399 (Some(DbValue::Text(a)), Some(DbValue::Text(b))) => Some(a.cmp(b)),
400 _ => Some(std::cmp::Ordering::Equal),
401 };
402
403 match order {
404 Order::Asc => cmp.unwrap_or(std::cmp::Ordering::Equal),
405 Order::Desc => cmp.unwrap_or(std::cmp::Ordering::Equal).reverse(),
406 }
407 }
408
409 fn prefix_row(&self, row: &Row, table: &str) -> Row {
413 row.iter()
414 .map(|(k, v)| (format!("{}.{}", table, k), v.clone()))
415 .collect::<Row>()
416 }
417
418 fn deduplicate_rows(&self, rows: Vec<Row>) -> Vec<Row> {
421 use std::collections::HashSet;
422
423 let dedup_column_indices: Option<Vec<usize>> = if self.selected_columns.is_empty() {
425 None } else {
427 rows.first().map(|first_row| {
429 let all_columns: Vec<&String> = first_row.iter().map(|(k, _)| k).collect();
430 self.selected_columns.iter()
431 .map(|sel| {
432 all_columns.iter().position(|col| {
434 col.as_str() == sel.as_str() ||
435 col.ends_with(&format!(".{}", sel)) ||
436 (sel.contains('.') && col.as_str() == sel.as_str())
437 }).unwrap_or(0)
438 })
439 .collect()
440 })
441 };
442
443 let mut seen = HashSet::new();
444 let mut result = Vec::new();
445
446 for row in rows {
447 let key: Vec<&DbValue> = match &dedup_column_indices {
449 Some(indices) => {
450 indices.iter()
451 .filter_map(|&i| row.iter().nth(i).map(|(_, v)| v))
452 .collect()
453 }
454 None => {
455 row.iter().map(|(_, v)| v).collect()
456 }
457 };
458
459 let serialized_key: Vec<String> = key.iter()
461 .map(|v| format!("{:?}", v))
462 .collect();
463
464 if seen.insert(serialized_key) {
465 result.push(row);
466 }
467 }
468
469 result
470 }
471
472 fn create_null_row(&self, engine: &MemoryEngine, table: &str) -> DbResult<Row> {
474 let schema = engine.get_schema(table)?;
475 let mut row = Row::new();
476 for column in &schema.columns {
477 row.insert(format!("{}.{}", table, column.name), DbValue::Null);
478 }
479 Ok(row)
480 }
481
482 fn match_join_condition(&self, left: &Row, right: &Row, join: &JoinCondition) -> bool {
484 let left_val = left.get(&join.left_field);
485 let right_val = right.get(&join.right_field);
486 left_val == right_val
487 }
488
489 fn extract_column_name(field: &str) -> &str {
493 if let Some(pos) = field.rfind('.') {
494 &field[pos + 1..]
495 } else {
496 field
497 }
498 }
499
500 fn optimize_join_order(&self, engine: &MemoryEngine) -> DbResult<Vec<JoinCondition>> {
503 let mut tables: Vec<(String, usize)> = Vec::new();
505
506 let main_count = engine.get_row_count(&self.table)?;
508 tables.push((self.table.clone(), main_count));
509
510 for join in &self.joins {
512 if !tables.iter().any(|(name, _)| name == &join.right_table) {
513 let count = engine.get_row_count(&join.right_table)?;
514 tables.push((join.right_table.clone(), count));
515 }
516 }
517
518 tables.sort_by_key(|(_, count)| *count);
520
521 let smallest_table = tables.first()
523 .map(|(name, _)| name.clone());
524
525 let mut optimized_joins = self.joins.clone();
527
528 if let Some(smallest) = smallest_table {
529 if let Some(min_idx) = optimized_joins.iter().position(|j| j.right_table == smallest) {
531 optimized_joins.swap(0, min_idx);
532 }
533 }
534
535 Ok(optimized_joins)
536 }
537
538 fn execute_join(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
540 if self.joins.is_empty() {
541 return self.execute_simple_scan(engine);
542 }
543
544 let has_right_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Right));
546 let has_full_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Full));
547
548 let mut results: Vec<Row> = Vec::new();
550
551 if has_right_join || has_full_join {
552 self.execute_right_or_full_join(engine, &mut results)?;
554 } else {
555 let optimized_joins = self.optimize_join_order(engine)?;
557
558 let main_table_rows = engine.scan(&self.table)?;
560
561 for (_row_id, main_row) in main_table_rows {
562 let main_prefixed = self.prefix_row(&main_row, &self.table);
563
564 self.process_joins_with_order(engine, main_prefixed, 0, &mut results, &optimized_joins)?;
566 }
567 }
568
569 let mut filtered: Vec<Row> = results
571 .into_iter()
572 .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
573 .collect();
574
575 if !self.selected_columns.is_empty() {
577 filtered = filtered
578 .into_iter()
579 .map(|row| {
580 let mut projected = Row::new();
581 for col in &self.selected_columns {
582 if let Some(value) = row.get(col) {
583 projected.insert(col.clone(), value.clone());
584 }
585 }
586 projected
587 })
588 .collect();
589 }
590
591 if let Some((ref field, order)) = self.order_by {
593 filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
594 }
595
596 let start = self.offset.unwrap_or(0);
598 let end = start + self.limit.unwrap_or(filtered.len());
599
600 Ok(filtered.into_iter().skip(start).take(end - start).collect())
601 }
602
603 fn execute_right_or_full_join(&self, engine: &MemoryEngine, results: &mut Vec<Row>) -> DbResult<()> {
605 let right_or_full_join_index = self.joins.iter().position(|j| {
607 matches!(j.join_type, JoinType::Right | JoinType::Full)
608 });
609
610 if let Some(join_idx) = right_or_full_join_index {
611 let join = &self.joins[join_idx];
612 let is_full = matches!(join.join_type, JoinType::Full);
613
614 let right_table_rows = engine.scan(&join.right_table)?;
616
617 let left_table_rows = if join_idx == 0 {
619 engine.scan(&self.table)?
620 } else {
621 Vec::new()
624 };
625
626 let mut matched_left_row_ids: std::collections::HashSet<RowId> = std::collections::HashSet::new();
628
629 for (_right_id, right_row) in right_table_rows {
631 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
632
633 let mut has_match = false;
635
636 for (left_id, left_row) in &left_table_rows {
637 let left_prefixed = self.prefix_row(&left_row, &self.table);
638
639 if self.match_join_condition(&left_prefixed, &right_prefixed, join) {
640 has_match = true;
641 matched_left_row_ids.insert(*left_id);
642 let mut merged = left_prefixed.clone();
643 merged.extend(right_prefixed.clone());
644
645 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
647 }
648 }
649
650 if !has_match {
652 let null_row = self.create_null_row(engine, &self.table)?;
653 let mut merged = null_row;
654 merged.extend(right_prefixed);
655
656 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
658 }
659 }
660
661 if is_full {
663 for (left_id, left_row) in &left_table_rows {
664 if !matched_left_row_ids.contains(left_id) {
665 let left_prefixed = self.prefix_row(&left_row, &self.table);
666 let null_row = self.create_null_row(engine, &join.right_table)?;
667 let mut merged = left_prefixed;
668 merged.extend(null_row);
669
670 self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
672 }
673 }
674 }
675 }
676
677 Ok(())
678 }
679
680 fn process_joins_right_full(
682 &self,
683 engine: &MemoryEngine,
684 current_row: Row,
685 join_index: usize,
686 results: &mut Vec<Row>,
687 is_full_context: bool,
688 ) -> DbResult<()> {
689 if join_index >= self.joins.len() {
690 results.push(current_row);
691 return Ok(());
692 }
693
694 let join = &self.joins[join_index];
695
696 if is_full_context && matches!(join.join_type, JoinType::Full) {
698 }
701
702 let right_column = Self::extract_column_name(&join.right_field);
704 if let Some(index) = engine.get_index(&join.right_table, right_column) {
705 self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
707 } else {
708 self.process_join_scan_impl(engine, current_row, join_index, results, join)
710 }
711 }
712
713 fn process_joins_with_order(
715 &self,
716 engine: &MemoryEngine,
717 current_row: Row,
718 join_index: usize,
719 results: &mut Vec<Row>,
720 optimized_joins: &[JoinCondition],
721 ) -> DbResult<()> {
722 if join_index >= optimized_joins.len() {
723 results.push(current_row);
725 return Ok(());
726 }
727
728 let join = &optimized_joins[join_index];
729
730 if matches!(join.join_type, JoinType::Right) {
732 return self.process_joins_with_order(engine, current_row, join_index + 1, results, optimized_joins);
733 }
734
735 let right_column = Self::extract_column_name(&join.right_field);
737 if let Some(index) = engine.get_index(&join.right_table, right_column) {
738 self.process_join_with_index_impl_optimized(engine, current_row, join_index, results, join, right_column, index, optimized_joins)
739 } else {
740 self.process_join_scan_impl_optimized(engine, current_row, join_index, results, join, optimized_joins)
741 }
742 }
743
744 fn process_join_scan_impl_optimized(
746 &self,
747 engine: &MemoryEngine,
748 current_row: Row,
749 join_index: usize,
750 results: &mut Vec<Row>,
751 join: &JoinCondition,
752 optimized_joins: &[JoinCondition],
753 ) -> DbResult<()> {
754 let right_rows = engine.scan(&join.right_table)?;
755
756 let mut has_match = false;
757
758 for (_right_id, right_row) in right_rows {
759 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
760
761 if self.match_join_condition(¤t_row, &right_prefixed, join) {
763 has_match = true;
764 let mut merged = current_row.clone();
766 merged.extend(right_prefixed);
767
768 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
770 }
771 }
772
773 if !has_match && matches!(join.join_type, JoinType::Left) {
775 let null_row = self.create_null_row(engine, &join.right_table)?;
776 let mut merged = current_row.clone();
777 merged.extend(null_row);
778
779 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
781 }
782
783 Ok(())
784 }
785
786 fn process_join_with_index_impl_optimized(
788 &self,
789 engine: &MemoryEngine,
790 current_row: Row,
791 join_index: usize,
792 results: &mut Vec<Row>,
793 join: &JoinCondition,
794 _right_column: &str,
795 index: &BTreeIndex,
796 optimized_joins: &[JoinCondition],
797 ) -> DbResult<()> {
798 let join_value = match current_row.get(&join.left_field) {
800 Some(v) => v,
801 None => return Ok(()), };
803
804 let matching_row_ids = index.search(join_value);
806
807 let mut has_match = false;
808
809 for row_id in matching_row_ids {
810 has_match = true;
811 if let Some(right_row) = engine.get(&join.right_table, row_id)? {
812 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
813 let mut merged = current_row.clone();
814 merged.extend(right_prefixed);
815
816 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
818 }
819 }
820
821 if !has_match && matches!(join.join_type, JoinType::Left) {
823 let null_row = self.create_null_row(engine, &join.right_table)?;
824 let mut merged = current_row.clone();
825 merged.extend(null_row);
826
827 self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
829 }
830
831 Ok(())
832 }
833
834 fn process_joins(
836 &self,
837 engine: &MemoryEngine,
838 current_row: Row,
839 join_index: usize,
840 results: &mut Vec<Row>,
841 ) -> DbResult<()> {
842 if join_index >= self.joins.len() {
843 results.push(current_row);
845 return Ok(());
846 }
847
848 let join = &self.joins[join_index];
849
850 if matches!(join.join_type, JoinType::Right) {
853 return self.process_joins(engine, current_row, join_index + 1, results);
856 }
857
858 let right_column = Self::extract_column_name(&join.right_field);
860 if let Some(index) = engine.get_index(&join.right_table, right_column) {
861 self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
863 } else {
864 self.process_join_scan_impl(engine, current_row, join_index, results, join)
866 }
867 }
868
869 fn process_join_scan_impl(
871 &self,
872 engine: &MemoryEngine,
873 current_row: Row,
874 join_index: usize,
875 results: &mut Vec<Row>,
876 join: &JoinCondition,
877 ) -> DbResult<()> {
878 let right_rows = engine.scan(&join.right_table)?;
879
880 let mut has_match = false;
881
882 for (_right_id, right_row) in right_rows {
883 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
884
885 if self.match_join_condition(¤t_row, &right_prefixed, join) {
887 has_match = true;
888 let mut merged = current_row.clone();
890 merged.extend(right_prefixed);
891
892 self.process_joins(engine, merged, join_index + 1, results)?;
894 }
895 }
896
897 if !has_match && matches!(join.join_type, JoinType::Left) {
899 let null_row = self.create_null_row(engine, &join.right_table)?;
900 let mut merged = current_row.clone();
901 merged.extend(null_row);
902
903 self.process_joins(engine, merged, join_index + 1, results)?;
905 }
906
907 Ok(())
908 }
909
910 fn process_join_with_index_impl(
912 &self,
913 engine: &MemoryEngine,
914 current_row: Row,
915 join_index: usize,
916 results: &mut Vec<Row>,
917 join: &JoinCondition,
918 _right_column: &str,
919 index: &BTreeIndex,
920 ) -> DbResult<()> {
921 let join_value = match current_row.get(&join.left_field) {
923 Some(v) => v,
924 None => return Ok(()), };
926
927 let matching_row_ids = index.search(join_value);
929
930 let mut has_match = false;
931
932 for row_id in matching_row_ids {
933 has_match = true;
934 if let Some(right_row) = engine.get(&join.right_table, row_id)? {
935 let right_prefixed = self.prefix_row(&right_row, &join.right_table);
936 let mut merged = current_row.clone();
937 for (k, v) in right_prefixed {
938 merged.insert(k, v);
939 }
940
941 self.process_joins(engine, merged, join_index + 1, results)?;
943 }
944 }
945
946 if !has_match && matches!(join.join_type, JoinType::Left) {
948 let null_row = self.create_null_row(engine, &join.right_table)?;
949 let mut merged = current_row.clone();
950 for (k, v) in null_row {
951 merged.insert(k, v);
952 }
953
954 self.process_joins(engine, merged, join_index + 1, results)?;
956 }
957
958 Ok(())
959 }
960
961 fn execute_simple_scan(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
963 let rows = engine.scan(&self.table)?;
964
965 let mut filtered: Vec<Row> = rows
966 .into_iter()
967 .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
968 .map(|(_, row)| row.clone())
969 .collect();
970
971 if !self.selected_columns.is_empty() {
973 filtered = filtered
974 .into_iter()
975 .map(|row| {
976 let mut projected = Row::new();
977 for col in &self.selected_columns {
978 if let Some(value) = row.get(col) {
979 projected.insert(col.clone(), value.clone());
980 }
981 }
982 projected
983 })
984 .collect();
985 }
986
987 if let Some((ref field, order)) = self.order_by {
989 filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
990 }
991
992 let start = self.offset.unwrap_or(0);
994 let end = start + self.limit.unwrap_or(filtered.len());
995
996 Ok(filtered.into_iter().skip(start).take(end - start).collect())
997 }
998
999 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1001 self.filters.push(FilterExpr::Eq {
1002 field: field.to_string(),
1003 value,
1004 });
1005 self
1006 }
1007
1008 pub fn ne(mut self, field: &str, value: DbValue) -> Self {
1009 self.filters.push(FilterExpr::Ne {
1010 field: field.to_string(),
1011 value,
1012 });
1013 self
1014 }
1015
1016 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1017 self.filters.push(FilterExpr::Lt {
1018 field: field.to_string(),
1019 value,
1020 });
1021 self
1022 }
1023
1024 pub fn le(mut self, field: &str, value: DbValue) -> Self {
1025 self.filters.push(FilterExpr::Le {
1026 field: field.to_string(),
1027 value,
1028 });
1029 self
1030 }
1031
1032 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1033 self.filters.push(FilterExpr::Gt {
1034 field: field.to_string(),
1035 value,
1036 });
1037 self
1038 }
1039
1040 pub fn ge(mut self, field: &str, value: DbValue) -> Self {
1041 self.filters.push(FilterExpr::Ge {
1042 field: field.to_string(),
1043 value,
1044 });
1045 self
1046 }
1047
1048 pub fn in_list(mut self, field: &str, values: Vec<DbValue>) -> Self {
1049 self.filters.push(FilterExpr::In {
1050 field: field.to_string(),
1051 values,
1052 });
1053 self
1054 }
1055
1056 pub fn contains(mut self, field: &str, value: &str) -> Self {
1057 self.filters.push(FilterExpr::Contains {
1058 field: field.to_string(),
1059 value: value.to_string(),
1060 });
1061 self
1062 }
1063
1064 pub fn is_null(mut self, field: &str) -> Self {
1066 self.filters.push(FilterExpr::IsNull {
1067 field: field.to_string(),
1068 });
1069 self
1070 }
1071
1072 pub fn is_not_null(mut self, field: &str) -> Self {
1074 self.filters.push(FilterExpr::IsNotNull {
1075 field: field.to_string(),
1076 });
1077 self
1078 }
1079
1080 pub fn and(mut self, other: QueryBuilder) -> Self {
1082 for filter in other.filters {
1084 self.filters.push(filter);
1085 }
1086 self
1087 }
1088
1089 pub fn or(mut self, left: FilterExpr, right: FilterExpr) -> Self {
1109 self.filters.push(FilterExpr::Or(Box::new(left), Box::new(right)));
1110 self
1111 }
1112
1113 pub fn not(self) -> NotBuilder {
1148 NotBuilder { query: self }
1149 }
1150
1151 pub fn where_expr(mut self, expr: FilterExpr) -> Self {
1171 self.filters.push(expr);
1172 self
1173 }
1174
1175 pub fn or_simple<F1, F2>(self, left_builder: F1, right_builder: F2) -> Self
1192 where
1193 F1: FnOnce(QueryBuilder) -> QueryBuilder,
1194 F2: FnOnce(QueryBuilder) -> QueryBuilder,
1195 {
1196 let left_qb = left_builder(QueryBuilder::new(self.table.clone(), Arc::clone(&self.engine)));
1198 let right_qb = right_builder(QueryBuilder::new(self.table.clone(), Arc::clone(&self.engine)));
1200
1201 let left_filters: Vec<FilterExpr> = left_qb.filters;
1203 let right_filters: Vec<FilterExpr> = right_qb.filters;
1204
1205 let left_expr = left_filters.into_iter().reduce(|a, b| FilterExpr::And(Box::new(a), Box::new(b)));
1207 let right_expr = right_filters.into_iter().reduce(|a, b| FilterExpr::And(Box::new(a), Box::new(b)));
1208
1209 match (left_expr, right_expr) {
1210 (Some(left), Some(right)) => {
1211 let mut new_self = self;
1212 new_self.filters.push(FilterExpr::Or(Box::new(left), Box::new(right)));
1213 new_self
1214 }
1215 (Some(left), None) => {
1216 let mut new_self = self;
1217 new_self.filters.push(left);
1218 new_self
1219 }
1220 (None, Some(right)) => {
1221 let mut new_self = self;
1222 new_self.filters.push(right);
1223 new_self
1224 }
1225 (None, None) => self,
1226 }
1227 }
1228
1229 pub fn not_simple<F>(self, builder: F) -> Self
1246 where
1247 F: FnOnce(QueryBuilder) -> QueryBuilder,
1248 {
1249 let inner_qb = builder(QueryBuilder::new(self.table.clone(), Arc::clone(&self.engine)));
1251 let inner_filters: Vec<FilterExpr> = inner_qb.filters;
1252
1253 let inner_expr = inner_filters.into_iter().reduce(|a, b| FilterExpr::And(Box::new(a), Box::new(b)));
1255
1256 match inner_expr {
1257 Some(expr) => {
1258 let mut new_self = self;
1259 new_self.filters.push(FilterExpr::Not(Box::new(expr)));
1260 new_self
1261 }
1262 None => self,
1263 }
1264 }
1265
1266 pub fn order_by(mut self, field: &str, order: Order) -> Self {
1268 self.order_by = Some((field.to_string(), order));
1269 self
1270 }
1271
1272 pub fn limit(mut self, limit: usize) -> Self {
1274 self.limit = Some(limit);
1275 self
1276 }
1277
1278 pub fn offset(mut self, offset: usize) -> Self {
1279 self.offset = Some(offset);
1280 self
1281 }
1282
1283 pub fn inner_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1285 self.joins.push(JoinCondition {
1286 join_type: JoinType::Inner,
1287 right_table: right_table.to_string(),
1288 left_field: left_field.to_string(),
1289 right_field: right_field.to_string(),
1290 });
1291 self
1292 }
1293
1294 pub fn left_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1295 self.joins.push(JoinCondition {
1296 join_type: JoinType::Left,
1297 right_table: right_table.to_string(),
1298 left_field: left_field.to_string(),
1299 right_field: right_field.to_string(),
1300 });
1301 self
1302 }
1303
1304 pub fn right_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1305 self.joins.push(JoinCondition {
1306 join_type: JoinType::Right,
1307 right_table: right_table.to_string(),
1308 left_field: left_field.to_string(),
1309 right_field: right_field.to_string(),
1310 });
1311 self
1312 }
1313
1314 pub fn full_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1315 self.joins.push(JoinCondition {
1316 join_type: JoinType::Full,
1317 right_table: right_table.to_string(),
1318 left_field: left_field.to_string(),
1319 right_field: right_field.to_string(),
1320 });
1321 self
1322 }
1323
1324 pub fn select(mut self, columns: &[&str]) -> Self {
1326 self.selected_columns = columns.iter().map(|s| s.to_string()).collect();
1327 self
1328 }
1329
1330 pub fn count(mut self) -> Self {
1334 self.aggregate = Some(AggregateExpr {
1335 func: AggregateFunction::Count,
1336 column: None, alias: None,
1338 });
1339 self
1340 }
1341
1342 pub fn count_column(mut self, column: &str) -> Self {
1344 self.aggregate = Some(AggregateExpr {
1345 func: AggregateFunction::Count,
1346 column: Some(column.to_string()),
1347 alias: None,
1348 });
1349 self
1350 }
1351
1352 pub fn sum(mut self, column: &str) -> Self {
1354 self.aggregate = Some(AggregateExpr {
1355 func: AggregateFunction::Sum,
1356 column: Some(column.to_string()),
1357 alias: None,
1358 });
1359 self
1360 }
1361
1362 pub fn avg(mut self, column: &str) -> Self {
1364 self.aggregate = Some(AggregateExpr {
1365 func: AggregateFunction::Avg,
1366 column: Some(column.to_string()),
1367 alias: None,
1368 });
1369 self
1370 }
1371
1372 pub fn max(mut self, column: &str) -> Self {
1374 self.aggregate = Some(AggregateExpr {
1375 func: AggregateFunction::Max,
1376 column: Some(column.to_string()),
1377 alias: None,
1378 });
1379 self
1380 }
1381
1382 pub fn min(mut self, column: &str) -> Self {
1384 self.aggregate = Some(AggregateExpr {
1385 func: AggregateFunction::Min,
1386 column: Some(column.to_string()),
1387 alias: None,
1388 });
1389 self
1390 }
1391
1392 pub fn alias(mut self, name: &str) -> Self {
1394 if let Some(ref mut agg) = self.aggregate {
1395 agg.alias = Some(name.to_string());
1396 }
1397 self
1398 }
1399
1400 pub fn distinct(mut self) -> Self {
1418 self.distinct = true;
1419 self
1420 }
1421
1422 pub fn group_by(mut self, columns: &[&str]) -> Self {
1426 self.group_by = columns.iter().map(|s| s.to_string()).collect();
1427 self
1428 }
1429
1430 pub fn having_eq(mut self, value: DbValue) -> Self {
1432 self.having = Some(HavingExpr::Eq { value });
1433 self
1434 }
1435
1436 pub fn having_ne(mut self, value: DbValue) -> Self {
1438 self.having = Some(HavingExpr::Ne { value });
1439 self
1440 }
1441
1442 pub fn having_gt(mut self, value: DbValue) -> Self {
1444 self.having = Some(HavingExpr::Gt { value });
1445 self
1446 }
1447
1448 pub fn having_ge(mut self, value: DbValue) -> Self {
1450 self.having = Some(HavingExpr::Ge { value });
1451 self
1452 }
1453
1454 pub fn having_lt(mut self, value: DbValue) -> Self {
1456 self.having = Some(HavingExpr::Lt { value });
1457 self
1458 }
1459
1460 pub fn having_le(mut self, value: DbValue) -> Self {
1462 self.having = Some(HavingExpr::Le { value });
1463 self
1464 }
1465
1466 pub fn execute(self) -> DbResult<Vec<Row>> {
1468 let engine = self.engine.read().unwrap();
1469
1470 if self.aggregate.is_some() || !self.group_by.is_empty() {
1472 return self.execute_aggregate(&engine);
1473 }
1474
1475 if !self.joins.is_empty() {
1477 return self.execute_join(&engine);
1478 }
1479
1480 if let Some((meta, index, key_values)) = self.find_best_index(&engine) {
1482 return self.execute_with_composite_index(&engine, meta, index, &key_values);
1484 }
1485
1486 if let Some((field, index)) = self.find_best_single_index(&engine) {
1488 return self.execute_with_index(&engine, field, index);
1490 }
1491
1492 let rows = engine.scan(&self.table)?;
1494
1495 let mut filtered: Vec<Row> = rows
1497 .into_iter()
1498 .filter(|(_, row)| {
1499 self.filters.iter().all(|expr| evaluate_filter(expr, row))
1500 })
1501 .map(|(_, row)| row.clone())
1502 .collect();
1503
1504 if self.distinct {
1506 filtered = self.deduplicate_rows(filtered);
1507 }
1508
1509 if let Some((ref field, order)) = self.order_by {
1511 filtered.sort_by(|a, b| {
1512 self.compare_rows(a, b, field, order)
1513 });
1514 }
1515
1516 let start = self.offset.unwrap_or(0);
1518 let end = start + self.limit.unwrap_or(filtered.len());
1519
1520 Ok(filtered.into_iter().skip(start).take(end - start).collect())
1521 }
1522
1523 fn execute_aggregate(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
1525 let rows: Vec<Row> = if !self.joins.is_empty() {
1527 let join_rows = self.execute_join(engine)?;
1529 join_rows.into_iter()
1531 .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1532 .collect()
1533 } else {
1534 let scanned = engine.scan(&self.table)?;
1536 scanned.into_iter()
1537 .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1538 .map(|(_, row)| row.clone())
1539 .collect()
1540 };
1541
1542 if !self.group_by.is_empty() {
1544 self.execute_grouped_aggregate(rows)
1545 } else {
1546 self.execute_simple_aggregate(rows)
1548 }
1549 }
1550
1551 fn execute_simple_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1553 let agg = match &self.aggregate {
1554 Some(a) => a,
1555 None => {
1556 return Ok(rows);
1558 }
1559 };
1560
1561 let result_value = match agg.func {
1562 AggregateFunction::Count => {
1563 let count = match &agg.column {
1564 Some(col) => {
1565 rows.iter()
1567 .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1568 .count() as i64
1569 }
1570 None => {
1571 rows.len() as i64
1573 }
1574 };
1575 DbValue::Integer(count)
1576 }
1577 AggregateFunction::Sum => {
1578 match &agg.column {
1579 Some(col) => self.sum_column(&rows, col),
1580 None => DbValue::Integer(0),
1581 }
1582 }
1583 AggregateFunction::Avg => {
1584 match &agg.column {
1585 Some(col) => self.avg_column(&rows, col),
1586 None => DbValue::Null,
1587 }
1588 }
1589 AggregateFunction::Max => {
1590 match &agg.column {
1591 Some(col) => self.max_column(&rows, col),
1592 None => DbValue::Null,
1593 }
1594 }
1595 AggregateFunction::Min => {
1596 match &agg.column {
1597 Some(col) => self.min_column(&rows, col),
1598 None => DbValue::Null,
1599 }
1600 }
1601 };
1602
1603 let mut result_row = Row::new();
1605 let column_name = match &agg.alias {
1606 Some(alias) => alias.clone(),
1607 None => {
1608 let func_name = match agg.func {
1609 AggregateFunction::Count => "COUNT",
1610 AggregateFunction::Sum => "SUM",
1611 AggregateFunction::Avg => "AVG",
1612 AggregateFunction::Max => "MAX",
1613 AggregateFunction::Min => "MIN",
1614 };
1615 match &agg.column {
1616 Some(col) => format!("{}({})", func_name, col),
1617 None => "COUNT(*)".to_string(),
1618 }
1619 }
1620 };
1621
1622 let db_value = match result_value {
1624 DbValue::Integer(i) => DbValue::Integer(i),
1625 DbValue::Real(r) => DbValue::Real(r),
1626 DbValue::Null => DbValue::Null,
1627 _ => DbValue::Null,
1628 };
1629 result_row.insert(column_name, db_value);
1630
1631 Ok(vec![result_row])
1632 }
1633
1634 fn execute_grouped_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1636 let agg = match &self.aggregate {
1637 Some(a) => a,
1638 None => {
1639 return self.execute_group_by_only(rows);
1641 }
1642 };
1643
1644 use std::collections::HashMap;
1646 let mut groups: HashMap<Vec<DbValue>, Vec<Row>> = HashMap::new();
1647
1648 for row in rows {
1649 let mut key = Vec::new();
1651 for col in &self.group_by {
1652 if let Some(val) = row.get(col) {
1653 key.push(val.clone());
1654 } else {
1655 key.push(DbValue::Null);
1656 }
1657 }
1658
1659 groups.entry(key).or_insert_with(Vec::new).push(row);
1660 }
1661
1662 let mut results = Vec::new();
1664 for (key, group_rows) in groups {
1665 let agg_value = match agg.func {
1667 AggregateFunction::Count => {
1668 let count = match &agg.column {
1669 Some(col) => {
1670 group_rows.iter()
1671 .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1672 .count() as i64
1673 }
1674 None => group_rows.len() as i64,
1675 };
1676 DbValue::Integer(count)
1677 }
1678 AggregateFunction::Sum => {
1679 match &agg.column {
1680 Some(col) => self.sum_column(&group_rows, col),
1681 None => DbValue::Integer(0),
1682 }
1683 }
1684 AggregateFunction::Avg => {
1685 match &agg.column {
1686 Some(col) => self.avg_column(&group_rows, col),
1687 None => DbValue::Null,
1688 }
1689 }
1690 AggregateFunction::Max => {
1691 match &agg.column {
1692 Some(col) => self.max_column(&group_rows, col),
1693 None => DbValue::Null,
1694 }
1695 }
1696 AggregateFunction::Min => {
1697 match &agg.column {
1698 Some(col) => self.min_column(&group_rows, col),
1699 None => DbValue::Null,
1700 }
1701 }
1702 };
1703
1704 if let Some(ref having) = self.having {
1706 if !self.evaluate_having(&agg_value, having) {
1707 continue;
1708 }
1709 }
1710
1711 let mut result_row = Row::new();
1713
1714 for (i, col) in self.group_by.iter().enumerate() {
1716 result_row.insert(col.clone(), key[i].clone());
1717 }
1718
1719 let column_name = match &agg.alias {
1721 Some(alias) => alias.clone(),
1722 None => {
1723 let func_name = match agg.func {
1724 AggregateFunction::Count => "COUNT",
1725 AggregateFunction::Sum => "SUM",
1726 AggregateFunction::Avg => "AVG",
1727 AggregateFunction::Max => "MAX",
1728 AggregateFunction::Min => "MIN",
1729 };
1730 match &agg.column {
1731 Some(col) => format!("{}({})", func_name, col),
1732 None => "COUNT(*)".to_string(),
1733 }
1734 }
1735 };
1736
1737 let db_value = match agg_value {
1738 DbValue::Integer(i) => DbValue::Integer(i),
1739 DbValue::Real(r) => DbValue::Real(r),
1740 DbValue::Null => DbValue::Null,
1741 _ => DbValue::Null,
1742 };
1743 result_row.insert(column_name, db_value);
1744
1745 results.push(result_row);
1746 }
1747
1748 Ok(results)
1749 }
1750
1751 fn execute_group_by_only(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1753 use std::collections::HashSet;
1754
1755 let mut seen: HashSet<Vec<DbValue>> = HashSet::new();
1756 let mut results = Vec::new();
1757
1758 for row in rows {
1759 let mut key = Vec::new();
1760 for col in &self.group_by {
1761 if let Some(val) = row.get(col) {
1762 key.push(val.clone());
1763 } else {
1764 key.push(DbValue::Null);
1765 }
1766 }
1767
1768 if !seen.contains(&key) {
1769 seen.insert(key.clone());
1770
1771 let mut result_row = Row::new();
1772 for (i, col) in self.group_by.iter().enumerate() {
1773 result_row.insert(col.clone(), key[i].clone());
1774 }
1775 results.push(result_row);
1776 }
1777 }
1778
1779 Ok(results)
1780 }
1781
1782 fn sum_column(&self, rows: &[Row], column: &str) -> DbValue {
1784 let mut sum: f64 = 0.0;
1785 let mut has_value = false;
1786
1787 for row in rows {
1788 if let Some(val) = row.get(column) {
1789 match val {
1790 DbValue::Integer(i) => {
1791 sum += *i as f64;
1792 has_value = true;
1793 }
1794 DbValue::Real(r) => {
1795 sum += *r;
1796 has_value = true;
1797 }
1798 _ => {}
1799 }
1800 }
1801 }
1802
1803 if has_value {
1804 if rows.iter().filter_map(|r| r.get(column)).all(|v| matches!(v, DbValue::Integer(_))) {
1806 DbValue::Integer(sum as i64)
1807 } else {
1808 DbValue::Real(sum)
1809 }
1810 } else {
1811 DbValue::Null
1812 }
1813 }
1814
1815 fn avg_column(&self, rows: &[Row], column: &str) -> DbValue {
1817 let mut sum: f64 = 0.0;
1818 let mut count: i64 = 0;
1819
1820 for row in rows {
1821 if let Some(val) = row.get(column) {
1822 match val {
1823 DbValue::Integer(i) => {
1824 sum += *i as f64;
1825 count += 1;
1826 }
1827 DbValue::Real(r) => {
1828 sum += *r;
1829 count += 1;
1830 }
1831 _ => {}
1832 }
1833 }
1834 }
1835
1836 if count > 0 {
1837 DbValue::Real(sum / count as f64)
1838 } else {
1839 DbValue::Null
1840 }
1841 }
1842
1843 fn max_column(&self, rows: &[Row], column: &str) -> DbValue {
1845 let mut max_val: Option<DbValue> = None;
1846
1847 for row in rows {
1848 if let Some(val) = row.get(column) {
1849 if !val.is_null() {
1850 match &max_val {
1851 None => max_val = Some(val.clone()),
1852 Some(current) => {
1853 if self.compare_values(val, current) > 0 {
1854 max_val = Some(val.clone());
1855 }
1856 }
1857 }
1858 }
1859 }
1860 }
1861
1862 max_val.unwrap_or(DbValue::Null)
1863 }
1864
1865 fn min_column(&self, rows: &[Row], column: &str) -> DbValue {
1867 let mut min_val: Option<DbValue> = None;
1868
1869 for row in rows {
1870 if let Some(val) = row.get(column) {
1871 if !val.is_null() {
1872 match &min_val {
1873 None => min_val = Some(val.clone()),
1874 Some(current) => {
1875 if self.compare_values(val, current) < 0 {
1876 min_val = Some(val.clone());
1877 }
1878 }
1879 }
1880 }
1881 }
1882 }
1883
1884 min_val.unwrap_or(DbValue::Null)
1885 }
1886
1887 fn compare_values(&self, a: &DbValue, b: &DbValue) -> i32 {
1889 match (a, b) {
1890 (DbValue::Integer(a), DbValue::Integer(b)) => a.cmp(b) as i32,
1891 (DbValue::Integer(a), DbValue::Real(b)) => (*a as f64).partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1892 (DbValue::Real(a), DbValue::Integer(b)) => a.partial_cmp(&(*b as f64)).map(|o| o as i32).unwrap_or(0),
1893 (DbValue::Real(a), DbValue::Real(b)) => a.partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1894 (DbValue::Text(a), DbValue::Text(b)) => a.cmp(b) as i32,
1895 _ => 0,
1896 }
1897 }
1898
1899 fn evaluate_having(&self, agg_value: &DbValue, having: &HavingExpr) -> bool {
1901 match having {
1902 HavingExpr::Eq { value } => agg_value == value,
1903 HavingExpr::Ne { value } => agg_value != value,
1904 HavingExpr::Gt { value } => self.compare_values(agg_value, value) > 0,
1905 HavingExpr::Ge { value } => self.compare_values(agg_value, value) >= 0,
1906 HavingExpr::Lt { value } => self.compare_values(agg_value, value) < 0,
1907 HavingExpr::Le { value } => self.compare_values(agg_value, value) <= 0,
1908 }
1909 }
1910}
1911
1912pub struct UpdateBuilder {
1914 table: String,
1915 filters: Vec<FilterExpr>,
1916 values: Vec<(String, DbValue)>,
1917 engine: Arc<RwLock<MemoryEngine>>,
1918}
1919
1920impl UpdateBuilder {
1921 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
1922 UpdateBuilder {
1923 table,
1924 filters: Vec::new(),
1925 values: Vec::new(),
1926 engine,
1927 }
1928 }
1929
1930 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1931 self.filters.push(FilterExpr::Eq {
1932 field: field.to_string(),
1933 value,
1934 });
1935 self
1936 }
1937
1938 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1939 self.filters.push(FilterExpr::Lt {
1940 field: field.to_string(),
1941 value,
1942 });
1943 self
1944 }
1945
1946 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1947 self.filters.push(FilterExpr::Gt {
1948 field: field.to_string(),
1949 value,
1950 });
1951 self
1952 }
1953
1954 pub fn set(mut self, field: &str, value: DbValue) -> Self {
1955 self.values.push((field.to_string(), value));
1956 self
1957 }
1958
1959 pub fn execute(self) -> DbResult<usize> {
1961 let mut engine = self.engine.write().unwrap();
1962 let rows = engine.scan(&self.table)?;
1963
1964 let mut to_update = Vec::new();
1966 for (row_id, row) in rows {
1967 let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
1968 if matches {
1969 to_update.push(row_id);
1970 }
1971 }
1972
1973 let mut count = 0;
1975 for row_id in to_update {
1976 let mut update_row: Row = self.values.iter().cloned().collect::<Row>();
1977 if let Some(existing) = engine.get(&self.table, row_id)? {
1979 for (key, value) in existing.iter() {
1980 if !update_row.contains_key(key) {
1981 update_row.insert(key.clone(), value.clone());
1982 }
1983 }
1984 }
1985 engine.update(&self.table, row_id, update_row)?;
1986 count += 1;
1987 }
1988
1989 Ok(count)
1990 }
1991}
1992
1993pub struct DeleteBuilder {
1995 table: String,
1996 filters: Vec<FilterExpr>,
1997 engine: Arc<RwLock<MemoryEngine>>,
1998}
1999
2000impl DeleteBuilder {
2001 pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
2002 DeleteBuilder {
2003 table,
2004 filters: Vec::new(),
2005 engine,
2006 }
2007 }
2008
2009 pub fn eq(mut self, field: &str, value: DbValue) -> Self {
2010 self.filters.push(FilterExpr::Eq {
2011 field: field.to_string(),
2012 value,
2013 });
2014 self
2015 }
2016
2017 pub fn lt(mut self, field: &str, value: DbValue) -> Self {
2018 self.filters.push(FilterExpr::Lt {
2019 field: field.to_string(),
2020 value,
2021 });
2022 self
2023 }
2024
2025 pub fn gt(mut self, field: &str, value: DbValue) -> Self {
2026 self.filters.push(FilterExpr::Gt {
2027 field: field.to_string(),
2028 value,
2029 });
2030 self
2031 }
2032
2033 pub fn execute(self) -> DbResult<usize> {
2035 let mut engine = self.engine.write().unwrap();
2036 let rows = engine.scan(&self.table)?;
2037
2038 let mut to_delete = Vec::new();
2040 for (row_id, row) in rows {
2041 let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
2042 if matches {
2043 to_delete.push(row_id);
2044 }
2045 }
2046
2047 let mut count = 0;
2049 for row_id in to_delete {
2050 engine.delete(&self.table, row_id)?;
2051 count += 1;
2052 }
2053
2054 Ok(count)
2055 }
2056}
2057
2058pub struct NotBuilder {
2063 query: QueryBuilder,
2064}
2065
2066impl NotBuilder {
2067 pub fn expr(mut self, expr: FilterExpr) -> QueryBuilder {
2086 self.query.filters.push(FilterExpr::Not(Box::new(expr)));
2087 self.query
2088 }
2089
2090 pub fn eq(self, field: &str, value: DbValue) -> QueryBuilder {
2092 self.expr(FilterExpr::Eq {
2093 field: field.to_string(),
2094 value,
2095 })
2096 }
2097
2098 pub fn ne(self, field: &str, value: DbValue) -> QueryBuilder {
2100 self.expr(FilterExpr::Ne {
2101 field: field.to_string(),
2102 value,
2103 })
2104 }
2105
2106 pub fn lt(self, field: &str, value: DbValue) -> QueryBuilder {
2108 self.expr(FilterExpr::Lt {
2109 field: field.to_string(),
2110 value,
2111 })
2112 }
2113
2114 pub fn le(self, field: &str, value: DbValue) -> QueryBuilder {
2116 self.expr(FilterExpr::Le {
2117 field: field.to_string(),
2118 value,
2119 })
2120 }
2121
2122 pub fn gt(self, field: &str, value: DbValue) -> QueryBuilder {
2124 self.expr(FilterExpr::Gt {
2125 field: field.to_string(),
2126 value,
2127 })
2128 }
2129
2130 pub fn ge(self, field: &str, value: DbValue) -> QueryBuilder {
2132 self.expr(FilterExpr::Ge {
2133 field: field.to_string(),
2134 value,
2135 })
2136 }
2137
2138 pub fn in_list(self, field: &str, values: Vec<DbValue>) -> QueryBuilder {
2140 self.expr(FilterExpr::In {
2141 field: field.to_string(),
2142 values,
2143 })
2144 }
2145
2146 pub fn contains(self, field: &str, value: &str) -> QueryBuilder {
2148 self.expr(FilterExpr::Contains {
2149 field: field.to_string(),
2150 value: value.to_string(),
2151 })
2152 }
2153
2154 pub fn is_null(self, field: &str) -> QueryBuilder {
2156 self.expr(FilterExpr::IsNull {
2157 field: field.to_string(),
2158 })
2159 }
2160
2161 pub fn is_not_null(self, field: &str) -> QueryBuilder {
2163 self.expr(FilterExpr::IsNotNull {
2164 field: field.to_string(),
2165 })
2166 }
2167}