Skip to main content

chryso_adapter/
lib.rs

1#[cfg(feature = "duckdb")]
2use ::duckdb::params_from_iter;
3#[cfg(feature = "duckdb")]
4use ::duckdb::types::Value as DuckValue;
5#[cfg(feature = "duckdb")]
6use chryso_core::ast::Statement;
7use chryso_core::error::{ChrysoError, ChrysoResult};
8#[cfg(feature = "duckdb")]
9use chryso_metadata::{ColumnStats, StatsCache, StatsProvider, TableStats};
10#[cfg(feature = "duckdb")]
11use chryso_parser::{ParserConfig, SimpleParser, SqlParser};
12use chryso_planner::PhysicalPlan;
13use std::cell::RefCell;
14
/// Tabular result returned by an [`ExecutorAdapter`].
///
/// Every cell is carried as its textual rendering; `rows[i][j]` is the value
/// for the column named `columns[j]`.
///
/// `PartialEq`/`Eq` make results directly comparable (e.g. in `assert_eq!`),
/// and `Default` yields an empty result with no columns and no rows.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Row values, one inner vector per row.
    pub rows: Vec<Vec<String>>,
}
20
/// A value that can be bound to a positional query parameter.
///
/// `PartialEq` is derived so parameter lists can be compared; `Eq` is
/// deliberately absent because the `Float` variant holds an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamValue {
    /// 64-bit signed integer.
    Int(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Boolean flag.
    Bool(bool),
    /// Owned text value.
    String(String),
    /// SQL `NULL`.
    Null,
}
29
/// Set of plan operators an adapter is able to execute.
///
/// The `Default` value has every flag set to `false`. `PartialEq`/`Eq` allow
/// capability sets to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdapterCapabilities {
    /// Join operators are supported.
    pub joins: bool,
    /// Aggregate operators are supported.
    pub aggregates: bool,
    /// Distinct operators are supported.
    pub distinct: bool,
    /// Top-N operators are supported.
    pub topn: bool,
    /// Sort operators are supported.
    pub sort: bool,
    /// Limit operators carrying a row limit are supported.
    pub limit: bool,
    /// Limit operators carrying an offset are supported.
    pub offset: bool,
}
40
41impl AdapterCapabilities {
42    pub fn all() -> Self {
43        Self {
44            joins: true,
45            aggregates: true,
46            distinct: true,
47            topn: true,
48            sort: true,
49            limit: true,
50            offset: true,
51        }
52    }
53
54    pub fn supports_plan(&self, plan: &PhysicalPlan) -> bool {
55        match plan {
56            PhysicalPlan::TableScan { .. } => true,
57            PhysicalPlan::IndexScan { .. } => true,
58            PhysicalPlan::Dml { .. } => true,
59            PhysicalPlan::Derived { input, .. } => self.supports_plan(input),
60            PhysicalPlan::Filter { input, .. } => self.supports_plan(input),
61            PhysicalPlan::Projection { input, .. } => self.supports_plan(input),
62            PhysicalPlan::Join { left, right, .. } => {
63                self.joins && self.supports_plan(left) && self.supports_plan(right)
64            }
65            PhysicalPlan::Aggregate { input, .. } => self.aggregates && self.supports_plan(input),
66            PhysicalPlan::Distinct { input } => self.distinct && self.supports_plan(input),
67            PhysicalPlan::TopN { input, .. } => self.topn && self.supports_plan(input),
68            PhysicalPlan::Sort { input, .. } => self.sort && self.supports_plan(input),
69            PhysicalPlan::Limit {
70                limit,
71                offset,
72                input,
73            } => {
74                let offset_ok = if offset.is_some() { self.offset } else { true };
75                let limit_ok = if limit.is_some() { self.limit } else { true };
76                offset_ok && limit_ok && self.supports_plan(input)
77            }
78        }
79    }
80}
81
82pub trait ExecutorAdapter {
83    fn execute(&self, plan: &PhysicalPlan) -> ChrysoResult<QueryResult>;
84
85    fn execute_with_params(
86        &self,
87        plan: &PhysicalPlan,
88        params: &[ParamValue],
89    ) -> ChrysoResult<QueryResult> {
90        if params.is_empty() {
91            self.execute(plan)
92        } else {
93            Err(ChrysoError::new(
94                "adapter does not support parameter binding",
95            ))
96        }
97    }
98
99    fn capabilities(&self) -> AdapterCapabilities {
100        AdapterCapabilities::all()
101    }
102
103    fn validate_plan(&self, plan: &PhysicalPlan) -> ChrysoResult<()> {
104        if self.capabilities().supports_plan(plan) {
105            Ok(())
106        } else {
107            Err(ChrysoError::new(
108                "adapter does not support required plan operators",
109            ))
110        }
111    }
112}
113
114#[derive(Debug, Clone)]
115pub struct MockAdapter {
116    result: QueryResult,
117    last_plan: RefCell<Option<String>>,
118    capabilities: AdapterCapabilities,
119}
120
121impl MockAdapter {
122    pub fn new() -> Self {
123        Self {
124            result: QueryResult {
125                columns: vec!["demo".to_string()],
126                rows: vec![vec!["ok".to_string()]],
127            },
128            last_plan: RefCell::new(None),
129            capabilities: AdapterCapabilities::all(),
130        }
131    }
132
133    pub fn with_result(result: QueryResult) -> Self {
134        Self {
135            result,
136            last_plan: RefCell::new(None),
137            capabilities: AdapterCapabilities::all(),
138        }
139    }
140
141    pub fn with_capabilities(mut self, capabilities: AdapterCapabilities) -> Self {
142        self.capabilities = capabilities;
143        self
144    }
145
146    pub fn last_plan(&self) -> Option<String> {
147        self.last_plan.borrow().clone()
148    }
149}
150
151impl Default for MockAdapter {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl ExecutorAdapter for MockAdapter {
158    fn execute(&self, plan: &PhysicalPlan) -> ChrysoResult<QueryResult> {
159        self.validate_plan(plan)?;
160        *self.last_plan.borrow_mut() = Some(plan.explain(0));
161        Ok(self.result.clone())
162    }
163
164    fn capabilities(&self) -> AdapterCapabilities {
165        self.capabilities.clone()
166    }
167}
168
169pub struct DuckDbAdapter {
170    #[cfg(feature = "duckdb")]
171    conn: RefCell<::duckdb::Connection>,
172}
173
174impl DuckDbAdapter {
175    pub fn new() -> Self {
176        #[cfg(feature = "duckdb")]
177        {
178            let conn = crate::duckdb::connect().expect("duckdb connect failed");
179            Self {
180                conn: RefCell::new(conn),
181            }
182        }
183        #[cfg(not(feature = "duckdb"))]
184        {
185            Self {}
186        }
187    }
188
189    pub fn try_new() -> ChrysoResult<Self> {
190        #[cfg(feature = "duckdb")]
191        {
192            let conn = crate::duckdb::connect()?;
193            Ok(Self {
194                conn: RefCell::new(conn),
195            })
196        }
197        #[cfg(not(feature = "duckdb"))]
198        {
199            Err(ChrysoError::new(
200                "duckdb feature is disabled; enable with --features duckdb",
201            ))
202        }
203    }
204
205    #[cfg(feature = "duckdb")]
206    pub fn execute_sql(&self, sql: &str) -> ChrysoResult<()> {
207        let conn = self.conn.borrow();
208        conn.execute(sql, [])
209            .map_err(|err| ChrysoError::new(format!("duckdb execute failed: {err}")))?;
210        Ok(())
211    }
212
213    #[cfg(feature = "duckdb")]
214    pub fn analyze_table(&self, table: &str, cache: &mut StatsCache) -> ChrysoResult<()> {
215        self.execute_sql(&format!("analyze {table}"))?;
216        let conn = self.conn.borrow();
217        let row_count = query_i64(&conn, &format!("select count(*) from {table}"))?;
218        cache.insert_table_stats(
219            table,
220            TableStats {
221                row_count: row_count as f64,
222            },
223        );
224        let columns = fetch_columns(&conn, table)?;
225        for column in columns {
226            let sql = format!(
227                "select count(distinct {column}), coalesce(sum(case when {column} is null then 1 else 0 end), 0) from {table}"
228            );
229            let (distinct_count, nulls) = query_i64_pair(&conn, &sql)?;
230            let null_fraction = if row_count == 0 {
231                0.0
232            } else {
233                nulls as f64 / row_count as f64
234            };
235            cache.insert_column_stats(
236                table,
237                &column,
238                ColumnStats {
239                    distinct_count: distinct_count.max(1) as f64,
240                    null_fraction,
241                },
242            );
243        }
244        Ok(())
245    }
246
247    #[cfg(feature = "duckdb")]
248    fn execute_with_duckdb(&self, plan: &PhysicalPlan) -> ChrysoResult<QueryResult> {
249        let conn = self.conn.borrow();
250        if let PhysicalPlan::Dml { sql } = plan {
251            if Self::is_query_sql(sql) {
252                return Self::query_with_sql(&conn, sql);
253            }
254            let affected = conn
255                .execute(sql, [])
256                .map_err(|err| ChrysoError::new(format!("duckdb execute failed: {err}")))?;
257            return Ok(QueryResult {
258                columns: vec!["rows_affected".to_string()],
259                rows: vec![vec![affected.to_string()]],
260            });
261        }
262        let sql = crate::physical_to_sql(plan);
263        let mut stmt = conn
264            .prepare(&sql)
265            .map_err(|err| ChrysoError::new(format!("duckdb prepare failed: {err}")))?;
266        let mut rows_iter = stmt
267            .query([])
268            .map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))?;
269        let columns = rows_iter
270            .as_ref()
271            .map(|stmt| stmt.column_names())
272            .unwrap_or_default();
273        let mut rows = Vec::new();
274        while let Some(row) = rows_iter
275            .next()
276            .map_err(|err| ChrysoError::new(format!("duckdb row error: {err}")))?
277        {
278            let mut values = Vec::new();
279            for idx in 0..columns.len() {
280                let value: DuckValue = row
281                    .get(idx)
282                    .map_err(|err| ChrysoError::new(format!("duckdb value error: {err}")))?;
283                values.push(format_duck_value(&value));
284            }
285            rows.push(values);
286        }
287        Ok(QueryResult { columns, rows })
288    }
289
290    #[cfg(feature = "duckdb")]
291    fn execute_with_duckdb_params(
292        &self,
293        plan: &PhysicalPlan,
294        params: &[ParamValue],
295    ) -> ChrysoResult<QueryResult> {
296        if let PhysicalPlan::Dml { .. } = plan {
297            return Err(ChrysoError::new(
298                "parameter binding not supported for DML in demo",
299            ));
300        }
301        let sql = crate::physical_to_sql(plan);
302        let conn = self.conn.borrow();
303        let mut stmt = conn
304            .prepare(&sql)
305            .map_err(|err| ChrysoError::new(format!("duckdb prepare failed: {err}")))?;
306        let duck_params = crate::duckdb::params_to_values(params);
307        let mut rows_iter = stmt
308            .query(params_from_iter(duck_params.iter()))
309            .map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))?;
310        let columns = rows_iter
311            .as_ref()
312            .map(|stmt| stmt.column_names())
313            .unwrap_or_default();
314        let mut rows = Vec::new();
315        while let Some(row) = rows_iter
316            .next()
317            .map_err(|err| ChrysoError::new(format!("duckdb row error: {err}")))?
318        {
319            let mut values = Vec::new();
320            for idx in 0..columns.len() {
321                let value: DuckValue = row
322                    .get(idx)
323                    .map_err(|err| ChrysoError::new(format!("duckdb value error: {err}")))?;
324                values.push(format_duck_value(&value));
325            }
326            rows.push(values);
327        }
328        Ok(QueryResult { columns, rows })
329    }
330
331    #[cfg(feature = "duckdb")]
332    fn query_with_sql(conn: &::duckdb::Connection, sql: &str) -> ChrysoResult<QueryResult> {
333        let mut stmt = conn
334            .prepare(sql)
335            .map_err(|err| ChrysoError::new(format!("duckdb prepare failed: {err}")))?;
336        let mut rows_iter = stmt
337            .query([])
338            .map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))?;
339        let columns = rows_iter
340            .as_ref()
341            .map(|stmt| stmt.column_names())
342            .unwrap_or_default();
343        let mut rows = Vec::new();
344        while let Some(row) = rows_iter
345            .next()
346            .map_err(|err| ChrysoError::new(format!("duckdb row error: {err}")))?
347        {
348            let mut values = Vec::new();
349            for idx in 0..columns.len() {
350                let value: DuckValue = row
351                    .get(idx)
352                    .map_err(|err| ChrysoError::new(format!("duckdb value error: {err}")))?;
353                values.push(format_duck_value(&value));
354            }
355            rows.push(values);
356        }
357        Ok(QueryResult { columns, rows })
358    }
359
360    #[cfg(feature = "duckdb")]
361    fn is_query_sql(sql: &str) -> bool {
362        let parser = SimpleParser::new(ParserConfig::default());
363        if let Ok(statement) = parser.parse(sql) {
364            return statement_returns_rows(&statement);
365        }
366        let Some(keyword) = first_keyword(sql) else {
367            return false;
368        };
369        matches!(keyword.as_str(), "select" | "with" | "explain")
370    }
371}
372
373#[cfg(feature = "duckdb")]
/// Extracts the first keyword of `sql`, lowercased.
///
/// Leading whitespace, `--` line comments and `/* */` block comments are
/// skipped. The keyword is the following run of ASCII letters and
/// underscores; `None` is returned when that run is empty.
fn first_keyword(sql: &str) -> Option<String> {
    let mut rest = sql;
    loop {
        rest = rest.trim_start();
        if let Some(after) = rest.strip_prefix("--") {
            // A line comment runs through the next newline, or to the end.
            rest = match after.find('\n') {
                Some(pos) => &after[pos + 1..],
                None => "",
            };
        } else if let Some(after) = rest.strip_prefix("/*") {
            // A block comment ends at the first `*/`; unterminated comments
            // swallow the remainder of the input.
            rest = match after.find("*/") {
                Some(pos) => &after[pos + 2..],
                None => "",
            };
        } else {
            break;
        }
    }
    let keyword: String = rest
        .chars()
        .take_while(|ch| ch.is_ascii_alphabetic() || *ch == '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    (!keyword.is_empty()).then_some(keyword)
}
422
/// Tells whether executing `statement` yields a result set.
///
/// `WITH` defers to its inner statement, and `INSERT`/`UPDATE`/`DELETE` return
/// rows only when they carry a non-empty `returning` list. Selects, set
/// operations and `EXPLAIN` always do; everything else does not.
#[cfg(feature = "duckdb")]
fn statement_returns_rows(statement: &Statement) -> bool {
    match statement {
        Statement::With(with) => statement_returns_rows(&with.statement),
        Statement::Insert(insert) => !insert.returning.is_empty(),
        Statement::Update(update) => !update.returning.is_empty(),
        Statement::Delete(delete) => !delete.returning.is_empty(),
        Statement::Select(_) | Statement::SetOp { .. } | Statement::Explain(_) => true,
        _ => false,
    }
}
434
#[cfg(feature = "duckdb")]
impl StatsProvider for DuckDbAdapter {
    /// Analyzes every table in `tables`, storing the statistics in `cache`.
    ///
    /// The requested column list is ignored: `analyze_table` gathers
    /// statistics for all columns of each table. Processing stops at the first
    /// table that fails.
    fn load_stats(
        &self,
        tables: &[String],
        _columns: &[(String, String)],
        cache: &mut StatsCache,
    ) -> ChrysoResult<()> {
        tables
            .iter()
            .try_for_each(|table| self.analyze_table(table, cache))
    }
}
449
/// Renders a DuckDB value as display text.
///
/// NULL becomes the literal text `null`, scalars use their `to_string`
/// form, text and enum values are copied as-is, lists and arrays are rendered
/// as `[a, b, ...]`, and blobs, structs and maps fall back to `Debug` output.
#[cfg(feature = "duckdb")]
fn format_duck_value(value: &DuckValue) -> String {
    /// Renders a sequence of values as a bracketed, comma-separated list.
    fn render_sequence(items: &[DuckValue]) -> String {
        let parts: Vec<String> = items.iter().map(format_duck_value).collect();
        format!("[{}]", parts.join(", "))
    }

    match value {
        DuckValue::Null => String::from("null"),
        DuckValue::Boolean(v) => v.to_string(),
        // Signed integers.
        DuckValue::TinyInt(v) => v.to_string(),
        DuckValue::SmallInt(v) => v.to_string(),
        DuckValue::Int(v) => v.to_string(),
        DuckValue::BigInt(v) => v.to_string(),
        DuckValue::HugeInt(v) => v.to_string(),
        // Unsigned integers.
        DuckValue::UTinyInt(v) => v.to_string(),
        DuckValue::USmallInt(v) => v.to_string(),
        DuckValue::UInt(v) => v.to_string(),
        DuckValue::UBigInt(v) => v.to_string(),
        // Floating point and decimal numbers.
        DuckValue::Float(v) => v.to_string(),
        DuckValue::Double(v) => v.to_string(),
        DuckValue::Decimal(v) => v.to_string(),
        // Temporal values: only the raw number is shown, the unit is dropped.
        DuckValue::Timestamp(_, v) | DuckValue::Time64(_, v) => v.to_string(),
        DuckValue::Date32(v) => v.to_string(),
        DuckValue::Interval {
            months,
            days,
            nanos,
        } => format!("interval({months},{days},{nanos})"),
        // Textual values.
        DuckValue::Text(v) | DuckValue::Enum(v) => v.clone(),
        DuckValue::Blob(v) => format!("{v:?}"),
        // Nested values.
        DuckValue::List(items) | DuckValue::Array(items) => render_sequence(items),
        DuckValue::Struct(values) => format!("{values:?}"),
        DuckValue::Map(values) => format!("{values:?}"),
        DuckValue::Union(value) => format_duck_value(value),
    }
}
493
/// Lists the column names of `table` using `pragma table_info`.
///
/// The value at index 1 of every returned row is read as the column name, in
/// the order the pragma reports them.
///
/// # Errors
///
/// Returns an error when the pragma cannot be prepared or run, or when a row
/// or its name value cannot be read.
#[cfg(feature = "duckdb")]
fn fetch_columns(conn: &::duckdb::Connection, table: &str) -> ChrysoResult<Vec<String>> {
    // The table name is embedded in a single-quoted SQL string literal, so an
    // embedded quote must be doubled or it would terminate the literal early.
    let literal = table.replace('\'', "''");
    let mut stmt = conn
        .prepare(&format!("pragma table_info('{literal}')"))
        .map_err(|err| ChrysoError::new(format!("duckdb prepare failed: {err}")))?;
    let mut rows = stmt
        .query([])
        .map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))?;
    let mut columns = Vec::new();
    while let Some(row) = rows
        .next()
        .map_err(|err| ChrysoError::new(format!("duckdb row error: {err}")))?
    {
        let name: String = row
            .get(1)
            .map_err(|err| ChrysoError::new(format!("duckdb value error: {err}")))?;
        columns.push(name);
    }
    Ok(columns)
}
514
/// Runs `sql` and returns the first column of its first row as an `i64`.
///
/// # Errors
///
/// Returns an error when the query fails or the value cannot be read as an
/// `i64`.
#[cfg(feature = "duckdb")]
fn query_i64(conn: &::duckdb::Connection, sql: &str) -> ChrysoResult<i64> {
    let outcome = conn.query_row(sql, [], |row| row.get::<_, i64>(0));
    outcome.map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))
}
520
/// Runs `sql` and returns the first two columns of its first row as `i64`s.
///
/// # Errors
///
/// Returns an error when the query fails or either value cannot be read as
/// an `i64`.
#[cfg(feature = "duckdb")]
fn query_i64_pair(conn: &::duckdb::Connection, sql: &str) -> ChrysoResult<(i64, i64)> {
    let outcome = conn.query_row(sql, [], |row| {
        let first = row.get::<_, i64>(0)?;
        let second = row.get::<_, i64>(1)?;
        Ok((first, second))
    });
    outcome.map_err(|err| ChrysoError::new(format!("duckdb query failed: {err}")))
}
530
531impl ExecutorAdapter for DuckDbAdapter {
532    fn execute(&self, plan: &PhysicalPlan) -> ChrysoResult<QueryResult> {
533        self.validate_plan(plan)?;
534        #[cfg(feature = "duckdb")]
535        {
536            self.execute_with_duckdb(plan)
537        }
538        #[cfg(not(feature = "duckdb"))]
539        {
540            let _ = plan;
541            Err(ChrysoError::new(
542                "duckdb feature is disabled; enable with --features duckdb",
543            ))
544        }
545    }
546
547    fn execute_with_params(
548        &self,
549        plan: &PhysicalPlan,
550        params: &[ParamValue],
551    ) -> ChrysoResult<QueryResult> {
552        self.validate_plan(plan)?;
553        #[cfg(feature = "duckdb")]
554        {
555            self.execute_with_duckdb_params(plan, params)
556        }
557        #[cfg(not(feature = "duckdb"))]
558        {
559            let _ = (plan, params);
560            Err(ChrysoError::new(
561                "duckdb feature is disabled; enable with --features duckdb",
562            ))
563        }
564    }
565}
566
567pub fn physical_to_sql(plan: &PhysicalPlan) -> String {
568    match plan {
569        PhysicalPlan::TableScan { table } => format!("select * from {table}"),
570        PhysicalPlan::IndexScan {
571            table,
572            index: _,
573            predicate,
574        } => format!("select * from {table} where {}", predicate.to_sql()),
575        PhysicalPlan::Dml { sql } => sql.clone(),
576        PhysicalPlan::Derived {
577            input,
578            alias,
579            column_aliases,
580        } => {
581            let base = physical_to_sql(input);
582            if column_aliases.is_empty() {
583                format!("select * from ({base}) as {alias}")
584            } else {
585                format!(
586                    "select * from ({base}) as {alias} ({})",
587                    column_aliases.join(", ")
588                )
589            }
590        }
591        PhysicalPlan::Filter { predicate, input } => {
592            let base = physical_to_sql(input);
593            format!("{base} where {}", predicate.to_sql())
594        }
595        PhysicalPlan::Projection { exprs, input } => {
596            let base = physical_to_sql(input);
597            let projection = exprs
598                .iter()
599                .map(|expr| expr.to_sql())
600                .collect::<Vec<_>>()
601                .join(", ");
602            format!("select {projection} from ({base}) as t")
603        }
604        PhysicalPlan::Join {
605            join_type,
606            algorithm: _,
607            left,
608            right,
609            on,
610        } => {
611            let left_sql = physical_to_sql(left);
612            let right_sql = physical_to_sql(right);
613            let join = match join_type {
614                chryso_core::ast::JoinType::Inner => "join",
615                chryso_core::ast::JoinType::Left => "left join",
616                chryso_core::ast::JoinType::Right => "right join",
617                chryso_core::ast::JoinType::Full => "full join",
618            };
619            format!(
620                "select * from ({left_sql}) as l {join} ({right_sql}) as r on {}",
621                on.to_sql()
622            )
623        }
624        PhysicalPlan::Aggregate {
625            group_exprs,
626            aggr_exprs,
627            input,
628        } => {
629            let base = physical_to_sql(input);
630            let mut select_list = Vec::new();
631            select_list.extend(group_exprs.iter().map(|expr| expr.to_sql()));
632            select_list.extend(aggr_exprs.iter().map(|expr| expr.to_sql()));
633            let select_list = if select_list.is_empty() {
634                "*".to_string()
635            } else {
636                select_list.join(", ")
637            };
638            if group_exprs.is_empty() {
639                format!("select {select_list} from ({base}) as t")
640            } else {
641                let group_list = group_exprs
642                    .iter()
643                    .map(|expr| expr.to_sql())
644                    .collect::<Vec<_>>()
645                    .join(", ");
646                format!("select {select_list} from ({base}) as t group by {group_list}")
647            }
648        }
649        PhysicalPlan::Distinct { input } => {
650            let base = physical_to_sql(input);
651            format!("select distinct * from ({base}) as t")
652        }
653        PhysicalPlan::TopN {
654            order_by,
655            limit,
656            input,
657        } => {
658            let base = physical_to_sql(input);
659            let order_list = order_by
660                .iter()
661                .map(|item| {
662                    let dir = if item.asc { "asc" } else { "desc" };
663                    let mut rendered = format!("{} {dir}", item.expr.to_sql());
664                    if let Some(nulls_first) = item.nulls_first {
665                        if nulls_first {
666                            rendered.push_str(" nulls first");
667                        } else {
668                            rendered.push_str(" nulls last");
669                        }
670                    }
671                    rendered
672                })
673                .collect::<Vec<_>>()
674                .join(", ");
675            format!("select * from ({base}) as t order by {order_list} limit {limit}")
676        }
677        PhysicalPlan::Sort { order_by, input } => {
678            let base = physical_to_sql(input);
679            let order_list = order_by
680                .iter()
681                .map(|item| {
682                    let dir = if item.asc { "asc" } else { "desc" };
683                    let mut rendered = format!("{} {dir}", item.expr.to_sql());
684                    if let Some(nulls_first) = item.nulls_first {
685                        if nulls_first {
686                            rendered.push_str(" nulls first");
687                        } else {
688                            rendered.push_str(" nulls last");
689                        }
690                    }
691                    rendered
692                })
693                .collect::<Vec<_>>()
694                .join(", ");
695            format!("select * from ({base}) as t order by {order_list}")
696        }
697        PhysicalPlan::Limit {
698            limit,
699            offset,
700            input,
701        } => {
702            let base = physical_to_sql(input);
703            match (limit, offset) {
704                (Some(limit), Some(offset)) => {
705                    format!("select * from ({base}) as t limit {limit} offset {offset}")
706                }
707                (Some(limit), None) => format!("select * from ({base}) as t limit {limit}"),
708                (None, Some(offset)) => format!("select * from ({base}) as t offset {offset}"),
709                (None, None) => format!("select * from ({base}) as t"),
710            }
711        }
712    }
713}
714
/// DuckDB-specific helpers, available with the `duckdb` feature.
#[cfg(feature = "duckdb")]
pub mod duckdb {
    use chryso_core::error::{ChrysoError, ChrysoResult};
    use chryso_planner::PhysicalPlan;
    use duckdb::types::Value;
    use duckdb::Connection;

    use crate::ParamValue;

    /// Opens a fresh in-memory DuckDB connection.
    ///
    /// # Errors
    ///
    /// Returns an error describing the failure when the database cannot be
    /// opened.
    pub fn connect() -> ChrysoResult<Connection> {
        match Connection::open_in_memory() {
            Ok(conn) => Ok(conn),
            Err(err) => Err(ChrysoError::new(format!("duckdb open failed: {err}"))),
        }
    }

    /// Renders `plan` as SQL; forwards to the crate-level `physical_to_sql`.
    pub fn physical_to_sql(plan: &PhysicalPlan) -> String {
        crate::physical_to_sql(plan)
    }

    /// Converts adapter parameters into DuckDB values, preserving order.
    pub fn params_to_values(params: &[ParamValue]) -> Vec<Value> {
        /// Maps one parameter onto the DuckDB value of the matching kind.
        fn to_duck(param: &ParamValue) -> Value {
            match param {
                ParamValue::Int(value) => Value::BigInt(*value),
                ParamValue::Float(value) => Value::Double(*value),
                ParamValue::Bool(value) => Value::Boolean(*value),
                ParamValue::String(value) => Value::Text(value.clone()),
                ParamValue::Null => Value::Null,
            }
        }

        params.iter().map(to_duck).collect()
    }
}
743
// Unit tests for the mock adapter, capability checks and SQL rendering.
#[cfg(test)]
mod tests {
    use super::{AdapterCapabilities, ExecutorAdapter, MockAdapter, ParamValue, QueryResult};
    use chryso_planner::PhysicalPlan;

    // Executing a plan stores its explain text on the mock.
    #[test]
    fn mock_adapter_records_plan() {
        let adapter = MockAdapter::new();
        let plan = PhysicalPlan::TableScan {
            table: "users".to_string(),
        };
        adapter.execute(&plan).expect("execute");
        let recorded = adapter.last_plan().expect("recorded");
        assert!(recorded.contains("TableScan"));
    }

    // A join plan is rejected when the adapter's `joins` capability is off.
    #[test]
    fn capabilities_reject_join() {
        let plan = PhysicalPlan::Join {
            join_type: chryso_core::ast::JoinType::Inner,
            algorithm: chryso_planner::JoinAlgorithm::Hash,
            left: Box::new(PhysicalPlan::TableScan {
                table: "t1".to_string(),
            }),
            right: Box::new(PhysicalPlan::TableScan {
                table: "t2".to_string(),
            }),
            on: chryso_core::ast::Expr::Identifier("t1.id = t2.id".to_string()),
        };
        let caps = AdapterCapabilities {
            joins: false,
            ..AdapterCapabilities::all()
        };
        let adapter = MockAdapter::new().with_capabilities(caps);
        let err = adapter.execute(&plan).expect_err("should reject join");
        assert!(err.to_string().contains("does not support"));
    }

    // The mock relies on the trait's default `execute_with_params`, which
    // refuses a non-empty parameter list.
    #[test]
    fn execute_with_params_rejects_on_mock() {
        let adapter = MockAdapter::new();
        let plan = PhysicalPlan::TableScan {
            table: "users".to_string(),
        };
        let err = adapter
            .execute_with_params(&plan, &[ParamValue::Int(1)])
            .expect_err("should reject params");
        assert!(err.to_string().contains("parameter"));
    }

    // A mock built with `with_result` hands back that result.
    #[test]
    fn mock_adapter_custom_result() {
        let adapter = MockAdapter::with_result(QueryResult {
            columns: vec!["id".to_string()],
            rows: vec![vec!["1".to_string()]],
        });
        let plan = PhysicalPlan::TableScan {
            table: "users".to_string(),
        };
        let result = adapter.execute(&plan).expect("execute");
        assert_eq!(result.columns, vec!["id".to_string()]);
        assert_eq!(result.rows.len(), 1);
    }

    // Limit over Sort over TableScan renders both the ordering and the
    // limit/offset clauses.
    #[test]
    fn physical_to_sql_snapshot() {
        let plan = PhysicalPlan::Limit {
            limit: Some(10),
            offset: Some(5),
            input: Box::new(PhysicalPlan::Sort {
                order_by: vec![chryso_core::ast::OrderByExpr {
                    expr: chryso_core::ast::Expr::Identifier("id".to_string()),
                    asc: false,
                    nulls_first: None,
                }],
                input: Box::new(PhysicalPlan::TableScan {
                    table: "users".to_string(),
                }),
            }),
        };
        let sql = super::physical_to_sql(&plan);
        assert!(
            sql.contains("order by id desc"),
            "expected order by in sql: {sql}"
        );
        assert!(
            sql.contains("limit 10 offset 5"),
            "expected limit/offset: {sql}"
        );
    }

    // An index scan renders as a table select with the predicate in WHERE.
    #[test]
    fn physical_to_sql_index_scan() {
        let plan = PhysicalPlan::IndexScan {
            table: "users".to_string(),
            index: "idx_users_id".to_string(),
            predicate: chryso_core::ast::Expr::BinaryOp {
                left: Box::new(chryso_core::ast::Expr::Identifier("id".to_string())),
                op: chryso_core::ast::BinaryOperator::Eq,
                right: Box::new(chryso_core::ast::Expr::Literal(
                    chryso_core::ast::Literal::Number(1.0),
                )),
            },
        };
        let sql = super::physical_to_sql(&plan);
        assert!(sql.contains("from users"));
        assert!(sql.contains("where id = 1"));
    }
}