Skip to main content

regulus_db/query/
builder.rs

1use crate::storage::{MemoryEngine, Row, StorageEngine, RowId};
2use crate::types::{DbValue, DbResult};
3use crate::index::btree::BTreeIndex;
4use crate::index::manager::IndexMeta;
5use std::sync::{Arc, RwLock};
6
7/// 排序方向
8#[derive(Debug, Clone, Copy, PartialEq)]
9pub enum Order {
10    Asc,
11    Desc,
12}
13
14/// JOIN 类型
15#[derive(Debug, Clone, Copy, PartialEq)]
16pub enum JoinType {
17    Inner,   // INNER JOIN - 只返回匹配行
18    Left,    // LEFT JOIN - 返回左表所有行,右表无匹配则 NULL
19    Right,   // RIGHT JOIN - 返回右表所有行,左表无匹配则 NULL
20    Full,    // FULL OUTER JOIN - 返回左右表所有行,无匹配时双方填充 NULL
21}
22
23/// 连接条件
24#[derive(Debug, Clone)]
25pub struct JoinCondition {
26    pub join_type: JoinType,
27    pub right_table: String,
28    pub left_field: String,   // 格式:"table.column"
29    pub right_field: String,  // 格式:"table.column"
30}
31
32/// 过滤表达式 AST
33#[derive(Debug, Clone)]
34pub enum FilterExpr {
35    Eq { field: String, value: DbValue },
36    Ne { field: String, value: DbValue },
37    Lt { field: String, value: DbValue },
38    Le { field: String, value: DbValue },
39    Gt { field: String, value: DbValue },
40    Ge { field: String, value: DbValue },
41    In { field: String, values: Vec<DbValue> },
42    Contains { field: String, value: String },
43    And(Box<FilterExpr>, Box<FilterExpr>),
44    Or(Box<FilterExpr>, Box<FilterExpr>),
45    Not(Box<FilterExpr>),
46}
47
48/// 聚合函数类型
49#[derive(Debug, Clone, Copy, PartialEq)]
50pub enum AggregateFunction {
51    Count,
52    Sum,
53    Avg,
54    Max,
55    Min,
56}
57
58/// 聚合表达式
59#[derive(Debug, Clone)]
60pub struct AggregateExpr {
61    pub func: AggregateFunction,
62    pub column: Option<String>,  // COUNT(*) 为 None
63    pub alias: Option<String>,   // 可选的别名
64}
65
66/// HAVING 子句表达式(用于过滤分组)
67#[derive(Debug, Clone)]
68pub enum HavingExpr {
69    Eq { value: DbValue },
70    Ne { value: DbValue },
71    Lt { value: DbValue },
72    Le { value: DbValue },
73    Gt { value: DbValue },
74    Ge { value: DbValue },
75}
76
77/// 评估过滤表达式
78fn evaluate_filter(expr: &FilterExpr, row: &Row) -> bool {
79    match expr {
80        FilterExpr::Eq { field, value } => {
81            row.get(field).map(|v| v == value).unwrap_or(false)
82        }
83        FilterExpr::Ne { field, value } => {
84            row.get(field).map(|v| v != value).unwrap_or(true)
85        }
86        FilterExpr::Lt { field, value } => compare_field(row, field, value, |a, b| a < b),
87        FilterExpr::Le { field, value } => compare_field(row, field, value, |a, b| a <= b),
88        FilterExpr::Gt { field, value } => compare_field(row, field, value, |a, b| a > b),
89        FilterExpr::Ge { field, value } => compare_field(row, field, value, |a, b| a >= b),
90        FilterExpr::In { field, values } => {
91            row.get(field).map(|v| values.contains(v)).unwrap_or(false)
92        }
93        FilterExpr::Contains { field, value } => {
94            row.get(field)
95                .and_then(|v| v.as_text())
96                .map(|s| s.contains(value))
97                .unwrap_or(false)
98        }
99        FilterExpr::And(left, right) => {
100            evaluate_filter(left, row) && evaluate_filter(right, row)
101        }
102        FilterExpr::Or(left, right) => {
103            evaluate_filter(left, row) || evaluate_filter(right, row)
104        }
105        FilterExpr::Not(inner) => {
106            !evaluate_filter(inner, row)
107        }
108    }
109}
110
111/// 比较字段值(支持数值比较)
112fn compare_field<F>(row: &Row, field: &str, value: &DbValue, cmp: F) -> bool
113where
114    F: Fn(f64, f64) -> bool,
115{
116    let row_val = match row.get(field) {
117        Some(v) => v,
118        None => return false,
119    };
120
121    let row_num = match row_val {
122        DbValue::Integer(i) => *i as f64,
123        DbValue::Real(r) => *r,
124        _ => return false,
125    };
126
127    let cmp_num = match value {
128        DbValue::Integer(i) => *i as f64,
129        DbValue::Real(r) => *r,
130        _ => return false,
131    };
132
133    cmp(row_num, cmp_num)
134}
135
136/// 查询构建器
137pub struct QueryBuilder {
138    table: String,
139    joins: Vec<JoinCondition>,           // 新增:JOIN 列表
140    selected_columns: Vec<String>,       // 新增:选择字段
141    filters: Vec<FilterExpr>,
142    order_by: Option<(String, Order)>,
143    limit: Option<usize>,
144    offset: Option<usize>,
145    engine: Arc<RwLock<MemoryEngine>>,
146    // 聚合函数相关
147    aggregate: Option<AggregateExpr>,    // 聚合函数
148    group_by: Vec<String>,               // GROUP BY 字段
149    having: Option<HavingExpr>,          // HAVING 子句
150}
151
152impl QueryBuilder {
153    pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
154        QueryBuilder {
155            table,
156            joins: Vec::new(),
157            selected_columns: Vec::new(),
158            filters: Vec::new(),
159            order_by: None,
160            limit: None,
161            offset: None,
162            engine,
163            aggregate: None,
164            group_by: Vec::new(),
165            having: None,
166        }
167    }
168
169    /// 查找最佳可用索引(优先等值条件,其次范围条件)
170    /// 返回:(索引列名,索引对象,构建的复合键值)
171    fn find_best_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a IndexMeta, &'a BTreeIndex, Vec<DbValue>)> {
172        // 收集所有等值条件
173        let eq_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| matches!(f, FilterExpr::Eq { .. })).collect();
174
175        // 收集所有范围条件
176        let range_filters: Vec<&FilterExpr> = self.filters.iter().filter(|f| {
177            matches!(f, FilterExpr::Gt { .. } | FilterExpr::Ge { .. } | FilterExpr::Lt { .. } | FilterExpr::Le { .. })
178        }).collect();
179
180        // 使用引擎的 find_best_index 方法查找最佳复合索引
181        if !eq_filters.is_empty() {
182            let eq_columns: Vec<&str> = eq_filters.iter().map(|f| {
183                if let FilterExpr::Eq { field, .. } = f {
184                    field.as_str()
185                } else {
186                    ""
187                }
188            }).collect();
189
190            if let Some((meta, index)) = engine.find_best_index(&self.table, &eq_columns) {
191                // 构建复合键值
192                let mut key_values = Vec::new();
193                for col in &meta.columns {
194                    let value = eq_filters.iter()
195                        .find_map(|f| {
196                            if let FilterExpr::Eq { field, value } = f {
197                                if field == col { Some(value) } else { None }
198                            } else { None }
199                        });
200                    if let Some(v) = value {
201                        key_values.push(v.clone());
202                    } else {
203                        // 对于前缀匹配,后续列不需要
204                        break;
205                    }
206                }
207                if !key_values.is_empty() {
208                    return Some((meta, index, key_values));
209                }
210            }
211        }
212
213        // 检查范围条件(暂时不处理,由单列索引方法处理)
214        if let Some(_filter) = range_filters.first() {
215            // 范围条件的索引查找在 find_best_single_index 中处理
216        }
217
218        None
219    }
220
221    /// 查找最佳单列索引(向后兼容)
222    fn find_best_single_index<'a>(&'a self, engine: &'a MemoryEngine) -> Option<(&'a String, &'a BTreeIndex)> {
223        // 优先查找等值条件的字段
224        for filter in &self.filters {
225            if let FilterExpr::Eq { field, .. } = filter {
226                if let Some(index) = engine.get_index(&self.table, field) {
227                    return Some((field, index));
228                }
229            }
230        }
231        // 其次查找范围条件的字段
232        for filter in &self.filters {
233            if let FilterExpr::Gt { field, .. }
234               | FilterExpr::Ge { field, .. }
235               | FilterExpr::Lt { field, .. }
236               | FilterExpr::Le { field, .. } = filter {
237                if let Some(index) = engine.get_index(&self.table, field) {
238                    return Some((field, index));
239                }
240            }
241        }
242        None
243    }
244
245    /// 使用索引执行查询(优化版本)
246    fn execute_with_index(&self, engine: &MemoryEngine, _field: &str, index: &BTreeIndex) -> DbResult<Vec<Row>> {
247        // 从索引中获取匹配的 row_id
248        let row_ids = self.get_matching_row_ids(index);
249
250        // 根据 row_ids 回表查询完整行数据
251        let mut results = Vec::new();
252        for row_id in row_ids {
253            if let Some(row) = engine.get(&self.table, row_id)? {
254                // 需要验证所有过滤条件(因为索引只针对一个字段)
255                if self.matches_all_filters(&row) {
256                    results.push(row.clone());
257                }
258            }
259        }
260
261        // 排序
262        if let Some((field, order)) = &self.order_by {
263            results.sort_by(|a, b| {
264                self.compare_rows(a, b, field, *order)
265            });
266        }
267
268        // 分页
269        let start = self.offset.unwrap_or(0);
270        let end = start + self.limit.unwrap_or(results.len());
271
272        Ok(results.into_iter().skip(start).take(end - start).collect())
273    }
274
275    /// 使用复合索引执行查询
276    fn execute_with_composite_index(&self, engine: &MemoryEngine, _meta: &IndexMeta, index: &BTreeIndex, key_values: &[DbValue]) -> DbResult<Vec<Row>> {
277        // 从复合索引中获取匹配的 row_id
278        let row_ids = index.search_composite(key_values);
279
280        // 根据 row_ids 回表查询完整行数据
281        let mut results = Vec::new();
282        for row_id in row_ids {
283            if let Some(row) = engine.get(&self.table, row_id)? {
284                // 验证所有过滤条件
285                if self.matches_all_filters(&row) {
286                    results.push(row.clone());
287                }
288            }
289        }
290
291        // 排序
292        if let Some((field, order)) = &self.order_by {
293            results.sort_by(|a, b| {
294                self.compare_rows(a, b, field, *order)
295            });
296        }
297
298        // 分页
299        let start = self.offset.unwrap_or(0);
300        let end = start + self.limit.unwrap_or(results.len());
301
302        Ok(results.into_iter().skip(start).take(end - start).collect())
303    }
304
305    /// 根据过滤条件从索引中获取匹配的 row_ids
306    fn get_matching_row_ids(&self, index: &BTreeIndex) -> Vec<RowId> {
307        let mut row_ids = Vec::new();
308
309        // 收集所有针对同一字段的等值和范围条件
310        let mut eq_value: Option<&DbValue> = None;
311        let mut gt_value: Option<&DbValue> = None;
312        let mut ge_value: Option<&DbValue> = None;
313        let mut lt_value: Option<&DbValue> = None;
314        let mut le_value: Option<&DbValue> = None;
315
316        for filter in &self.filters {
317            match filter {
318                FilterExpr::Eq { field, value } if eq_value.is_none() => {
319                    eq_value = Some(value);
320                }
321                FilterExpr::Gt { field: _, value } => {
322                    if gt_value.is_none() || value > gt_value.unwrap() {
323                        gt_value = Some(value);
324                    }
325                }
326                FilterExpr::Ge { field: _, value } => {
327                    if ge_value.is_none() || value > ge_value.unwrap() {
328                        ge_value = Some(value);
329                    }
330                }
331                FilterExpr::Lt { field: _, value } => {
332                    if lt_value.is_none() || value < lt_value.unwrap() {
333                        lt_value = Some(value);
334                    }
335                }
336                FilterExpr::Le { field: _, value } => {
337                    if le_value.is_none() || value < le_value.unwrap() {
338                        le_value = Some(value);
339                    }
340                }
341                _ => {}
342            }
343        }
344
345        // 如果有等值条件,直接使用 search
346        if let Some(eq) = eq_value {
347            row_ids.extend(index.search(eq));
348        } else {
349            // 计算范围 [range_start, range_end)
350            let range_start = gt_value.or(ge_value);
351            let range_end = lt_value.or(le_value);
352
353            if let Some(start) = range_start {
354                // 需要调整范围边界(gt 需要 +1,但这里简化处理)
355                if let Some(end) = range_end {
356                    row_ids.extend(index.range(start, end));
357                } else {
358                    row_ids.extend(index.range_from(start));
359                }
360            } else if let Some(_end) = range_end {
361                // 只有上界,从最小值开始
362                // 这里需要一个最小值,简化处理:使用 range_from 然后过滤
363                // 由于 DbValue 没有明确的最小值,我们直接扫描
364                // 这种情况较少,暂不优化
365            } else {
366                // 没有范围条件,返回所有
367                // 这不应该发生,因为调用者会先检查是否有可用索引
368            }
369        }
370
371        row_ids
372    }
373
374    /// 检查行是否匹配所有过滤条件
375    fn matches_all_filters(&self, row: &Row) -> bool {
376        self.filters.iter().all(|expr| evaluate_filter(expr, row))
377    }
378
379    /// 比较两行(用于排序)
380    fn compare_rows(&self, a: &Row, b: &Row, field: &str, order: Order) -> std::cmp::Ordering {
381        let a_val = a.get(field);
382        let b_val = b.get(field);
383
384        let cmp = match (a_val, b_val) {
385            (Some(DbValue::Integer(a)), Some(DbValue::Integer(b))) => a.partial_cmp(b),
386            (Some(DbValue::Real(a)), Some(DbValue::Real(b))) => a.partial_cmp(b),
387            (Some(DbValue::Integer(a)), Some(DbValue::Real(b))) => (*a as f64).partial_cmp(b),
388            (Some(DbValue::Real(a)), Some(DbValue::Integer(b))) => a.partial_cmp(&(*b as f64)),
389            (Some(DbValue::Text(a)), Some(DbValue::Text(b))) => Some(a.cmp(b)),
390            _ => Some(std::cmp::Ordering::Equal),
391        };
392
393        match order {
394            Order::Asc => cmp.unwrap_or(std::cmp::Ordering::Equal),
395            Order::Desc => cmp.unwrap_or(std::cmp::Ordering::Equal).reverse(),
396        }
397    }
398
399    // ==================== JOIN 相关辅助方法 ====================
400
401    /// 字段名前缀化:"name" -> "table.name"
402    fn prefix_row(&self, row: &Row, table: &str) -> Row {
403        row.iter()
404            .map(|(k, v)| (format!("{}.{}", table, k), v.clone()))
405            .collect::<Row>()
406    }
407
408    /// 创建 NULL 行(用于 LEFT/RIGHT JOIN 无匹配时)
409    fn create_null_row(&self, engine: &MemoryEngine, table: &str) -> DbResult<Row> {
410        let schema = engine.get_schema(table)?;
411        let mut row = Row::new();
412        for column in &schema.columns {
413            row.insert(format!("{}.{}", table, column.name), DbValue::Null);
414        }
415        Ok(row)
416    }
417
418    /// 匹配 JOIN 条件
419    fn match_join_condition(&self, left: &Row, right: &Row, join: &JoinCondition) -> bool {
420        let left_val = left.get(&join.left_field);
421        let right_val = right.get(&join.right_field);
422        left_val == right_val
423    }
424
425    /// 从 qualified field name 提取列名
426    /// "table.column" -> "column"
427    /// "column" -> "column"
428    fn extract_column_name(field: &str) -> &str {
429        if let Some(pos) = field.rfind('.') {
430            &field[pos + 1..]
431        } else {
432            field
433        }
434    }
435
436    /// 优化 JOIN 顺序
437    /// 策略:选择最小的表作为驱动表,减少中间结果集
438    fn optimize_join_order(&self, engine: &MemoryEngine) -> DbResult<Vec<JoinCondition>> {
439        // 1. 收集所有涉及的表及其大小
440        let mut tables: Vec<(String, usize)> = Vec::new();
441
442        // 主表
443        let main_count = engine.get_row_count(&self.table)?;
444        tables.push((self.table.clone(), main_count));
445
446        // JOIN 表
447        for join in &self.joins {
448            if !tables.iter().any(|(name, _)| name == &join.right_table) {
449                let count = engine.get_row_count(&join.right_table)?;
450                tables.push((join.right_table.clone(), count));
451            }
452        }
453
454        // 2. 按表大小排序(从小到大)
455        tables.sort_by_key(|(_, count)| *count);
456
457        // 3. 找出最小的右表
458        let smallest_table = tables.first()
459            .map(|(name, _)| name.clone());
460
461        // 4. 重排 JOIN 顺序:将最小表对应的 JOIN 交换到前面
462        let mut optimized_joins = self.joins.clone();
463
464        if let Some(smallest) = smallest_table {
465            // 找到包含最小表的 JOIN
466            if let Some(min_idx) = optimized_joins.iter().position(|j| j.right_table == smallest) {
467                optimized_joins.swap(0, min_idx);
468            }
469        }
470
471        Ok(optimized_joins)
472    }
473
474    /// 执行 JOIN 查询(嵌套循环连接)
475    fn execute_join(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
476        if self.joins.is_empty() {
477            return self.execute_simple_scan(engine);
478        }
479
480        // 检查是否有 RIGHT JOIN 或 FULL OUTER JOIN
481        let has_right_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Right));
482        let has_full_join = self.joins.iter().any(|j| matches!(j.join_type, JoinType::Full));
483
484        // 从第一个 JOIN 开始,逐步连接所有表
485        let mut results: Vec<Row> = Vec::new();
486
487        if has_right_join || has_full_join {
488            // RIGHT JOIN 和 FULL OUTER JOIN 需要特殊处理:以右表为驱动表
489            self.execute_right_or_full_join(engine, &mut results)?;
490        } else {
491            // 优化 JOIN 顺序:选择最小的表作为驱动表
492            let optimized_joins = self.optimize_join_order(engine)?;
493
494            // 扫描主表
495            let main_table_rows = engine.scan(&self.table)?;
496
497            for (_row_id, main_row) in main_table_rows {
498                let main_prefixed = self.prefix_row(&main_row, &self.table);
499
500                // 使用优化后的 JOIN 顺序
501                self.process_joins_with_order(engine, main_prefixed, 0, &mut results, &optimized_joins)?;
502            }
503        }
504
505        // 应用过滤条件
506        let mut filtered: Vec<Row> = results
507            .into_iter()
508            .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
509            .collect();
510
511        // 应用字段选择(投影)
512        if !self.selected_columns.is_empty() {
513            filtered = filtered
514                .into_iter()
515                .map(|row| {
516                    let mut projected = Row::new();
517                    for col in &self.selected_columns {
518                        if let Some(value) = row.get(col) {
519                            projected.insert(col.clone(), value.clone());
520                        }
521                    }
522                    projected
523                })
524                .collect();
525        }
526
527        // 排序
528        if let Some((ref field, order)) = self.order_by {
529            filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
530        }
531
532        // 分页
533        let start = self.offset.unwrap_or(0);
534        let end = start + self.limit.unwrap_or(filtered.len());
535
536        Ok(filtered.into_iter().skip(start).take(end - start).collect())
537    }
538
539    /// 执行 RIGHT JOIN 或 FULL OUTER JOIN(以右表为驱动表)
540    fn execute_right_or_full_join(&self, engine: &MemoryEngine, results: &mut Vec<Row>) -> DbResult<()> {
541        // 找到第一个 RIGHT JOIN 或 FULL OUTER JOIN
542        let right_or_full_join_index = self.joins.iter().position(|j| {
543            matches!(j.join_type, JoinType::Right | JoinType::Full)
544        });
545
546        if let Some(join_idx) = right_or_full_join_index {
547            let join = &self.joins[join_idx];
548            let is_full = matches!(join.join_type, JoinType::Full);
549
550            // 扫描右表
551            let right_table_rows = engine.scan(&join.right_table)?;
552
553            // 扫描左表用于匹配
554            let left_table_rows = if join_idx == 0 {
555                engine.scan(&self.table)?
556            } else {
557                // 多表 JOIN 场景:左表是前面 JOIN 的结果
558                // 简化处理:先不支持复杂的链式 RIGHT/FULL JOIN
559                Vec::new()
560            };
561
562            // 跟踪已匹配的左表行(用于 FULL OUTER JOIN)
563            let mut matched_left_row_ids: std::collections::HashSet<RowId> = std::collections::HashSet::new();
564
565            // 处理右表的每一行
566            for (_right_id, right_row) in right_table_rows {
567                let right_prefixed = self.prefix_row(&right_row, &join.right_table);
568
569                // 为每个右表行查找左表匹配
570                let mut has_match = false;
571
572                for (left_id, left_row) in &left_table_rows {
573                    let left_prefixed = self.prefix_row(&left_row, &self.table);
574
575                    if self.match_join_condition(&left_prefixed, &right_prefixed, join) {
576                        has_match = true;
577                        matched_left_row_ids.insert(*left_id);
578                        let mut merged = left_prefixed.clone();
579                        merged.extend(right_prefixed.clone());
580
581                        // 处理后续 JOIN(如果有)
582                        self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
583                    }
584                }
585
586                // 无匹配:左表为 NULL,右表保留
587                if !has_match {
588                    let null_row = self.create_null_row(engine, &self.table)?;
589                    let mut merged = null_row;
590                    merged.extend(right_prefixed);
591
592                    // 处理后续 JOIN
593                    self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
594                }
595            }
596
597            // FULL OUTER JOIN:处理左表中未匹配的行
598            if is_full {
599                for (left_id, left_row) in &left_table_rows {
600                    if !matched_left_row_ids.contains(left_id) {
601                        let left_prefixed = self.prefix_row(&left_row, &self.table);
602                        let null_row = self.create_null_row(engine, &join.right_table)?;
603                        let mut merged = left_prefixed;
604                        merged.extend(null_row);
605
606                        // 处理后续 JOIN
607                        self.process_joins_right_full(engine, merged, join_idx + 1, results, is_full)?;
608                    }
609                }
610            }
611        }
612
613        Ok(())
614    }
615
616    /// 递归处理 JOIN 链(用于 RIGHT/FULL JOIN)
617    fn process_joins_right_full(
618        &self,
619        engine: &MemoryEngine,
620        current_row: Row,
621        join_index: usize,
622        results: &mut Vec<Row>,
623        is_full_context: bool,
624    ) -> DbResult<()> {
625        if join_index >= self.joins.len() {
626            results.push(current_row);
627            return Ok(());
628        }
629
630        let join = &self.joins[join_index];
631
632        // 如果是 FULL OUTER JOIN 上下文,且当前也是 FULL OUTER JOIN
633        if is_full_context && matches!(join.join_type, JoinType::Full) {
634            // 简化处理:将 FULL OUTER JOIN 视为 LEFT JOIN 处理后续
635            // 完整的实现需要更复杂的状态跟踪
636        }
637
638        // 尝试使用索引优化
639        let right_column = Self::extract_column_name(&join.right_field);
640        if let Some(index) = engine.get_index(&join.right_table, right_column) {
641            // 有索引,使用索引优化
642            self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
643        } else {
644            // 无索引,降级为全表扫描
645            self.process_join_scan_impl(engine, current_row, join_index, results, join)
646        }
647    }
648
649    /// 递归处理 JOIN 链(使用优化后的顺序)
650    fn process_joins_with_order(
651        &self,
652        engine: &MemoryEngine,
653        current_row: Row,
654        join_index: usize,
655        results: &mut Vec<Row>,
656        optimized_joins: &[JoinCondition],
657    ) -> DbResult<()> {
658        if join_index >= optimized_joins.len() {
659            // 所有 JOIN 处理完毕,添加结果
660            results.push(current_row);
661            return Ok(());
662        }
663
664        let join = &optimized_joins[join_index];
665
666        // RIGHT JOIN 需要特殊处理
667        if matches!(join.join_type, JoinType::Right) {
668            return self.process_joins_with_order(engine, current_row, join_index + 1, results, optimized_joins);
669        }
670
671        // 尝试使用索引优化
672        let right_column = Self::extract_column_name(&join.right_field);
673        if let Some(index) = engine.get_index(&join.right_table, right_column) {
674            self.process_join_with_index_impl_optimized(engine, current_row, join_index, results, join, right_column, index, optimized_joins)
675        } else {
676            self.process_join_scan_impl_optimized(engine, current_row, join_index, results, join, optimized_joins)
677        }
678    }
679
680    /// 使用全表扫描执行 JOIN(优化版本,支持自定义 JOIN 顺序)
681    fn process_join_scan_impl_optimized(
682        &self,
683        engine: &MemoryEngine,
684        current_row: Row,
685        join_index: usize,
686        results: &mut Vec<Row>,
687        join: &JoinCondition,
688        optimized_joins: &[JoinCondition],
689    ) -> DbResult<()> {
690        let right_rows = engine.scan(&join.right_table)?;
691
692        let mut has_match = false;
693
694        for (_right_id, right_row) in right_rows {
695            let right_prefixed = self.prefix_row(&right_row, &join.right_table);
696
697            // 检查 JOIN 条件
698            if self.match_join_condition(&current_row, &right_prefixed, join) {
699                has_match = true;
700                // 合并行
701                let mut merged = current_row.clone();
702                merged.extend(right_prefixed);
703
704                // 递归处理下一个 JOIN(使用优化后的顺序)
705                self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
706            }
707        }
708
709        // 处理 LEFT JOIN 无匹配情况
710        if !has_match && matches!(join.join_type, JoinType::Left) {
711            let null_row = self.create_null_row(engine, &join.right_table)?;
712            let mut merged = current_row.clone();
713            merged.extend(null_row);
714
715            // 递归处理下一个 JOIN(使用优化后的顺序)
716            self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
717        }
718
719        Ok(())
720    }
721
722    /// 使用索引执行 JOIN(优化版本,支持自定义 JOIN 顺序)
723    fn process_join_with_index_impl_optimized(
724        &self,
725        engine: &MemoryEngine,
726        current_row: Row,
727        join_index: usize,
728        results: &mut Vec<Row>,
729        join: &JoinCondition,
730        _right_column: &str,
731        index: &BTreeIndex,
732        optimized_joins: &[JoinCondition],
733    ) -> DbResult<()> {
734        // 从左表获取 JOIN 字段值
735        let join_value = match current_row.get(&join.left_field) {
736            Some(v) => v,
737            None => return Ok(()),  // 左表字段为 NULL,无法匹配
738        };
739
740        // 使用索引查找匹配的 RowId(O(log n))
741        let matching_row_ids = index.search(join_value);
742
743        let mut has_match = false;
744
745        for row_id in matching_row_ids {
746            has_match = true;
747            if let Some(right_row) = engine.get(&join.right_table, row_id)? {
748                let right_prefixed = self.prefix_row(&right_row, &join.right_table);
749                let mut merged = current_row.clone();
750                merged.extend(right_prefixed);
751
752                // 递归处理下一个 JOIN(使用优化后的顺序)
753                self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
754            }
755        }
756
757        // LEFT JOIN 无匹配处理
758        if !has_match && matches!(join.join_type, JoinType::Left) {
759            let null_row = self.create_null_row(engine, &join.right_table)?;
760            let mut merged = current_row.clone();
761            merged.extend(null_row);
762
763            // 递归处理下一个 JOIN(使用优化后的顺序)
764            self.process_joins_with_order(engine, merged, join_index + 1, results, optimized_joins)?;
765        }
766
767        Ok(())
768    }
769
770    /// 递归处理 JOIN 链
771    fn process_joins(
772        &self,
773        engine: &MemoryEngine,
774        current_row: Row,
775        join_index: usize,
776        results: &mut Vec<Row>,
777    ) -> DbResult<()> {
778        if join_index >= self.joins.len() {
779            // 所有 JOIN 处理完毕,添加结果
780            results.push(current_row);
781            return Ok(());
782        }
783
784        let join = &self.joins[join_index];
785
786        // RIGHT JOIN 需要特殊处理(在 execute_right_join 中已处理)
787        // 这里处理 INNER JOIN 和 LEFT JOIN
788        if matches!(join.join_type, JoinType::Right) {
789            // RIGHT JOIN 已在 execute_right_join 中处理,这里只需要处理后续的非 RIGHT JOIN
790            // 递归处理下一个 JOIN
791            return self.process_joins(engine, current_row, join_index + 1, results);
792        }
793
794        // 尝试使用索引优化
795        let right_column = Self::extract_column_name(&join.right_field);
796        if let Some(index) = engine.get_index(&join.right_table, right_column) {
797            // 有索引,使用索引优化
798            self.process_join_with_index_impl(engine, current_row, join_index, results, join, right_column, index)
799        } else {
800            // 无索引,降级为全表扫描
801            self.process_join_scan_impl(engine, current_row, join_index, results, join)
802        }
803    }
804
805    /// 使用全表扫描执行 JOIN(降级方案)
806    fn process_join_scan_impl(
807        &self,
808        engine: &MemoryEngine,
809        current_row: Row,
810        join_index: usize,
811        results: &mut Vec<Row>,
812        join: &JoinCondition,
813    ) -> DbResult<()> {
814        let right_rows = engine.scan(&join.right_table)?;
815
816        let mut has_match = false;
817
818        for (_right_id, right_row) in right_rows {
819            let right_prefixed = self.prefix_row(&right_row, &join.right_table);
820
821            // 检查 JOIN 条件
822            if self.match_join_condition(&current_row, &right_prefixed, join) {
823                has_match = true;
824                // 合并行
825                let mut merged = current_row.clone();
826                merged.extend(right_prefixed);
827
828                // 递归处理下一个 JOIN
829                self.process_joins(engine, merged, join_index + 1, results)?;
830            }
831        }
832
833        // 处理 LEFT JOIN 无匹配情况
834        if !has_match && matches!(join.join_type, JoinType::Left) {
835            let null_row = self.create_null_row(engine, &join.right_table)?;
836            let mut merged = current_row.clone();
837            merged.extend(null_row);
838
839            // 递归处理下一个 JOIN
840            self.process_joins(engine, merged, join_index + 1, results)?;
841        }
842
843        Ok(())
844    }
845
846    /// 使用索引执行 JOIN(优化方案)
847    fn process_join_with_index_impl(
848        &self,
849        engine: &MemoryEngine,
850        current_row: Row,
851        join_index: usize,
852        results: &mut Vec<Row>,
853        join: &JoinCondition,
854        _right_column: &str,
855        index: &BTreeIndex,
856    ) -> DbResult<()> {
857        // 从左表获取 JOIN 字段值
858        let join_value = match current_row.get(&join.left_field) {
859            Some(v) => v,
860            None => return Ok(()),  // 左表字段为 NULL,无法匹配
861        };
862
863        // 使用索引查找匹配的 RowId(O(log n))
864        let matching_row_ids = index.search(join_value);
865
866        let mut has_match = false;
867
868        for row_id in matching_row_ids {
869            has_match = true;
870            if let Some(right_row) = engine.get(&join.right_table, row_id)? {
871                let right_prefixed = self.prefix_row(&right_row, &join.right_table);
872                let mut merged = current_row.clone();
873                for (k, v) in right_prefixed {
874                    merged.insert(k, v);
875                }
876
877                // 递归处理下一个 JOIN
878                self.process_joins(engine, merged, join_index + 1, results)?;
879            }
880        }
881
882        // LEFT JOIN 无匹配处理
883        if !has_match && matches!(join.join_type, JoinType::Left) {
884            let null_row = self.create_null_row(engine, &join.right_table)?;
885            let mut merged = current_row.clone();
886            for (k, v) in null_row {
887                merged.insert(k, v);
888            }
889
890            // 递归处理下一个 JOIN
891            self.process_joins(engine, merged, join_index + 1, results)?;
892        }
893
894        Ok(())
895    }
896
897    /// 简单全表扫描(无 JOIN 时)
898    fn execute_simple_scan(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
899        let rows = engine.scan(&self.table)?;
900
901        let mut filtered: Vec<Row> = rows
902            .into_iter()
903            .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
904            .map(|(_, row)| row.clone())
905            .collect();
906
907        // 应用字段选择
908        if !self.selected_columns.is_empty() {
909            filtered = filtered
910                .into_iter()
911                .map(|row| {
912                    let mut projected = Row::new();
913                    for col in &self.selected_columns {
914                        if let Some(value) = row.get(col) {
915                            projected.insert(col.clone(), value.clone());
916                        }
917                    }
918                    projected
919                })
920                .collect();
921        }
922
923        // 排序
924        if let Some((ref field, order)) = self.order_by {
925            filtered.sort_by(|a, b| self.compare_rows(a, b, field, order));
926        }
927
928        // 分页
929        let start = self.offset.unwrap_or(0);
930        let end = start + self.limit.unwrap_or(filtered.len());
931
932        Ok(filtered.into_iter().skip(start).take(end - start).collect())
933    }
934
935    // 过滤条件
936    pub fn eq(mut self, field: &str, value: DbValue) -> Self {
937        self.filters.push(FilterExpr::Eq {
938            field: field.to_string(),
939            value,
940        });
941        self
942    }
943
944    pub fn ne(mut self, field: &str, value: DbValue) -> Self {
945        self.filters.push(FilterExpr::Ne {
946            field: field.to_string(),
947            value,
948        });
949        self
950    }
951
952    pub fn lt(mut self, field: &str, value: DbValue) -> Self {
953        self.filters.push(FilterExpr::Lt {
954            field: field.to_string(),
955            value,
956        });
957        self
958    }
959
960    pub fn le(mut self, field: &str, value: DbValue) -> Self {
961        self.filters.push(FilterExpr::Le {
962            field: field.to_string(),
963            value,
964        });
965        self
966    }
967
968    pub fn gt(mut self, field: &str, value: DbValue) -> Self {
969        self.filters.push(FilterExpr::Gt {
970            field: field.to_string(),
971            value,
972        });
973        self
974    }
975
976    pub fn ge(mut self, field: &str, value: DbValue) -> Self {
977        self.filters.push(FilterExpr::Ge {
978            field: field.to_string(),
979            value,
980        });
981        self
982    }
983
984    pub fn in_list(mut self, field: &str, values: Vec<DbValue>) -> Self {
985        self.filters.push(FilterExpr::In {
986            field: field.to_string(),
987            values,
988        });
989        self
990    }
991
992    pub fn contains(mut self, field: &str, value: &str) -> Self {
993        self.filters.push(FilterExpr::Contains {
994            field: field.to_string(),
995            value: value.to_string(),
996        });
997        self
998    }
999
1000    // 逻辑组合
1001    pub fn and(mut self, other: QueryBuilder) -> Self {
1002        // 合并另一个 QueryBuilder 的过滤条件
1003        for filter in other.filters {
1004            self.filters.push(filter);
1005        }
1006        self
1007    }
1008
1009    // 排序
1010    pub fn order_by(mut self, field: &str, order: Order) -> Self {
1011        self.order_by = Some((field.to_string(), order));
1012        self
1013    }
1014
1015    // 分页
1016    pub fn limit(mut self, limit: usize) -> Self {
1017        self.limit = Some(limit);
1018        self
1019    }
1020
1021    pub fn offset(mut self, offset: usize) -> Self {
1022        self.offset = Some(offset);
1023        self
1024    }
1025
1026    // JOIN 操作
1027    pub fn inner_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1028        self.joins.push(JoinCondition {
1029            join_type: JoinType::Inner,
1030            right_table: right_table.to_string(),
1031            left_field: left_field.to_string(),
1032            right_field: right_field.to_string(),
1033        });
1034        self
1035    }
1036
1037    pub fn left_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1038        self.joins.push(JoinCondition {
1039            join_type: JoinType::Left,
1040            right_table: right_table.to_string(),
1041            left_field: left_field.to_string(),
1042            right_field: right_field.to_string(),
1043        });
1044        self
1045    }
1046
1047    pub fn right_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1048        self.joins.push(JoinCondition {
1049            join_type: JoinType::Right,
1050            right_table: right_table.to_string(),
1051            left_field: left_field.to_string(),
1052            right_field: right_field.to_string(),
1053        });
1054        self
1055    }
1056
1057    pub fn full_join(mut self, right_table: &str, left_field: &str, right_field: &str) -> Self {
1058        self.joins.push(JoinCondition {
1059            join_type: JoinType::Full,
1060            right_table: right_table.to_string(),
1061            left_field: left_field.to_string(),
1062            right_field: right_field.to_string(),
1063        });
1064        self
1065    }
1066
1067    // 选择字段(支持 qualified names)
1068    pub fn select(mut self, columns: &[&str]) -> Self {
1069        self.selected_columns = columns.iter().map(|s| s.to_string()).collect();
1070        self
1071    }
1072
1073    // ==================== 聚合函数方法 ====================
1074
1075    /// COUNT(*) - 统计行数
1076    pub fn count(mut self) -> Self {
1077        self.aggregate = Some(AggregateExpr {
1078            func: AggregateFunction::Count,
1079            column: None,  // COUNT(*) 不需要列名
1080            alias: None,
1081        });
1082        self
1083    }
1084
1085    /// COUNT(column) - 统计非 NULL 行数
1086    pub fn count_column(mut self, column: &str) -> Self {
1087        self.aggregate = Some(AggregateExpr {
1088            func: AggregateFunction::Count,
1089            column: Some(column.to_string()),
1090            alias: None,
1091        });
1092        self
1093    }
1094
1095    /// SUM(column) - 求和
1096    pub fn sum(mut self, column: &str) -> Self {
1097        self.aggregate = Some(AggregateExpr {
1098            func: AggregateFunction::Sum,
1099            column: Some(column.to_string()),
1100            alias: None,
1101        });
1102        self
1103    }
1104
1105    /// AVG(column) - 平均值
1106    pub fn avg(mut self, column: &str) -> Self {
1107        self.aggregate = Some(AggregateExpr {
1108            func: AggregateFunction::Avg,
1109            column: Some(column.to_string()),
1110            alias: None,
1111        });
1112        self
1113    }
1114
1115    /// MAX(column) - 最大值
1116    pub fn max(mut self, column: &str) -> Self {
1117        self.aggregate = Some(AggregateExpr {
1118            func: AggregateFunction::Max,
1119            column: Some(column.to_string()),
1120            alias: None,
1121        });
1122        self
1123    }
1124
1125    /// MIN(column) - 最小值
1126    pub fn min(mut self, column: &str) -> Self {
1127        self.aggregate = Some(AggregateExpr {
1128            func: AggregateFunction::Min,
1129            column: Some(column.to_string()),
1130            alias: None,
1131        });
1132        self
1133    }
1134
1135    /// 设置聚合结果别名
1136    pub fn alias(mut self, name: &str) -> Self {
1137        if let Some(ref mut agg) = self.aggregate {
1138            agg.alias = Some(name.to_string());
1139        }
1140        self
1141    }
1142
1143    // ==================== GROUP BY 和 HAVING ====================
1144
1145    /// GROUP BY 子句
1146    pub fn group_by(mut self, columns: &[&str]) -> Self {
1147        self.group_by = columns.iter().map(|s| s.to_string()).collect();
1148        self
1149    }
1150
1151    /// HAVING 子句 - 等于
1152    pub fn having_eq(mut self, value: DbValue) -> Self {
1153        self.having = Some(HavingExpr::Eq { value });
1154        self
1155    }
1156
1157    /// HAVING 子句 - 不等于
1158    pub fn having_ne(mut self, value: DbValue) -> Self {
1159        self.having = Some(HavingExpr::Ne { value });
1160        self
1161    }
1162
1163    /// HAVING 子句 - 大于
1164    pub fn having_gt(mut self, value: DbValue) -> Self {
1165        self.having = Some(HavingExpr::Gt { value });
1166        self
1167    }
1168
1169    /// HAVING 子句 - 大于等于
1170    pub fn having_ge(mut self, value: DbValue) -> Self {
1171        self.having = Some(HavingExpr::Ge { value });
1172        self
1173    }
1174
1175    /// HAVING 子句 - 小于
1176    pub fn having_lt(mut self, value: DbValue) -> Self {
1177        self.having = Some(HavingExpr::Lt { value });
1178        self
1179    }
1180
1181    /// HAVING 子句 - 小于等于
1182    pub fn having_le(mut self, value: DbValue) -> Self {
1183        self.having = Some(HavingExpr::Le { value });
1184        self
1185    }
1186
1187    /// 执行查询
1188    pub fn execute(self) -> DbResult<Vec<Row>> {
1189        let engine = self.engine.read().unwrap();
1190
1191        // 检查是否有聚合函数
1192        if self.aggregate.is_some() || !self.group_by.is_empty() {
1193            return self.execute_aggregate(&engine);
1194        }
1195
1196        // 检查是否有 JOIN
1197        if !self.joins.is_empty() {
1198            return self.execute_join(&engine);
1199        }
1200
1201        // 尝试查找最佳复合索引
1202        if let Some((meta, index, key_values)) = self.find_best_index(&engine) {
1203            // 使用复合索引优化查询
1204            return self.execute_with_composite_index(&engine, meta, index, &key_values);
1205        }
1206
1207        // 尝试查找最佳单列索引(向后兼容)
1208        if let Some((field, index)) = self.find_best_single_index(&engine) {
1209            // 使用单列索引优化查询
1210            return self.execute_with_index(&engine, field, index);
1211        }
1212
1213        // 降级为全表扫描
1214        let rows = engine.scan(&self.table)?;
1215
1216        // 过滤
1217        let mut filtered: Vec<Row> = rows
1218            .into_iter()
1219            .filter(|(_, row)| {
1220                self.filters.iter().all(|expr| evaluate_filter(expr, row))
1221            })
1222            .map(|(_, row)| row.clone())
1223            .collect();
1224
1225        // 排序
1226        if let Some((ref field, order)) = self.order_by {
1227            filtered.sort_by(|a, b| {
1228                self.compare_rows(a, b, field, order)
1229            });
1230        }
1231
1232        // 分页
1233        let start = self.offset.unwrap_or(0);
1234        let end = start + self.limit.unwrap_or(filtered.len());
1235
1236        Ok(filtered.into_iter().skip(start).take(end - start).collect())
1237    }
1238
1239    /// 执行聚合查询
1240    fn execute_aggregate(&self, engine: &MemoryEngine) -> DbResult<Vec<Row>> {
1241        // 获取数据(考虑 JOIN 和过滤)
1242        let rows: Vec<Row> = if !self.joins.is_empty() {
1243            // 有 JOIN 时,先执行 JOIN
1244            let join_rows = self.execute_join(engine)?;
1245            // 应用过滤条件
1246            join_rows.into_iter()
1247                .filter(|row| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1248                .collect()
1249        } else {
1250            // 无 JOIN 时,直接扫描并过滤
1251            let scanned = engine.scan(&self.table)?;
1252            scanned.into_iter()
1253                .filter(|(_, row)| self.filters.iter().all(|expr| evaluate_filter(expr, row)))
1254                .map(|(_, row)| row.clone())
1255                .collect()
1256        };
1257
1258        // 处理 GROUP BY
1259        if !self.group_by.is_empty() {
1260            self.execute_grouped_aggregate(rows)
1261        } else {
1262            // 无 GROUP BY,全局聚合
1263            self.execute_simple_aggregate(rows)
1264        }
1265    }
1266
1267    /// 执行简单聚合(无 GROUP BY)
1268    fn execute_simple_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1269        let agg = match &self.aggregate {
1270            Some(a) => a,
1271            None => {
1272                // 没有聚合函数,返回原始数据(不应该发生)
1273                return Ok(rows);
1274            }
1275        };
1276
1277        let result_value = match agg.func {
1278            AggregateFunction::Count => {
1279                let count = match &agg.column {
1280                    Some(col) => {
1281                        // COUNT(column) - 统计非 NULL 值
1282                        rows.iter()
1283                            .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1284                            .count() as i64
1285                    }
1286                    None => {
1287                        // COUNT(*) - 统计所有行
1288                        rows.len() as i64
1289                    }
1290                };
1291                DbValue::Integer(count)
1292            }
1293            AggregateFunction::Sum => {
1294                match &agg.column {
1295                    Some(col) => self.sum_column(&rows, col),
1296                    None => DbValue::Integer(0),
1297                }
1298            }
1299            AggregateFunction::Avg => {
1300                match &agg.column {
1301                    Some(col) => self.avg_column(&rows, col),
1302                    None => DbValue::Null,
1303                }
1304            }
1305            AggregateFunction::Max => {
1306                match &agg.column {
1307                    Some(col) => self.max_column(&rows, col),
1308                    None => DbValue::Null,
1309                }
1310            }
1311            AggregateFunction::Min => {
1312                match &agg.column {
1313                    Some(col) => self.min_column(&rows, col),
1314                    None => DbValue::Null,
1315                }
1316            }
1317        };
1318
1319        // 构建结果行
1320        let mut result_row = Row::new();
1321        let column_name = match &agg.alias {
1322            Some(alias) => alias.clone(),
1323            None => {
1324                let func_name = match agg.func {
1325                    AggregateFunction::Count => "COUNT",
1326                    AggregateFunction::Sum => "SUM",
1327                    AggregateFunction::Avg => "AVG",
1328                    AggregateFunction::Max => "MAX",
1329                    AggregateFunction::Min => "MIN",
1330                };
1331                match &agg.column {
1332                    Some(col) => format!("{}({})", func_name, col),
1333                    None => "COUNT(*)".to_string(),
1334                }
1335            }
1336        };
1337
1338        // 将 i64 转换为 DbValue
1339        let db_value = match result_value {
1340            DbValue::Integer(i) => DbValue::Integer(i),
1341            DbValue::Real(r) => DbValue::Real(r),
1342            DbValue::Null => DbValue::Null,
1343            _ => DbValue::Null,
1344        };
1345        result_row.insert(column_name, db_value);
1346
1347        Ok(vec![result_row])
1348    }
1349
1350    /// 执行分组聚合(有 GROUP BY)
1351    fn execute_grouped_aggregate(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1352        let agg = match &self.aggregate {
1353            Some(a) => a,
1354            None => {
1355                // 没有聚合函数,只返回 GROUP BY 列的不同组合
1356                return self.execute_group_by_only(rows);
1357            }
1358        };
1359
1360        // 按 GROUP BY 列分组
1361        use std::collections::HashMap;
1362        let mut groups: HashMap<Vec<DbValue>, Vec<Row>> = HashMap::new();
1363
1364        for row in rows {
1365            // 构建分组键
1366            let mut key = Vec::new();
1367            for col in &self.group_by {
1368                if let Some(val) = row.get(col) {
1369                    key.push(val.clone());
1370                } else {
1371                    key.push(DbValue::Null);
1372                }
1373            }
1374
1375            groups.entry(key).or_insert_with(Vec::new).push(row);
1376        }
1377
1378        // 对每组计算聚合函数
1379        let mut results = Vec::new();
1380        for (key, group_rows) in groups {
1381            // 计算聚合值
1382            let agg_value = match agg.func {
1383                AggregateFunction::Count => {
1384                    let count = match &agg.column {
1385                        Some(col) => {
1386                            group_rows.iter()
1387                                .filter(|r| r.get(col).map(|v| !v.is_null()).unwrap_or(false))
1388                                .count() as i64
1389                        }
1390                        None => group_rows.len() as i64,
1391                    };
1392                    DbValue::Integer(count)
1393                }
1394                AggregateFunction::Sum => {
1395                    match &agg.column {
1396                        Some(col) => self.sum_column(&group_rows, col),
1397                        None => DbValue::Integer(0),
1398                    }
1399                }
1400                AggregateFunction::Avg => {
1401                    match &agg.column {
1402                        Some(col) => self.avg_column(&group_rows, col),
1403                        None => DbValue::Null,
1404                    }
1405                }
1406                AggregateFunction::Max => {
1407                    match &agg.column {
1408                        Some(col) => self.max_column(&group_rows, col),
1409                        None => DbValue::Null,
1410                    }
1411                }
1412                AggregateFunction::Min => {
1413                    match &agg.column {
1414                        Some(col) => self.min_column(&group_rows, col),
1415                        None => DbValue::Null,
1416                    }
1417                }
1418            };
1419
1420            // 应用 HAVING 过滤
1421            if let Some(ref having) = self.having {
1422                if !self.evaluate_having(&agg_value, having) {
1423                    continue;
1424                }
1425            }
1426
1427            // 构建结果行
1428            let mut result_row = Row::new();
1429
1430            // 添加 GROUP BY 列
1431            for (i, col) in self.group_by.iter().enumerate() {
1432                result_row.insert(col.clone(), key[i].clone());
1433            }
1434
1435            // 添加聚合结果
1436            let column_name = match &agg.alias {
1437                Some(alias) => alias.clone(),
1438                None => {
1439                    let func_name = match agg.func {
1440                        AggregateFunction::Count => "COUNT",
1441                        AggregateFunction::Sum => "SUM",
1442                        AggregateFunction::Avg => "AVG",
1443                        AggregateFunction::Max => "MAX",
1444                        AggregateFunction::Min => "MIN",
1445                    };
1446                    match &agg.column {
1447                        Some(col) => format!("{}({})", func_name, col),
1448                        None => "COUNT(*)".to_string(),
1449                    }
1450                }
1451            };
1452
1453            let db_value = match agg_value {
1454                DbValue::Integer(i) => DbValue::Integer(i),
1455                DbValue::Real(r) => DbValue::Real(r),
1456                DbValue::Null => DbValue::Null,
1457                _ => DbValue::Null,
1458            };
1459            result_row.insert(column_name, db_value);
1460
1461            results.push(result_row);
1462        }
1463
1464        Ok(results)
1465    }
1466
1467    /// 仅执行 GROUP BY(没有聚合函数)
1468    fn execute_group_by_only(&self, rows: Vec<Row>) -> DbResult<Vec<Row>> {
1469        use std::collections::HashSet;
1470
1471        let mut seen: HashSet<Vec<DbValue>> = HashSet::new();
1472        let mut results = Vec::new();
1473
1474        for row in rows {
1475            let mut key = Vec::new();
1476            for col in &self.group_by {
1477                if let Some(val) = row.get(col) {
1478                    key.push(val.clone());
1479                } else {
1480                    key.push(DbValue::Null);
1481                }
1482            }
1483
1484            if !seen.contains(&key) {
1485                seen.insert(key.clone());
1486
1487                let mut result_row = Row::new();
1488                for (i, col) in self.group_by.iter().enumerate() {
1489                    result_row.insert(col.clone(), key[i].clone());
1490                }
1491                results.push(result_row);
1492            }
1493        }
1494
1495        Ok(results)
1496    }
1497
1498    /// 计算列的总和
1499    fn sum_column(&self, rows: &[Row], column: &str) -> DbValue {
1500        let mut sum: f64 = 0.0;
1501        let mut has_value = false;
1502
1503        for row in rows {
1504            if let Some(val) = row.get(column) {
1505                match val {
1506                    DbValue::Integer(i) => {
1507                        sum += *i as f64;
1508                        has_value = true;
1509                    }
1510                    DbValue::Real(r) => {
1511                        sum += *r;
1512                        has_value = true;
1513                    }
1514                    _ => {}
1515                }
1516            }
1517        }
1518
1519        if has_value {
1520            // 如果所有值都是整数,返回整数
1521            if rows.iter().filter_map(|r| r.get(column)).all(|v| matches!(v, DbValue::Integer(_))) {
1522                DbValue::Integer(sum as i64)
1523            } else {
1524                DbValue::Real(sum)
1525            }
1526        } else {
1527            DbValue::Null
1528        }
1529    }
1530
1531    /// 计算列的平均值
1532    fn avg_column(&self, rows: &[Row], column: &str) -> DbValue {
1533        let mut sum: f64 = 0.0;
1534        let mut count: i64 = 0;
1535
1536        for row in rows {
1537            if let Some(val) = row.get(column) {
1538                match val {
1539                    DbValue::Integer(i) => {
1540                        sum += *i as f64;
1541                        count += 1;
1542                    }
1543                    DbValue::Real(r) => {
1544                        sum += *r;
1545                        count += 1;
1546                    }
1547                    _ => {}
1548                }
1549            }
1550        }
1551
1552        if count > 0 {
1553            DbValue::Real(sum / count as f64)
1554        } else {
1555            DbValue::Null
1556        }
1557    }
1558
1559    /// 计算列的最大值
1560    fn max_column(&self, rows: &[Row], column: &str) -> DbValue {
1561        let mut max_val: Option<DbValue> = None;
1562
1563        for row in rows {
1564            if let Some(val) = row.get(column) {
1565                if !val.is_null() {
1566                    match &max_val {
1567                        None => max_val = Some(val.clone()),
1568                        Some(current) => {
1569                            if self.compare_values(val, current) > 0 {
1570                                max_val = Some(val.clone());
1571                            }
1572                        }
1573                    }
1574                }
1575            }
1576        }
1577
1578        max_val.unwrap_or(DbValue::Null)
1579    }
1580
1581    /// 计算列的最小值
1582    fn min_column(&self, rows: &[Row], column: &str) -> DbValue {
1583        let mut min_val: Option<DbValue> = None;
1584
1585        for row in rows {
1586            if let Some(val) = row.get(column) {
1587                if !val.is_null() {
1588                    match &min_val {
1589                        None => min_val = Some(val.clone()),
1590                        Some(current) => {
1591                            if self.compare_values(val, current) < 0 {
1592                                min_val = Some(val.clone());
1593                            }
1594                        }
1595                    }
1596                }
1597            }
1598        }
1599
1600        min_val.unwrap_or(DbValue::Null)
1601    }
1602
1603    /// 比较两个 DbValue
1604    fn compare_values(&self, a: &DbValue, b: &DbValue) -> i32 {
1605        match (a, b) {
1606            (DbValue::Integer(a), DbValue::Integer(b)) => a.cmp(b) as i32,
1607            (DbValue::Integer(a), DbValue::Real(b)) => (*a as f64).partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1608            (DbValue::Real(a), DbValue::Integer(b)) => a.partial_cmp(&(*b as f64)).map(|o| o as i32).unwrap_or(0),
1609            (DbValue::Real(a), DbValue::Real(b)) => a.partial_cmp(b).map(|o| o as i32).unwrap_or(0),
1610            (DbValue::Text(a), DbValue::Text(b)) => a.cmp(b) as i32,
1611            _ => 0,
1612        }
1613    }
1614
1615    /// 评估 HAVING 子句
1616    fn evaluate_having(&self, agg_value: &DbValue, having: &HavingExpr) -> bool {
1617        match having {
1618            HavingExpr::Eq { value } => agg_value == value,
1619            HavingExpr::Ne { value } => agg_value != value,
1620            HavingExpr::Gt { value } => self.compare_values(agg_value, value) > 0,
1621            HavingExpr::Ge { value } => self.compare_values(agg_value, value) >= 0,
1622            HavingExpr::Lt { value } => self.compare_values(agg_value, value) < 0,
1623            HavingExpr::Le { value } => self.compare_values(agg_value, value) <= 0,
1624        }
1625    }
1626}
1627
1628/// 更新构建器
1629pub struct UpdateBuilder {
1630    table: String,
1631    filters: Vec<FilterExpr>,
1632    values: Vec<(String, DbValue)>,
1633    engine: Arc<RwLock<MemoryEngine>>,
1634}
1635
1636impl UpdateBuilder {
1637    pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
1638        UpdateBuilder {
1639            table,
1640            filters: Vec::new(),
1641            values: Vec::new(),
1642            engine,
1643        }
1644    }
1645
1646    pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1647        self.filters.push(FilterExpr::Eq {
1648            field: field.to_string(),
1649            value,
1650        });
1651        self
1652    }
1653
1654    pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1655        self.filters.push(FilterExpr::Lt {
1656            field: field.to_string(),
1657            value,
1658        });
1659        self
1660    }
1661
1662    pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1663        self.filters.push(FilterExpr::Gt {
1664            field: field.to_string(),
1665            value,
1666        });
1667        self
1668    }
1669
1670    pub fn set(mut self, field: &str, value: DbValue) -> Self {
1671        self.values.push((field.to_string(), value));
1672        self
1673    }
1674
1675    /// 执行更新,返回受影响的行数
1676    pub fn execute(self) -> DbResult<usize> {
1677        let mut engine = self.engine.write().unwrap();
1678        let rows = engine.scan(&self.table)?;
1679
1680        // 先收集需要更新的 row_id
1681        let mut to_update = Vec::new();
1682        for (row_id, row) in rows {
1683            let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
1684            if matches {
1685                to_update.push(row_id);
1686            }
1687        }
1688
1689        // 执行更新
1690        let mut count = 0;
1691        for row_id in to_update {
1692            let mut update_row: Row = self.values.iter().cloned().collect::<Row>();
1693            // 保留原有行的其他字段
1694            if let Some(existing) = engine.get(&self.table, row_id)? {
1695                for (key, value) in existing.iter() {
1696                    if !update_row.contains_key(key) {
1697                        update_row.insert(key.clone(), value.clone());
1698                    }
1699                }
1700            }
1701            engine.update(&self.table, row_id, update_row)?;
1702            count += 1;
1703        }
1704
1705        Ok(count)
1706    }
1707}
1708
1709/// 删除构建器
1710pub struct DeleteBuilder {
1711    table: String,
1712    filters: Vec<FilterExpr>,
1713    engine: Arc<RwLock<MemoryEngine>>,
1714}
1715
1716impl DeleteBuilder {
1717    pub fn new(table: String, engine: Arc<RwLock<MemoryEngine>>) -> Self {
1718        DeleteBuilder {
1719            table,
1720            filters: Vec::new(),
1721            engine,
1722        }
1723    }
1724
1725    pub fn eq(mut self, field: &str, value: DbValue) -> Self {
1726        self.filters.push(FilterExpr::Eq {
1727            field: field.to_string(),
1728            value,
1729        });
1730        self
1731    }
1732
1733    pub fn lt(mut self, field: &str, value: DbValue) -> Self {
1734        self.filters.push(FilterExpr::Lt {
1735            field: field.to_string(),
1736            value,
1737        });
1738        self
1739    }
1740
1741    pub fn gt(mut self, field: &str, value: DbValue) -> Self {
1742        self.filters.push(FilterExpr::Gt {
1743            field: field.to_string(),
1744            value,
1745        });
1746        self
1747    }
1748
1749    /// 执行删除,返回删除的行数
1750    pub fn execute(self) -> DbResult<usize> {
1751        let mut engine = self.engine.write().unwrap();
1752        let rows = engine.scan(&self.table)?;
1753
1754        // 先收集需要删除的 row_id
1755        let mut to_delete = Vec::new();
1756        for (row_id, row) in rows {
1757            let matches = self.filters.iter().all(|expr| evaluate_filter(expr, &row));
1758            if matches {
1759                to_delete.push(row_id);
1760            }
1761        }
1762
1763        // 执行删除
1764        let mut count = 0;
1765        for row_id in to_delete {
1766            engine.delete(&self.table, row_id)?;
1767            count += 1;
1768        }
1769
1770        Ok(count)
1771    }
1772}