Skip to main content

nu_command/database/values/
sqlite.rs

1use super::definitions::{
2    db_column::DbColumn, db_constraint::DbConstraint, db_foreignkey::DbForeignKey,
3    db_index::DbIndex, db_table::DbTable,
4};
5use nu_protocol::{
6    CustomValue, IntoPipelineData, PipelineData, Record, ShellError, Signals, Span, Spanned, Value,
7    ast, casing::Casing, engine::EngineState, shell_error::generic::GenericError,
8    shell_error::io::IoError,
9};
10use rusqlite::{
11    Connection, Error as SqliteError, OpenFlags, Row, Statement, ToSql, types::ValueRef,
12};
13use serde::{Deserialize, Serialize};
14use std::{
15    collections::BTreeMap,
16    fmt::Write,
17    fs::File,
18    io::Read,
19    path::{Path, PathBuf},
20};
21
/// The 16-byte header expected at the start of a SQLite file; `try_from_path`
/// compares the first 16 bytes of a candidate file against it.
const SQLITE_MAGIC_BYTES: &[u8] = "SQLite format 3\0".as_bytes();
/// Path value that selects the in-memory database: when a database path
/// equals this string, the connection helpers in this file open the in-memory
/// connection instead of a file.
pub const MEMORY_DB: &str = "file:memdb1?mode=memory&cache=shared";
/// Database name passed to the backup and restore calls.
const DATABASE_NAME: &str = "main";
25
/// A SQLite database referenced by its path.
///
/// No connection is stored; methods that need one open it from `path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SQLiteDatabase {
    // I considered storing a SQLite connection here, but decided against it because
    // 1) YAGNI, 2) it's not obvious how cloning a connection could work, 3) state
    // management gets tricky quick. Revisit this approach if we find a compelling use case.
    /// Location of the database (or the `MEMORY_DB` string for the in-memory database).
    pub path: PathBuf,
    #[serde(skip, default = "Signals::empty")]
    // this understandably can't be serialized. think that's OK, I'm not aware of a
    // reason why a CustomValue would be serialized outside of a plugin
    /// Signals handed to the query helpers, which check them while collecting rows.
    /// Skipped by serde and restored as `Signals::empty` on deserialization.
    signals: Signals,
}
37
38impl SQLiteDatabase {
39    pub fn new(path: &Path, signals: Signals) -> Self {
40        Self {
41            path: PathBuf::from(path),
42            signals,
43        }
44    }
45
46    pub fn try_from_path(path: &Path, span: Span, signals: Signals) -> Result<Self, ShellError> {
47        let mut file = File::open(path).map_err(|e| IoError::new(e, span, PathBuf::from(path)))?;
48
49        let mut buf: [u8; 16] = [0; 16];
50        file.read_exact(&mut buf)
51            .map_err(|e| ShellError::Io(IoError::new(e, span, PathBuf::from(path))))
52            .and_then(|_| {
53                if buf == SQLITE_MAGIC_BYTES {
54                    Ok(SQLiteDatabase::new(path, signals))
55                } else {
56                    Err(ShellError::Generic(GenericError::new(
57                        "Not a SQLite file",
58                        format!("Could not read '{}' as SQLite file", path.display()),
59                        span,
60                    )))
61                }
62            })
63    }
64
65    pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
66        let span = value.span();
67        match value {
68            Value::Custom { val, .. } => match val.as_any().downcast_ref::<Self>() {
69                Some(db) => Ok(Self {
70                    path: db.path.clone(),
71                    signals: db.signals.clone(),
72                }),
73                None => Err(ShellError::CantConvert {
74                    to_type: "database".into(),
75                    from_type: "non-database".into(),
76                    span,
77                    help: None,
78                }),
79            },
80            x => Err(ShellError::CantConvert {
81                to_type: "database".into(),
82                from_type: x.get_type().to_string(),
83                span: x.span(),
84                help: None,
85            }),
86        }
87    }
88
89    pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
90        let value = input.into_value(span)?;
91        Self::try_from_value(value)
92    }
93
94    pub fn into_value(self, span: Span) -> Value {
95        let db = Box::new(self);
96        Value::custom(db, span)
97    }
98
99    pub fn query(
100        &self,
101        sql: &Spanned<String>,
102        params: NuSqlParams,
103        call_span: Span,
104    ) -> Result<Value, ShellError> {
105        let conn = open_sqlite_db(&self.path, call_span)?;
106        let stream = run_sql_query(conn, sql, params, &self.signals, None)
107            .map_err(|e| e.into_shell_error(sql.span, "Failed to query SQLite database"))?;
108
109        Ok(stream)
110    }
111
112    pub fn open_connection(&self) -> Result<Connection, ShellError> {
113        if self.path.to_string_lossy() == MEMORY_DB {
114            open_connection_in_memory_custom()
115        } else {
116            let conn = Connection::open(&self.path).map_err(|e| {
117                ShellError::Generic(GenericError::new_internal(
118                    "Failed to open SQLite database from open_connection",
119                    e.to_string(),
120                ))
121            })?;
122            conn.busy_handler(Some(SQLiteDatabase::sleeper))
123                .map_err(|e| {
124                    ShellError::Generic(GenericError::new_internal(
125                        "Failed to set busy handler for SQLite database",
126                        e.to_string(),
127                    ))
128                })?;
129            Ok(conn)
130        }
131    }
132
133    fn sleeper(attempts: i32) -> bool {
134        log::warn!("SQLITE_BUSY, retrying after 250ms (attempt {attempts})");
135        std::thread::sleep(std::time::Duration::from_millis(250));
136        true
137    }
138
139    pub fn get_tables(&self, conn: &Connection) -> Result<Vec<DbTable>, SqliteError> {
140        let mut table_names =
141            conn.prepare("SELECT name FROM sqlite_master WHERE type = 'table'")?;
142        let rows = table_names.query_map([], |row| row.get(0))?;
143        let mut tables = Vec::new();
144
145        for row in rows {
146            let table_name: String = row?;
147            tables.push(DbTable {
148                name: table_name,
149                create_time: None,
150                update_time: None,
151                engine: None,
152                schema: None,
153            })
154        }
155
156        Ok(tables.into_iter().collect())
157    }
158
159    pub fn drop_all_tables(&self, conn: &Connection) -> Result<(), SqliteError> {
160        let tables = self.get_tables(conn)?;
161
162        for table in tables {
163            conn.execute(&format!("DROP TABLE {}", table.name), [])?;
164        }
165
166        Ok(())
167    }
168
169    pub fn export_in_memory_database_to_file(
170        &self,
171        conn: &Connection,
172        filename: String,
173    ) -> Result<(), SqliteError> {
174        //vacuum main into 'c:\\temp\\foo.db'
175        conn.execute(&format!("vacuum main into '{filename}'"), [])?;
176
177        Ok(())
178    }
179
180    pub fn backup_database_to_file(
181        &self,
182        conn: &Connection,
183        filename: String,
184    ) -> Result<(), SqliteError> {
185        conn.backup(DATABASE_NAME, Path::new(&filename), None)?;
186        Ok(())
187    }
188
189    pub fn restore_database_from_file(
190        &self,
191        conn: &mut Connection,
192        filename: String,
193    ) -> Result<(), SqliteError> {
194        conn.restore(
195            DATABASE_NAME,
196            Path::new(&filename),
197            Some(|p: rusqlite::backup::Progress| {
198                let percent = if p.pagecount == 0 {
199                    100
200                } else {
201                    (p.pagecount - p.remaining) * 100 / p.pagecount
202                };
203                if percent % 10 == 0 {
204                    log::trace!("Restoring: {percent} %");
205                }
206            }),
207        )?;
208        Ok(())
209    }
210
211    fn get_column_info(&self, row: &Row) -> Result<DbColumn, SqliteError> {
212        let dbc = DbColumn {
213            cid: row.get("cid")?,
214            name: row.get("name")?,
215            r#type: row.get("type")?,
216            notnull: row.get("notnull")?,
217            default: row.get("dflt_value")?,
218            pk: row.get("pk")?,
219        };
220        Ok(dbc)
221    }
222
223    pub fn get_columns(
224        &self,
225        conn: &Connection,
226        table: &DbTable,
227    ) -> Result<Vec<DbColumn>, SqliteError> {
228        let mut column_names = conn.prepare(&format!(
229            "SELECT * FROM pragma_table_info('{}');",
230            table.name
231        ))?;
232
233        let mut columns: Vec<DbColumn> = Vec::new();
234        let rows = column_names.query_and_then([], |row| self.get_column_info(row))?;
235
236        for row in rows {
237            columns.push(row?);
238        }
239
240        Ok(columns)
241    }
242
243    fn get_constraint_info(&self, row: &Row) -> Result<DbConstraint, SqliteError> {
244        let dbc = DbConstraint {
245            name: row.get("index_name")?,
246            column_name: row.get("column_name")?,
247            origin: row.get("origin")?,
248        };
249        Ok(dbc)
250    }
251
252    pub fn get_constraints(
253        &self,
254        conn: &Connection,
255        table: &DbTable,
256    ) -> Result<Vec<DbConstraint>, SqliteError> {
257        let mut column_names = conn.prepare(&format!(
258            "
259            SELECT
260                p.origin,
261                s.name AS index_name,
262                i.name AS column_name
263            FROM
264                sqlite_master s
265                JOIN pragma_index_list(s.tbl_name) p ON s.name = p.name,
266                pragma_index_info(s.name) i
267            WHERE
268                s.type = 'index'
269                AND tbl_name = '{}'
270                AND NOT p.origin = 'c'
271            ",
272            &table.name
273        ))?;
274
275        let mut constraints: Vec<DbConstraint> = Vec::new();
276        let rows = column_names.query_and_then([], |row| self.get_constraint_info(row))?;
277
278        for row in rows {
279            constraints.push(row?);
280        }
281
282        Ok(constraints)
283    }
284
285    fn get_foreign_keys_info(&self, row: &Row) -> Result<DbForeignKey, SqliteError> {
286        let dbc = DbForeignKey {
287            column_name: row.get("from")?,
288            ref_table: row.get("table")?,
289            ref_column: row.get("to")?,
290        };
291        Ok(dbc)
292    }
293
294    pub fn get_foreign_keys(
295        &self,
296        conn: &Connection,
297        table: &DbTable,
298    ) -> Result<Vec<DbForeignKey>, SqliteError> {
299        let mut column_names = conn.prepare(&format!(
300            "SELECT p.`from`, p.`to`, p.`table` FROM pragma_foreign_key_list('{}') p",
301            &table.name
302        ))?;
303
304        let mut foreign_keys: Vec<DbForeignKey> = Vec::new();
305        let rows = column_names.query_and_then([], |row| self.get_foreign_keys_info(row))?;
306
307        for row in rows {
308            foreign_keys.push(row?);
309        }
310
311        Ok(foreign_keys)
312    }
313
314    fn get_index_info(&self, row: &Row) -> Result<DbIndex, SqliteError> {
315        let dbc = DbIndex {
316            name: row.get("index_name")?,
317            column_name: row.get("name")?,
318            seqno: row.get("seqno")?,
319        };
320        Ok(dbc)
321    }
322
323    pub fn get_indexes(
324        &self,
325        conn: &Connection,
326        table: &DbTable,
327    ) -> Result<Vec<DbIndex>, SqliteError> {
328        let mut column_names = conn.prepare(&format!(
329            "
330            SELECT
331                m.name AS index_name,
332                p.*
333            FROM
334                sqlite_master m,
335                pragma_index_info(m.name) p
336            WHERE
337                m.type = 'index'
338                AND m.tbl_name = '{}'
339            ",
340            &table.name,
341        ))?;
342
343        let mut indexes: Vec<DbIndex> = Vec::new();
344        let rows = column_names.query_and_then([], |row| self.get_index_info(row))?;
345
346        for row in rows {
347            indexes.push(row?);
348        }
349
350        Ok(indexes)
351    }
352}
353
354impl CustomValue for SQLiteDatabase {
355    fn clone_value(&self, span: Span) -> Value {
356        Value::custom(Box::new(self.clone()), span)
357    }
358
359    fn type_name(&self) -> String {
360        self.typetag_name().to_string()
361    }
362
363    fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
364        let db = open_sqlite_db(&self.path, span)?;
365        read_entire_sqlite_db(db, span, &self.signals)
366            .map_err(|e| e.into_shell_error(span, "Failed to read from SQLite database"))
367    }
368
369    fn as_any(&self) -> &dyn std::any::Any {
370        self
371    }
372
373    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
374        self
375    }
376
377    fn follow_path_int(
378        &self,
379        _self_span: Span,
380        _index: usize,
381        path_span: Span,
382        _optional: bool,
383    ) -> Result<Value, ShellError> {
384        // In theory we could support this, but tables don't have an especially well-defined order
385        Err(ShellError::IncompatiblePathAccess { type_name: "SQLite databases do not support integer-indexed access. Try specifying a table name instead".into(), span: path_span })
386    }
387
388    fn follow_path_string(
389        &self,
390        _self_span: Span,
391        column_name: String,
392        path_span: Span,
393        _optional: bool,
394        _casing: Casing,
395    ) -> Result<Value, ShellError> {
396        // Return a lazy SQLiteQueryBuilder instead of executing the query immediately
397        let table = SQLiteQueryBuilder::new(self.path.clone(), column_name, self.signals.clone());
398        Ok(Value::custom(Box::new(table), path_span))
399    }
400
401    fn typetag_name(&self) -> &'static str {
402        "SQLiteDatabase"
403    }
404
405    fn typetag_deserialize(&self) {
406        unimplemented!("typetag_deserialize")
407    }
408}
409
410pub fn open_sqlite_db(path: &Path, call_span: Span) -> Result<Connection, ShellError> {
411    if path.to_string_lossy() == MEMORY_DB {
412        open_connection_in_memory_custom()
413    } else {
414        let path = path.to_string_lossy().to_string();
415        Connection::open(path).map_err(|err| {
416            ShellError::Generic(GenericError::new(
417                "Failed to open SQLite database",
418                err.to_string(),
419                call_span,
420            ))
421        })
422    }
423}
424
425fn run_sql_query(
426    conn: Connection,
427    sql: &Spanned<String>,
428    params: NuSqlParams,
429    signals: &Signals,
430    column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
431) -> Result<Value, SqliteOrShellError> {
432    let stmt = conn.prepare(&sql.item)?;
433    prepared_statement_to_nu_list(stmt, params, sql.span, signals, column_adapters)
434}
435
436// This is taken from to text local_into_string but tweaks it a bit so that certain formatting does not happen
437pub fn value_to_sql(
438    engine_state: &EngineState,
439    value: Value,
440    call_span: Span,
441) -> Result<Box<dyn rusqlite::ToSql>, ShellError> {
442    match value {
443        Value::Bool { val, .. } => Ok(Box::new(val)),
444        Value::Int { val, .. } => Ok(Box::new(val)),
445        Value::Float { val, .. } => Ok(Box::new(val)),
446        Value::Filesize { val, .. } => Ok(Box::new(val.get())),
447        Value::Duration { val, .. } => Ok(Box::new(val)),
448        Value::Date { val, .. } => Ok(Box::new(val)),
449        Value::String { val, .. } => Ok(Box::new(val)),
450        Value::Binary { val, .. } => Ok(Box::new(val)),
451        Value::Nothing { .. } => Ok(Box::new(rusqlite::types::Null)),
452        val => {
453            let span = val.span();
454            let ty = val.get_type();
455            let json_value = crate::value_to_json_value(engine_state, val, call_span, false)?;
456            match nu_json::to_string_raw(&json_value) {
457                Ok(s) => Ok(Box::new(s)),
458                Err(err) => Err(ShellError::CantConvert {
459                    to_type: "JSON".into(),
460                    from_type: ty.to_string(),
461                    span,
462                    help: Some(err.to_string()),
463                }),
464            }
465        }
466    }
467}
468
469pub fn values_to_sql(
470    engine_state: &EngineState,
471    values: impl IntoIterator<Item = Value>,
472    call_span: Span,
473) -> Result<Vec<Box<dyn rusqlite::ToSql>>, ShellError> {
474    values
475        .into_iter()
476        .map(|v| value_to_sql(engine_state, v, call_span))
477        .collect::<Result<Vec<_>, _>>()
478}
479
/// Parameters to bind to a SQL statement.
pub enum NuSqlParams {
    /// Positional parameters, bound in order.
    List(Vec<Box<dyn ToSql>>),
    /// Named parameters as `(name, value)` pairs.
    Named(Vec<(String, Box<dyn ToSql>)>),
}
484
485impl Default for NuSqlParams {
486    fn default() -> Self {
487        NuSqlParams::List(Vec::new())
488    }
489}
490
491pub fn nu_value_to_params(
492    engine_state: &EngineState,
493    value: Value,
494    call_span: Span,
495) -> Result<NuSqlParams, ShellError> {
496    match value {
497        Value::Record { val, .. } => {
498            let mut params = Vec::with_capacity(val.len());
499
500            for (mut column, value) in val.into_owned().into_iter() {
501                let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
502
503                if !column.starts_with([':', '@', '$']) {
504                    column.insert(0, ':');
505                }
506
507                params.push((column, sql_type_erased));
508            }
509
510            Ok(NuSqlParams::Named(params))
511        }
512        Value::List { vals, .. } => {
513            let mut params = Vec::with_capacity(vals.len());
514
515            for value in vals.into_iter() {
516                let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
517
518                params.push(sql_type_erased);
519            }
520
521            Ok(NuSqlParams::List(params))
522        }
523
524        // We accept no parameters
525        Value::Nothing { .. } => Ok(NuSqlParams::default()),
526
527        _ => Err(ShellError::TypeMismatch {
528            err_message: "Invalid parameters value: expected record or list".to_string(),
529            span: value.span(),
530        }),
531    }
532}
533
/// Error type for helpers that can fail either inside SQLite or inside Nushell.
///
/// Use `into_shell_error` to turn it into a `ShellError`.
#[derive(Debug)]
enum SqliteOrShellError {
    /// An error reported by rusqlite.
    SqliteError(SqliteError),
    /// An error that is already a `ShellError`.
    ShellError(ShellError),
}
539
540impl From<SqliteError> for SqliteOrShellError {
541    fn from(error: SqliteError) -> Self {
542        Self::SqliteError(error)
543    }
544}
545
546impl From<ShellError> for SqliteOrShellError {
547    fn from(error: ShellError) -> Self {
548        Self::ShellError(error)
549    }
550}
551
552impl SqliteOrShellError {
553    fn into_shell_error(self, span: Span, msg: &str) -> ShellError {
554        match self {
555            Self::SqliteError(err) => {
556                ShellError::Generic(GenericError::new(msg.to_string(), err.to_string(), span))
557            }
558            Self::ShellError(err) => err,
559        }
560    }
561}
562
/// The SQLite type behind a query column returned as some raw type (e.g. 'text')
///
/// Only declared types that get special treatment are represented.
#[derive(Clone, Copy)]
pub enum DeclType {
    Json,
    Jsonb,
}

impl DeclType {
    /// Parses a declared column type, ignoring ASCII case.
    ///
    /// Returns `None` for every type other than `JSON` and `JSONB`.
    ///
    /// The comparison is ASCII-only: it does not allocate, and non-ASCII
    /// characters that Unicode-uppercase to ASCII letters (such as `ſ`) are
    /// not accepted.
    pub fn from_str(s: &str) -> Option<Self> {
        if s.eq_ignore_ascii_case("JSON") {
            Some(DeclType::Json)
        } else if s.eq_ignore_ascii_case("JSONB") {
            Some(DeclType::Jsonb)
        } else {
            None // We are only special-casing JSON(B) columns for now
        }
    }
}
579
580/// A column out of an SQLite query, together with its type
581pub struct TypedColumn {
582    pub name: String,
583    pub decl_type: Option<DeclType>,
584}
585
586impl TypedColumn {
587    pub fn from_rusqlite_column(c: &rusqlite::Column) -> Self {
588        Self {
589            name: c.name().to_owned(),
590            decl_type: c.decl_type().and_then(DeclType::from_str),
591        }
592    }
593}
594
595fn prepared_statement_to_nu_list(
596    mut stmt: Statement,
597    params: NuSqlParams,
598    call_span: Span,
599    signals: &Signals,
600    column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
601) -> Result<Value, SqliteOrShellError> {
602    let columns: Vec<TypedColumn> = stmt
603        .columns()
604        .iter()
605        .map(TypedColumn::from_rusqlite_column)
606        .collect();
607
608    fn collect_row_values(
609        row_results: impl IntoIterator<Item = Result<Value, SqliteError>>,
610        signals: &Signals,
611        call_span: Span,
612    ) -> Result<Vec<Value>, SqliteOrShellError> {
613        let mut row_values = vec![];
614
615        for row_result in row_results {
616            signals.check(&call_span)?;
617            if let Ok(row_value) = row_result {
618                row_values.push(row_value);
619            }
620        }
621
622        Ok(row_values)
623    }
624
625    // Both parameter styles need separate query_map calls because rusqlite uses
626    // different parameter reference types for positional and named bindings.
627    // Keep the row processing shared through `collect_row_values`.
628    let row_values = match params {
629        NuSqlParams::List(params) => {
630            let refs: Vec<&dyn ToSql> = params.iter().map(|value| &**value).collect();
631
632            let row_results = stmt.query_map(refs.as_slice(), |row| {
633                Ok(convert_sqlite_row_to_nu_value(
634                    row,
635                    call_span,
636                    &columns,
637                    column_adapters,
638                ))
639            })?;
640
641            collect_row_values(row_results, signals, call_span)?
642        }
643        NuSqlParams::Named(pairs) => {
644            let refs: Vec<_> = pairs
645                .iter()
646                .map(|(column, value)| (column.as_str(), &**value))
647                .collect();
648
649            let row_results = stmt.query_map(refs.as_slice(), |row| {
650                Ok(convert_sqlite_row_to_nu_value(
651                    row,
652                    call_span,
653                    &columns,
654                    column_adapters,
655                ))
656            })?;
657
658            collect_row_values(row_results, signals, call_span)?
659        }
660    };
661
662    Ok(Value::list(row_values, call_span))
663}
664
665fn read_entire_sqlite_db(
666    conn: Connection,
667    call_span: Span,
668    signals: &Signals,
669) -> Result<Value, SqliteOrShellError> {
670    let mut tables = Record::new();
671
672    let mut get_table_names =
673        conn.prepare("SELECT name FROM sqlite_master WHERE type = 'table'")?;
674    let rows = get_table_names.query_map([], |row| row.get(0))?;
675
676    for row in rows {
677        let table_name: String = row?;
678        // TODO: Should use params here?
679        let table_stmt = conn.prepare(&format!("select * from [{table_name}]"))?;
680        let rows = prepared_statement_to_nu_list(
681            table_stmt,
682            NuSqlParams::default(),
683            call_span,
684            signals,
685            None,
686        )?;
687        tables.push(table_name, rows);
688    }
689
690    Ok(Value::record(tables, call_span))
691}
692
693pub fn convert_sqlite_row_to_nu_value(
694    row: &Row,
695    span: Span,
696    columns: &[TypedColumn],
697    column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
698) -> Value {
699    let record = columns
700        .iter()
701        .enumerate()
702        .map(|(i, col)| {
703            let adapter = column_adapters
704                .and_then(|adapters| adapters.get(&col.name))
705                .copied();
706            (
707                col.name.clone(),
708                convert_sqlite_value_to_nu_value_with_adapter(
709                    row.get_ref_unwrap(i),
710                    col.decl_type,
711                    adapter,
712                    span,
713                ),
714            )
715        })
716        .collect();
717
718    Value::record(record, span)
719}
720
/// Per-column conversion applied to integer cells when rows are turned into
/// Nu values. Non-integer cells are converted as if no adapter were set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SQLiteColumnAdapter {
    /// Convert integer values interpreted as Unix epoch milliseconds into Nu datetimes.
    UnixMillisToDate,
    /// Convert integer values interpreted as milliseconds into Nu durations.
    MillisToDuration,
}
728
729fn convert_sqlite_value_to_nu_value_with_adapter(
730    value: ValueRef,
731    decl_type: Option<DeclType>,
732    adapter: Option<SQLiteColumnAdapter>,
733    span: Span,
734) -> Value {
735    match adapter {
736        Some(SQLiteColumnAdapter::UnixMillisToDate) => match value {
737            ValueRef::Integer(i) => chrono::DateTime::from_timestamp_millis(i)
738                .map(|datetime| Value::date(datetime.into(), span))
739                .unwrap_or_else(|| Value::int(i, span)),
740            _ => convert_sqlite_value_to_nu_value(value, decl_type, span),
741        },
742        Some(SQLiteColumnAdapter::MillisToDuration) => match value {
743            ValueRef::Integer(i) => i
744                .checked_mul(1_000_000)
745                .map(|nanos| Value::duration(nanos, span))
746                .unwrap_or_else(|| Value::int(i, span)),
747            _ => convert_sqlite_value_to_nu_value(value, decl_type, span),
748        },
749        None => convert_sqlite_value_to_nu_value(value, decl_type, span),
750    }
751}
752
753pub fn convert_sqlite_value_to_nu_value(
754    value: ValueRef,
755    decl_type: Option<DeclType>,
756    span: Span,
757) -> Value {
758    match value {
759        ValueRef::Null => Value::nothing(span),
760        ValueRef::Integer(i) => Value::int(i, span),
761        ValueRef::Real(f) => Value::float(f, span),
762        ValueRef::Text(buf) => match (std::str::from_utf8(buf), decl_type) {
763            (Ok(txt), Some(DeclType::Json | DeclType::Jsonb)) => {
764                match crate::try_json_str_to_value(txt, span, false) {
765                    Ok(val) => val,
766                    Err(err) => Value::error(err, span),
767                }
768            }
769            (Ok(txt), _) => Value::string(txt.to_string(), span),
770            (Err(_), _) => Value::error(ShellError::NonUtf8 { span }, span),
771        },
772        ValueRef::Blob(u) => Value::binary(u.to_vec(), span),
773    }
774}
775
776pub fn open_connection_in_memory_custom() -> Result<Connection, ShellError> {
777    let flags = OpenFlags::default();
778    let conn = Connection::open_with_flags(MEMORY_DB, flags).map_err(|e| {
779        ShellError::Generic(GenericError::new(
780            "Failed to open SQLite custom connection in memory",
781            e.to_string(),
782            Span::test_data(),
783        ))
784    })?;
785    conn.busy_handler(Some(SQLiteDatabase::sleeper))
786        .map_err(|e| {
787            ShellError::Generic(GenericError::new(
788                "Failed to set busy handler for SQLite custom connection in memory",
789                e.to_string(),
790                Span::test_data(),
791            ))
792        })?;
793    Ok(conn)
794}
795
796pub fn open_connection_in_memory() -> Result<Connection, ShellError> {
797    Connection::open_in_memory().map_err(|e| {
798        ShellError::Generic(GenericError::new(
799            "Failed to open SQLite standard connection in memory",
800            e.to_string(),
801            Span::test_data(),
802        ))
803    })
804}
805
/// A lazy query builder for SQLite tables, allowing SQL pushdown optimizations
/// for commands like `length`, `select`, `first`, and `last`.
///
/// The builder only records the pieces of a query; `build_sql` assembles them
/// and `execute` runs the result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SQLiteQueryBuilder {
    /// Path of the database to query.
    pub db_path: PathBuf,
    /// Table the query selects from.
    pub table_name: String,
    /// Projection; `build_sql` uses `*` when this is `None`.
    pub sql_select: Option<String>, // e.g., "column1, column2" or "*" for all
    /// Optional `WHERE` clause body.
    pub sql_where: Option<String>,  // e.g., "column = ?"
    /// Parameters stored alongside the where clause.
    pub sql_params: Vec<String>,    // parameters for the where clause
    /// Optional `ORDER BY` clause body.
    pub sql_order_by: Option<String>, // e.g., "id DESC"
    /// Optional `LIMIT` value.
    pub sql_limit: Option<i64>,
    /// Per-column adapters, keyed by output column name.
    #[serde(default)]
    pub column_adapters: BTreeMap<String, SQLiteColumnAdapter>,
    /// Signals handed to the query helpers; skipped by serde and restored as
    /// `Signals::empty` on deserialization.
    #[serde(skip, default = "Signals::empty")]
    signals: Signals,
}
822
823impl SQLiteQueryBuilder {
824    pub fn new(db_path: PathBuf, table_name: String, signals: Signals) -> Self {
825        Self {
826            db_path,
827            table_name,
828            sql_select: None,
829            sql_where: None,
830            sql_params: Vec::new(),
831            sql_order_by: None,
832            sql_limit: None,
833            column_adapters: BTreeMap::new(),
834            signals,
835        }
836    }
837
838    pub fn with_select(mut self, select: String) -> Self {
839        self.sql_select = Some(select);
840        self
841    }
842
843    pub fn with_where(mut self, where_clause: String, params: Vec<String>) -> Self {
844        self.sql_where = Some(where_clause);
845        self.sql_params = params;
846        self
847    }
848
849    pub fn with_order_by(mut self, order_by: String) -> Self {
850        self.sql_order_by = Some(order_by);
851        self
852    }
853
854    pub fn with_limit(mut self, limit: i64) -> Self {
855        self.sql_limit = Some(limit);
856        self
857    }
858
859    pub fn with_column_adapter(
860        mut self,
861        column_name: String,
862        adapter: SQLiteColumnAdapter,
863    ) -> Self {
864        self.column_adapters.insert(column_name, adapter);
865        self
866    }
867
868    /// Register a datetime adapter for a column containing Unix epoch milliseconds.
869    pub fn with_unix_millis_datetime_column(self, column_name: String) -> Self {
870        self.with_column_adapter(column_name, SQLiteColumnAdapter::UnixMillisToDate)
871    }
872
873    /// Register a duration adapter for a column containing milliseconds.
874    pub fn with_millis_duration_column(self, column_name: String) -> Self {
875        self.with_column_adapter(column_name, SQLiteColumnAdapter::MillisToDuration)
876    }
877
878    /// Projects a subset of *output* columns from the current SELECT list.
879    ///
880    /// This is used by filter pushdowns (for example, `history | select command`) where
881    /// Nushell refers to post-alias output names, but the underlying SQLite table may have
882    /// different source column names.
883    ///
884    /// Example:
885    /// - current projection: `command_line as command, duration_ms as duration`
886    /// - requested output: `command`
887    /// - rewritten projection: `command_line as command`
888    ///
889    /// If a requested output name cannot be mapped unambiguously to the existing projection,
890    /// this returns `None` so callers can safely fall back to non-pushdown behavior.
891    ///
892    /// This method intentionally does not parse full SQL grammar; it relies on a small,
893    /// conservative parser that is sufficient for projections we generate internally.
894    pub fn project_output_columns(&self, columns: &[String]) -> Option<Self> {
895        if columns.is_empty() {
896            return Some(self.clone());
897        }
898
899        let new_select = if let Some(select) = &self.sql_select {
900            // Parse the current projection into `(output_name, full_expression)` pairs.
901            // We preserve the full expression so aliases and conversions stay intact.
902            let current = parse_sql_select_projection(select)?;
903            let mut projected = Vec::with_capacity(columns.len());
904
905            for requested in columns {
906                // Match by output column name (case-insensitive)
907                let expression = current.iter().find_map(|(output_name, expression)| {
908                    output_name
909                        .eq_ignore_ascii_case(requested)
910                        .then_some(expression)
911                })?;
912                projected.push(expression.clone());
913            }
914
915            projected.join(", ")
916        } else {
917            columns.join(", ")
918        };
919
920        Some(self.clone().with_select(new_select))
921    }
922
923    pub fn build_sql(&self) -> String {
924        let select = self.sql_select.as_deref().unwrap_or("*");
925        let mut sql = format!("SELECT {} FROM [{}]", select, self.table_name);
926
927        if let Some(where_clause) = &self.sql_where {
928            write!(sql, " WHERE {}", where_clause).expect("writing to a String is infallible");
929        }
930
931        if let Some(order_by) = &self.sql_order_by {
932            write!(sql, " ORDER BY {}", order_by).expect("writing to a String is infallible");
933        }
934
935        if let Some(limit) = self.sql_limit {
936            write!(sql, " LIMIT {}", limit).expect("writing to a String is infallible");
937        }
938
939        sql
940    }
941
942    pub fn execute(&self, call_span: Span) -> Result<PipelineData, ShellError> {
943        let conn = open_sqlite_db(&self.db_path, call_span)?;
944        let sql = self.build_sql();
945        let params = NuSqlParams::List(Vec::new()); // FIXME: handle params properly
946        let query = Spanned {
947            item: sql,
948            span: call_span,
949        };
950        run_sql_query(
951            conn,
952            &query,
953            params,
954            &self.signals,
955            (!self.column_adapters.is_empty()).then_some(&self.column_adapters),
956        )
957        .map(IntoPipelineData::into_pipeline_data)
958        .map_err(|e| e.into_shell_error(call_span, "Failed to execute query"))
959    }
960
961    pub fn count(&self, call_span: Span) -> Result<i64, ShellError> {
962        let conn = open_sqlite_db(&self.db_path, call_span)?;
963        let mut sql = format!("SELECT COUNT(*) FROM [{}]", self.table_name);
964        if let Some(where_clause) = &self.sql_where {
965            write!(sql, " WHERE {}", where_clause).expect("writing to a String is infallible");
966        }
967        let mut stmt = conn.prepare(&sql).map_err(|e| {
968            ShellError::Generic(GenericError::new(
969                "Failed to prepare count query",
970                e.to_string(),
971                call_span,
972            ))
973        })?;
974        let params: Vec<Box<dyn ToSql>> = self
975            .sql_params
976            .iter()
977            .map(|s| Box::new(s.clone()) as Box<dyn ToSql>)
978            .collect();
979        let count: i64 = stmt
980            .query_row(rusqlite::params_from_iter(params), |row| row.get(0))
981            .map_err(|e| {
982                ShellError::Generic(GenericError::new(
983                    "Failed to execute count query",
984                    e.to_string(),
985                    call_span,
986                ))
987            })?;
988        Ok(count)
989    }
990}
991
992/// Parses a SELECT projection list into `(output_name, expression)` entries.
993///
994/// Input is the text after `SELECT` and before `FROM`, for example:
995/// `command_line as command, duration_ms as duration`.
996///
997/// The returned expression is preserved exactly so it can be re-used in a rewritten
998/// projection without dropping aliases.
999///
1000/// Returns `None` for malformed/unsupported entries; callers should then skip pushdown.
1001fn parse_sql_select_projection(select: &str) -> Option<Vec<(String, String)>> {
1002    let projection = split_select_expressions(select)
1003        .into_iter()
1004        .map(|expr| parse_projection_expression(&expr))
1005        .collect::<Option<Vec<_>>>()?;
1006
1007    (!projection.is_empty()).then_some(projection)
1008}
1009
/// Splits a SELECT projection list on top-level commas.
///
/// We only split commas that are outside:
/// - single/double quoted strings
/// - backtick- or bracket-quoted identifiers (`` `a,b` ``, `[a,b]`)
/// - parenthesized expressions
///
/// Each returned expression is trimmed; empty entries (for example from a trailing
/// comma) are dropped.
///
/// This is intentionally a lightweight splitter rather than a full SQL parser.
fn split_select_expressions(select: &str) -> Vec<String> {
    /// Stores the trimmed pending expression, if non-empty, and resets the buffer.
    fn flush(pending: &mut String, expressions: &mut Vec<String>) {
        let trimmed = pending.trim();
        if !trimmed.is_empty() {
            expressions.push(trimmed.to_string());
        }
        pending.clear();
    }

    let mut expressions = Vec::new();
    let mut current = String::new();
    let mut depth = 0usize;
    // Delimiter that ends the quoted region we are currently inside, if any.
    let mut closing_quote: Option<char> = None;

    for ch in select.chars() {
        if let Some(close) = closing_quote {
            // Inside quotes everything is literal. A doubled delimiter (`''`) simply
            // closes and immediately reopens the region, which keeps it intact.
            if ch == close {
                closing_quote = None;
            }
            current.push(ch);
            continue;
        }

        match ch {
            '\'' | '"' | '`' => {
                closing_quote = Some(ch);
                current.push(ch);
            }
            '[' => {
                closing_quote = Some(']');
                current.push(ch);
            }
            '(' => {
                // Track nesting depth so commas inside function calls do not split.
                depth = depth.saturating_add(1);
                current.push(ch);
            }
            ')' => {
                depth = depth.saturating_sub(1);
                current.push(ch);
            }
            // Top-level separator between projection expressions.
            ',' if depth == 0 => flush(&mut current, &mut expressions),
            _ => current.push(ch),
        }
    }

    flush(&mut current, &mut expressions);
    expressions
}
1062
1063/// Parses one projection expression into `(output_name, full_expression)`.
1064///
1065/// Supported forms include:
1066/// - `source_col as alias`
1067/// - `qualified.name`
1068/// - `column`
1069///
1070/// If no explicit alias is present, the output name is derived from the last
1071/// identifier segment (`foo.bar` -> `bar`).
1072fn parse_projection_expression(expr: &str) -> Option<(String, String)> {
1073    let trimmed = expr.trim();
1074    if trimmed.is_empty() {
1075        return None;
1076    }
1077
1078    if let Some((_lhs, rhs)) = split_alias(trimmed) {
1079        // Explicit alias wins and represents the user-visible output column name.
1080        let alias = normalize_identifier(rhs.trim());
1081        if alias.is_empty() {
1082            return None;
1083        }
1084        return Some((alias, trimmed.to_string()));
1085    }
1086
1087    let output_name = normalize_identifier(last_identifier_segment(trimmed));
1088    if output_name.is_empty() {
1089        return None;
1090    }
1091
1092    Some((output_name, trimmed.to_string()))
1093}
1094
/// Finds an `AS` alias split in a projection expression.
///
/// This intentionally requires whitespace around `AS` to avoid false positives in
/// identifiers or function names containing `as` as a substring. Only a top-level
/// `AS` is considered: occurrences inside parentheses (`CAST(x AS TEXT)`), quoted
/// strings, or quoted identifiers (`"..."`, `` `...` ``, `[...]`) are skipped, so
/// `CAST(x AS TEXT) AS y` splits at the alias rather than inside the cast.
///
/// Returns `(lhs, rhs)` for `lhs AS rhs`, with the whitespace around `AS` removed,
/// or `None` when there is no top-level alias with a non-empty left and right side.
fn split_alias(expr: &str) -> Option<(&str, &str)> {
    let bytes = expr.as_bytes();
    let mut depth = 0usize;
    // Byte that ends the quoted region we are currently inside, if any. All
    // delimiters are ASCII, so scanning bytes cannot land inside a UTF-8 sequence.
    let mut closing_quote: Option<u8> = None;

    for (idx, &byte) in bytes.iter().enumerate() {
        if let Some(close) = closing_quote {
            if byte == close {
                closing_quote = None;
            }
            continue;
        }

        match byte {
            b'\'' | b'"' | b'`' => closing_quote = Some(byte),
            b'[' => closing_quote = Some(b']'),
            b'(' => depth = depth.saturating_add(1),
            b')' => depth = depth.saturating_sub(1),
            b'a' | b'A' if depth == 0 => {
                let is_keyword = idx > 0
                    && idx + 2 < bytes.len()
                    && bytes[idx - 1].is_ascii_whitespace()
                    && bytes[idx + 1].eq_ignore_ascii_case(&b's')
                    && bytes[idx + 2].is_ascii_whitespace();
                if is_keyword {
                    // Keep the original expression parts intact so rewritten SQL
                    // maintains the same semantics and formatting as much as possible.
                    let lhs = expr[..idx].trim_end();
                    let rhs = expr[idx + 2..].trim_start();
                    if !lhs.is_empty() && !rhs.is_empty() {
                        return Some((lhs, rhs));
                    }
                }
            }
            _ => {}
        }
    }

    None
}
1122
/// Returns the text after the last `.` in `expr`, or all of `expr` when it
/// contains no `.` (`foo.bar` -> `bar`, `bar` -> `bar`).
fn last_identifier_segment(expr: &str) -> &str {
    match expr.rsplit_once('.') {
        Some((_qualifier, segment)) => segment,
        None => expr,
    }
}
1126
/// Normalizes an identifier token for matching:
/// - trims surrounding whitespace
/// - strips one layer of a matching SQL identifier wrapper (`"name"`, `` `name` ``,
///   `[name]`) and trims whitespace inside it
///
/// Tokens without a matching opening/closing pair are returned trimmed but otherwise
/// unchanged. The result is used only for name matching, not for SQL generation.
fn normalize_identifier(identifier: &str) -> String {
    const WRAPPERS: [(char, char); 3] = [('"', '"'), ('`', '`'), ('[', ']')];

    let trimmed = identifier.trim();
    let unwrapped = WRAPPERS
        .into_iter()
        .find_map(|(open, close)| trimmed.strip_prefix(open)?.strip_suffix(close));

    unwrapped.map_or(trimmed, str::trim).to_string()
}
1145
1146impl CustomValue for SQLiteQueryBuilder {
1147    fn clone_value(&self, span: Span) -> Value {
1148        Value::custom(Box::new(self.clone()), span)
1149    }
1150
1151    fn type_name(&self) -> String {
1152        "SQLiteQueryBuilder".to_string()
1153    }
1154
1155    fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
1156        self.execute(span).and_then(|pd| pd.into_value(span))
1157    }
1158
1159    fn as_any(&self) -> &dyn std::any::Any {
1160        self
1161    }
1162
1163    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1164        self
1165    }
1166
1167    fn follow_path_int(
1168        &self,
1169        _self_span: Span,
1170        index: usize,
1171        path_span: Span,
1172        optional: bool,
1173    ) -> Result<Value, ShellError> {
1174        // Execute and then index - this could be optimized with LIMIT/OFFSET later
1175        let data = self.to_base_value(path_span)?;
1176        data.follow_cell_path(&[ast::PathMember::Int {
1177            val: index,
1178            span: path_span,
1179            optional,
1180        }])
1181        .map(|v| v.into_owned())
1182    }
1183
1184    fn follow_path_string(
1185        &self,
1186        _self_span: Span,
1187        column_name: String,
1188        path_span: Span,
1189        _optional: bool,
1190        _: Casing,
1191    ) -> Result<Value, ShellError> {
1192        // For now, just execute and get the column - this could be optimized later
1193        let data = self.to_base_value(path_span)?;
1194        data.follow_cell_path(&[ast::PathMember::String {
1195            val: column_name,
1196            span: path_span,
1197            optional: false,
1198            casing: Casing::default(),
1199        }])
1200        .map(|v| v.into_owned())
1201    }
1202
1203    fn typetag_name(&self) -> &'static str {
1204        "SQLiteQueryBuilder"
1205    }
1206
1207    fn typetag_deserialize(&self) {
1208        unimplemented!("typetag_deserialize")
1209    }
1210
1211    fn is_iterable(&self) -> bool {
1212        true
1213    }
1214}
1215
#[cfg(test)]
mod test {
    use super::*;
    use nu_protocol::record;
    use tempfile::NamedTempFile;

    /// Builds a query builder for `table_name` that is only used to render SQL, so the
    /// database path is never opened.
    fn unexecuted_builder(table_name: &str) -> SQLiteQueryBuilder {
        SQLiteQueryBuilder::new(
            PathBuf::from(":memory:"),
            table_name.to_string(),
            Signals::empty(),
        )
    }

    /// Creates a database in a fresh temporary file and runs `setup` against it.
    ///
    /// The returned `NamedTempFile` must be kept alive for as long as the database is
    /// used: dropping it deletes the backing file.
    fn temp_db(setup: impl FnOnce(&Connection)) -> (NamedTempFile, PathBuf) {
        let temp_file = NamedTempFile::new().unwrap();
        let db_path = temp_file.path().to_path_buf();
        let conn = Connection::open(&db_path).unwrap();
        setup(&conn);
        (temp_file, db_path)
    }

    /// Creates a `test (id INTEGER, name TEXT)` table holding ids `0..rows`, each row
    /// named `name{id}`.
    fn numbered_rows_db(rows: i32) -> (NamedTempFile, PathBuf) {
        temp_db(|conn| {
            conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
                .unwrap();
            for i in 0..rows {
                conn.execute(
                    "INSERT INTO test (id, name) VALUES (?, ?)",
                    rusqlite::params![i, format!("name{}", i)],
                )
                .unwrap();
            }
        })
    }

    /// Executes `table` and returns its rows, panicking unless the result is a list.
    fn execute_rows(table: &SQLiteQueryBuilder) -> Vec<Value> {
        let value = table
            .execute(Span::test_data())
            .unwrap()
            .into_value(Span::test_data())
            .unwrap();

        match value {
            Value::List { vals, .. } => vals,
            _ => panic!("Expected list"),
        }
    }

    /// Returns the `id` column of `row`, panicking if the row is not a record so a
    /// malformed row can never make an assertion pass silently.
    fn row_id(row: &Value) -> Option<&Value> {
        match row {
            Value::Record { val: record, .. } => record.get("id"),
            _ => panic!("Expected row to be a record"),
        }
    }

    #[test]
    fn can_read_empty_db() {
        let db = open_connection_in_memory().unwrap();
        let converted_db = read_entire_sqlite_db(db, Span::test_data(), &Signals::empty()).unwrap();

        let expected = Value::test_record(Record::new());

        assert_eq!(converted_db, expected);
    }

    #[test]
    fn can_read_empty_table() {
        let db = open_connection_in_memory().unwrap();

        db.execute(
            "CREATE TABLE person (
                    id     INTEGER PRIMARY KEY,
                    name   TEXT NOT NULL,
                    data   BLOB
                    )",
            [],
        )
        .unwrap();
        let converted_db = read_entire_sqlite_db(db, Span::test_data(), &Signals::empty()).unwrap();

        let expected = Value::test_record(record! {
            "person" => Value::test_list(vec![]),
        });

        assert_eq!(converted_db, expected);
    }

    #[test]
    fn can_read_null_and_non_null_data() {
        let span = Span::test_data();
        let db = open_connection_in_memory().unwrap();

        db.execute(
            "CREATE TABLE item (
                    id     INTEGER PRIMARY KEY,
                    name   TEXT
                    )",
            [],
        )
        .unwrap();

        db.execute("INSERT INTO item (id, name) VALUES (123, NULL)", [])
            .unwrap();

        db.execute("INSERT INTO item (id, name) VALUES (456, 'foo bar')", [])
            .unwrap();

        let converted_db = read_entire_sqlite_db(db, span, &Signals::empty()).unwrap();

        let expected = Value::test_record(record! {
            "item" => Value::test_list(
                vec![
                    Value::test_record(record! {
                        "id" =>   Value::test_int(123),
                        "name" => Value::nothing(span),
                    }),
                    Value::test_record(record! {
                        "id" =>   Value::test_int(456),
                        "name" => Value::test_string("foo bar"),
                    }),
                ]
            ),
        });

        assert_eq!(converted_db, expected);
    }

    #[test]
    fn sqlite_table_build_sql_combined() {
        let table = unexecuted_builder("test")
            .with_select("col1".to_string())
            .with_where("col2 = ?".to_string(), vec!["val".to_string()])
            .with_order_by("col1".to_string())
            .with_limit(5);
        assert_eq!(
            table.build_sql(),
            "SELECT col1 FROM [test] WHERE col2 = ? ORDER BY col1 LIMIT 5"
        );
    }

    #[test]
    fn sqlite_table_count_integration() {
        let (_temp_file, db_path) = numbered_rows_db(10);

        let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), Signals::empty());
        let count = table.count(Span::test_data()).unwrap();
        assert_eq!(count, 10);
    }

    #[test]
    fn sqlite_table_execute_integration() {
        let (_temp_file, db_path) = temp_db(|conn| {
            conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
                .unwrap();
            conn.execute("INSERT INTO test (id, name) VALUES (1, 'first')", [])
                .unwrap();
            conn.execute("INSERT INTO test (id, name) VALUES (2, 'second')", [])
                .unwrap();
        });

        let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), Signals::empty());
        let rows = execute_rows(&table);

        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn sqlite_table_first_integration() {
        let (_temp_file, db_path) = numbered_rows_db(5);

        let table =
            SQLiteQueryBuilder::new(db_path, "test".to_string(), Signals::empty()).with_limit(2);
        let rows = execute_rows(&table);

        assert_eq!(rows.len(), 2);
        // Check first two ids
        assert_eq!(row_id(&rows[0]), Some(&Value::int(0, Span::test_data())));
        assert_eq!(row_id(&rows[1]), Some(&Value::int(1, Span::test_data())));
    }

    #[test]
    fn sqlite_table_last_integration() {
        let (_temp_file, db_path) = numbered_rows_db(5);

        let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), Signals::empty())
            .with_order_by("rowid DESC".to_string())
            .with_limit(2);
        let rows = execute_rows(&table);

        assert_eq!(rows.len(), 2);
        // Check last two ids (since DESC, first in result is highest)
        assert_eq!(row_id(&rows[0]), Some(&Value::int(4, Span::test_data())));
        assert_eq!(row_id(&rows[1]), Some(&Value::int(3, Span::test_data())));
    }

    #[test]
    fn sqlite_table_build_sql_with_select() {
        let table = unexecuted_builder("test").with_select("col1, col2".to_string());
        assert_eq!(table.build_sql(), "SELECT col1, col2 FROM [test]");
    }

    #[test]
    fn sqlite_table_build_sql_with_where() {
        let table =
            unexecuted_builder("test").with_where("col = ?".to_string(), vec!["val".to_string()]);
        assert_eq!(table.build_sql(), "SELECT * FROM [test] WHERE col = ?");
    }

    #[test]
    fn sqlite_table_build_sql_with_order_by() {
        let table = unexecuted_builder("test").with_order_by("id DESC".to_string());
        assert_eq!(table.build_sql(), "SELECT * FROM [test] ORDER BY id DESC");
    }

    #[test]
    fn sqlite_table_build_sql_with_limit() {
        let table = unexecuted_builder("test").with_limit(10);
        assert_eq!(table.build_sql(), "SELECT * FROM [test] LIMIT 10");
    }

    #[test]
    fn sqlite_table_execute_with_column_adapters() {
        let (_temp_file, db_path) = temp_db(|conn| {
            conn.execute(
                "CREATE TABLE history (start_timestamp INTEGER, duration INTEGER)",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO history (start_timestamp, duration) VALUES (1736041045123, 30002)",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO history (start_timestamp, duration) VALUES (NULL, NULL)",
                [],
            )
            .unwrap();
        });

        let table = SQLiteQueryBuilder::new(db_path, "history".to_string(), Signals::empty())
            .with_select("start_timestamp, duration".to_string())
            .with_unix_millis_datetime_column("start_timestamp".to_string())
            .with_millis_duration_column("duration".to_string());

        let rows = execute_rows(&table);
        assert_eq!(rows.len(), 2);

        // Non-NULL cells are converted by their adapters.
        match &rows[0] {
            Value::Record { val: first, .. } => {
                assert!(matches!(
                    first.get("start_timestamp"),
                    Some(Value::Date { .. })
                ));
                assert!(matches!(
                    first.get("duration"),
                    Some(Value::Duration { .. })
                ));
            }
            _ => panic!("Expected first row to be a record"),
        }

        // NULL cells stay `nothing` even for adapted columns.
        match &rows[1] {
            Value::Record { val: second, .. } => {
                assert!(matches!(
                    second.get("start_timestamp"),
                    Some(Value::Nothing { .. })
                ));
                assert!(matches!(
                    second.get("duration"),
                    Some(Value::Nothing { .. })
                ));
            }
            _ => panic!("Expected second row to be a record"),
        }
    }

    #[test]
    fn sqlite_table_project_output_columns_preserves_aliases() {
        let table = unexecuted_builder("history").with_select(
            "start_timestamp, command_line as command, cwd, duration_ms as duration, exit_status"
                .to_string(),
        );

        let projected = table
            .project_output_columns(&["command".to_string(), "duration".to_string()])
            .expect("projection should succeed");

        assert_eq!(
            projected.build_sql(),
            "SELECT command_line as command, duration_ms as duration FROM [history]"
        );
    }

    #[test]
    fn sqlite_table_project_output_columns_returns_none_for_missing_column() {
        let table = unexecuted_builder("history").with_select("command_line as command".to_string());

        assert!(
            table
                .project_output_columns(&["missing".to_string()])
                .is_none()
        );
    }
}