1use super::definitions::{
2 db_column::DbColumn, db_constraint::DbConstraint, db_foreignkey::DbForeignKey,
3 db_index::DbIndex, db_table::DbTable,
4};
5use nu_protocol::{
6 CustomValue, IntoPipelineData, PipelineData, Record, ShellError, Signals, Span, Spanned, Value,
7 ast, casing::Casing, engine::EngineState, shell_error::generic::GenericError,
8 shell_error::io::IoError,
9};
10use rusqlite::{
11 Connection, Error as SqliteError, OpenFlags, Row, Statement, ToSql, types::ValueRef,
12};
13use serde::{Deserialize, Serialize};
14use std::{
15 collections::BTreeMap,
16 fmt::Write,
17 fs::File,
18 io::Read,
19 path::{Path, PathBuf},
20};
21
22const SQLITE_MAGIC_BYTES: &[u8] = "SQLite format 3\0".as_bytes();
23pub const MEMORY_DB: &str = "file:memdb1?mode=memory&cache=shared";
24const DATABASE_NAME: &str = "main";
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SQLiteDatabase {
28 pub path: PathBuf,
32 #[serde(skip, default = "Signals::empty")]
33 signals: Signals,
36}
37
38impl SQLiteDatabase {
39 pub fn new(path: &Path, signals: Signals) -> Self {
40 Self {
41 path: PathBuf::from(path),
42 signals,
43 }
44 }
45
46 pub fn try_from_path(path: &Path, span: Span, signals: Signals) -> Result<Self, ShellError> {
47 let mut file = File::open(path).map_err(|e| IoError::new(e, span, PathBuf::from(path)))?;
48
49 let mut buf: [u8; 16] = [0; 16];
50 file.read_exact(&mut buf)
51 .map_err(|e| ShellError::Io(IoError::new(e, span, PathBuf::from(path))))
52 .and_then(|_| {
53 if buf == SQLITE_MAGIC_BYTES {
54 Ok(SQLiteDatabase::new(path, signals))
55 } else {
56 Err(ShellError::Generic(GenericError::new(
57 "Not a SQLite file",
58 format!("Could not read '{}' as SQLite file", path.display()),
59 span,
60 )))
61 }
62 })
63 }
64
65 pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
66 let span = value.span();
67 match value {
68 Value::Custom { val, .. } => match val.as_any().downcast_ref::<Self>() {
69 Some(db) => Ok(Self {
70 path: db.path.clone(),
71 signals: db.signals.clone(),
72 }),
73 None => Err(ShellError::CantConvert {
74 to_type: "database".into(),
75 from_type: "non-database".into(),
76 span,
77 help: None,
78 }),
79 },
80 x => Err(ShellError::CantConvert {
81 to_type: "database".into(),
82 from_type: x.get_type().to_string(),
83 span: x.span(),
84 help: None,
85 }),
86 }
87 }
88
89 pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
90 let value = input.into_value(span)?;
91 Self::try_from_value(value)
92 }
93
94 pub fn into_value(self, span: Span) -> Value {
95 let db = Box::new(self);
96 Value::custom(db, span)
97 }
98
99 pub fn query(
100 &self,
101 sql: &Spanned<String>,
102 params: NuSqlParams,
103 call_span: Span,
104 ) -> Result<Value, ShellError> {
105 let conn = open_sqlite_db(&self.path, call_span)?;
106 let stream = run_sql_query(conn, sql, params, &self.signals, None)
107 .map_err(|e| e.into_shell_error(sql.span, "Failed to query SQLite database"))?;
108
109 Ok(stream)
110 }
111
112 pub fn open_connection(&self) -> Result<Connection, ShellError> {
113 if self.path.to_string_lossy() == MEMORY_DB {
114 open_connection_in_memory_custom()
115 } else {
116 let conn = Connection::open(&self.path).map_err(|e| {
117 ShellError::Generic(GenericError::new_internal(
118 "Failed to open SQLite database from open_connection",
119 e.to_string(),
120 ))
121 })?;
122 conn.busy_handler(Some(SQLiteDatabase::sleeper))
123 .map_err(|e| {
124 ShellError::Generic(GenericError::new_internal(
125 "Failed to set busy handler for SQLite database",
126 e.to_string(),
127 ))
128 })?;
129 Ok(conn)
130 }
131 }
132
133 fn sleeper(attempts: i32) -> bool {
134 log::warn!("SQLITE_BUSY, retrying after 250ms (attempt {attempts})");
135 std::thread::sleep(std::time::Duration::from_millis(250));
136 true
137 }
138
139 pub fn get_tables(&self, conn: &Connection) -> Result<Vec<DbTable>, SqliteError> {
140 let mut table_names =
141 conn.prepare("SELECT name FROM sqlite_master WHERE type = 'table'")?;
142 let rows = table_names.query_map([], |row| row.get(0))?;
143 let mut tables = Vec::new();
144
145 for row in rows {
146 let table_name: String = row?;
147 tables.push(DbTable {
148 name: table_name,
149 create_time: None,
150 update_time: None,
151 engine: None,
152 schema: None,
153 })
154 }
155
156 Ok(tables.into_iter().collect())
157 }
158
159 pub fn drop_all_tables(&self, conn: &Connection) -> Result<(), SqliteError> {
160 let tables = self.get_tables(conn)?;
161
162 for table in tables {
163 conn.execute(&format!("DROP TABLE {}", table.name), [])?;
164 }
165
166 Ok(())
167 }
168
169 pub fn export_in_memory_database_to_file(
170 &self,
171 conn: &Connection,
172 filename: String,
173 ) -> Result<(), SqliteError> {
174 conn.execute(&format!("vacuum main into '{filename}'"), [])?;
176
177 Ok(())
178 }
179
180 pub fn backup_database_to_file(
181 &self,
182 conn: &Connection,
183 filename: String,
184 ) -> Result<(), SqliteError> {
185 conn.backup(DATABASE_NAME, Path::new(&filename), None)?;
186 Ok(())
187 }
188
189 pub fn restore_database_from_file(
190 &self,
191 conn: &mut Connection,
192 filename: String,
193 ) -> Result<(), SqliteError> {
194 conn.restore(
195 DATABASE_NAME,
196 Path::new(&filename),
197 Some(|p: rusqlite::backup::Progress| {
198 let percent = if p.pagecount == 0 {
199 100
200 } else {
201 (p.pagecount - p.remaining) * 100 / p.pagecount
202 };
203 if percent % 10 == 0 {
204 log::trace!("Restoring: {percent} %");
205 }
206 }),
207 )?;
208 Ok(())
209 }
210
211 fn get_column_info(&self, row: &Row) -> Result<DbColumn, SqliteError> {
212 let dbc = DbColumn {
213 cid: row.get("cid")?,
214 name: row.get("name")?,
215 r#type: row.get("type")?,
216 notnull: row.get("notnull")?,
217 default: row.get("dflt_value")?,
218 pk: row.get("pk")?,
219 };
220 Ok(dbc)
221 }
222
223 pub fn get_columns(
224 &self,
225 conn: &Connection,
226 table: &DbTable,
227 ) -> Result<Vec<DbColumn>, SqliteError> {
228 let mut column_names = conn.prepare(&format!(
229 "SELECT * FROM pragma_table_info('{}');",
230 table.name
231 ))?;
232
233 let mut columns: Vec<DbColumn> = Vec::new();
234 let rows = column_names.query_and_then([], |row| self.get_column_info(row))?;
235
236 for row in rows {
237 columns.push(row?);
238 }
239
240 Ok(columns)
241 }
242
243 fn get_constraint_info(&self, row: &Row) -> Result<DbConstraint, SqliteError> {
244 let dbc = DbConstraint {
245 name: row.get("index_name")?,
246 column_name: row.get("column_name")?,
247 origin: row.get("origin")?,
248 };
249 Ok(dbc)
250 }
251
252 pub fn get_constraints(
253 &self,
254 conn: &Connection,
255 table: &DbTable,
256 ) -> Result<Vec<DbConstraint>, SqliteError> {
257 let mut column_names = conn.prepare(&format!(
258 "
259 SELECT
260 p.origin,
261 s.name AS index_name,
262 i.name AS column_name
263 FROM
264 sqlite_master s
265 JOIN pragma_index_list(s.tbl_name) p ON s.name = p.name,
266 pragma_index_info(s.name) i
267 WHERE
268 s.type = 'index'
269 AND tbl_name = '{}'
270 AND NOT p.origin = 'c'
271 ",
272 &table.name
273 ))?;
274
275 let mut constraints: Vec<DbConstraint> = Vec::new();
276 let rows = column_names.query_and_then([], |row| self.get_constraint_info(row))?;
277
278 for row in rows {
279 constraints.push(row?);
280 }
281
282 Ok(constraints)
283 }
284
285 fn get_foreign_keys_info(&self, row: &Row) -> Result<DbForeignKey, SqliteError> {
286 let dbc = DbForeignKey {
287 column_name: row.get("from")?,
288 ref_table: row.get("table")?,
289 ref_column: row.get("to")?,
290 };
291 Ok(dbc)
292 }
293
294 pub fn get_foreign_keys(
295 &self,
296 conn: &Connection,
297 table: &DbTable,
298 ) -> Result<Vec<DbForeignKey>, SqliteError> {
299 let mut column_names = conn.prepare(&format!(
300 "SELECT p.`from`, p.`to`, p.`table` FROM pragma_foreign_key_list('{}') p",
301 &table.name
302 ))?;
303
304 let mut foreign_keys: Vec<DbForeignKey> = Vec::new();
305 let rows = column_names.query_and_then([], |row| self.get_foreign_keys_info(row))?;
306
307 for row in rows {
308 foreign_keys.push(row?);
309 }
310
311 Ok(foreign_keys)
312 }
313
314 fn get_index_info(&self, row: &Row) -> Result<DbIndex, SqliteError> {
315 let dbc = DbIndex {
316 name: row.get("index_name")?,
317 column_name: row.get("name")?,
318 seqno: row.get("seqno")?,
319 };
320 Ok(dbc)
321 }
322
323 pub fn get_indexes(
324 &self,
325 conn: &Connection,
326 table: &DbTable,
327 ) -> Result<Vec<DbIndex>, SqliteError> {
328 let mut column_names = conn.prepare(&format!(
329 "
330 SELECT
331 m.name AS index_name,
332 p.*
333 FROM
334 sqlite_master m,
335 pragma_index_info(m.name) p
336 WHERE
337 m.type = 'index'
338 AND m.tbl_name = '{}'
339 ",
340 &table.name,
341 ))?;
342
343 let mut indexes: Vec<DbIndex> = Vec::new();
344 let rows = column_names.query_and_then([], |row| self.get_index_info(row))?;
345
346 for row in rows {
347 indexes.push(row?);
348 }
349
350 Ok(indexes)
351 }
352}
353
354impl CustomValue for SQLiteDatabase {
355 fn clone_value(&self, span: Span) -> Value {
356 Value::custom(Box::new(self.clone()), span)
357 }
358
359 fn type_name(&self) -> String {
360 self.typetag_name().to_string()
361 }
362
363 fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
364 let db = open_sqlite_db(&self.path, span)?;
365 read_entire_sqlite_db(db, span, &self.signals)
366 .map_err(|e| e.into_shell_error(span, "Failed to read from SQLite database"))
367 }
368
369 fn as_any(&self) -> &dyn std::any::Any {
370 self
371 }
372
373 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
374 self
375 }
376
377 fn follow_path_int(
378 &self,
379 _self_span: Span,
380 _index: usize,
381 path_span: Span,
382 _optional: bool,
383 ) -> Result<Value, ShellError> {
384 Err(ShellError::IncompatiblePathAccess { type_name: "SQLite databases do not support integer-indexed access. Try specifying a table name instead".into(), span: path_span })
386 }
387
388 fn follow_path_string(
389 &self,
390 _self_span: Span,
391 column_name: String,
392 path_span: Span,
393 _optional: bool,
394 _casing: Casing,
395 ) -> Result<Value, ShellError> {
396 let table = SQLiteQueryBuilder::new(self.path.clone(), column_name, self.signals.clone());
398 Ok(Value::custom(Box::new(table), path_span))
399 }
400
401 fn typetag_name(&self) -> &'static str {
402 "SQLiteDatabase"
403 }
404
405 fn typetag_deserialize(&self) {
406 unimplemented!("typetag_deserialize")
407 }
408}
409
410pub fn open_sqlite_db(path: &Path, call_span: Span) -> Result<Connection, ShellError> {
411 if path.to_string_lossy() == MEMORY_DB {
412 open_connection_in_memory_custom()
413 } else {
414 let path = path.to_string_lossy().to_string();
415 Connection::open(path).map_err(|err| {
416 ShellError::Generic(GenericError::new(
417 "Failed to open SQLite database",
418 err.to_string(),
419 call_span,
420 ))
421 })
422 }
423}
424
425fn run_sql_query(
426 conn: Connection,
427 sql: &Spanned<String>,
428 params: NuSqlParams,
429 signals: &Signals,
430 column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
431) -> Result<Value, SqliteOrShellError> {
432 let stmt = conn.prepare(&sql.item)?;
433 prepared_statement_to_nu_list(stmt, params, sql.span, signals, column_adapters)
434}
435
436pub fn value_to_sql(
438 engine_state: &EngineState,
439 value: Value,
440 call_span: Span,
441) -> Result<Box<dyn rusqlite::ToSql>, ShellError> {
442 match value {
443 Value::Bool { val, .. } => Ok(Box::new(val)),
444 Value::Int { val, .. } => Ok(Box::new(val)),
445 Value::Float { val, .. } => Ok(Box::new(val)),
446 Value::Filesize { val, .. } => Ok(Box::new(val.get())),
447 Value::Duration { val, .. } => Ok(Box::new(val)),
448 Value::Date { val, .. } => Ok(Box::new(val)),
449 Value::String { val, .. } => Ok(Box::new(val)),
450 Value::Binary { val, .. } => Ok(Box::new(val)),
451 Value::Nothing { .. } => Ok(Box::new(rusqlite::types::Null)),
452 val => {
453 let span = val.span();
454 let ty = val.get_type();
455 let json_value = crate::value_to_json_value(engine_state, val, call_span, false)?;
456 match nu_json::to_string_raw(&json_value) {
457 Ok(s) => Ok(Box::new(s)),
458 Err(err) => Err(ShellError::CantConvert {
459 to_type: "JSON".into(),
460 from_type: ty.to_string(),
461 span,
462 help: Some(err.to_string()),
463 }),
464 }
465 }
466 }
467}
468
469pub fn values_to_sql(
470 engine_state: &EngineState,
471 values: impl IntoIterator<Item = Value>,
472 call_span: Span,
473) -> Result<Vec<Box<dyn rusqlite::ToSql>>, ShellError> {
474 values
475 .into_iter()
476 .map(|v| value_to_sql(engine_state, v, call_span))
477 .collect::<Result<Vec<_>, _>>()
478}
479
480pub enum NuSqlParams {
481 List(Vec<Box<dyn ToSql>>),
482 Named(Vec<(String, Box<dyn ToSql>)>),
483}
484
485impl Default for NuSqlParams {
486 fn default() -> Self {
487 NuSqlParams::List(Vec::new())
488 }
489}
490
491pub fn nu_value_to_params(
492 engine_state: &EngineState,
493 value: Value,
494 call_span: Span,
495) -> Result<NuSqlParams, ShellError> {
496 match value {
497 Value::Record { val, .. } => {
498 let mut params = Vec::with_capacity(val.len());
499
500 for (mut column, value) in val.into_owned().into_iter() {
501 let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
502
503 if !column.starts_with([':', '@', '$']) {
504 column.insert(0, ':');
505 }
506
507 params.push((column, sql_type_erased));
508 }
509
510 Ok(NuSqlParams::Named(params))
511 }
512 Value::List { vals, .. } => {
513 let mut params = Vec::with_capacity(vals.len());
514
515 for value in vals.into_iter() {
516 let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
517
518 params.push(sql_type_erased);
519 }
520
521 Ok(NuSqlParams::List(params))
522 }
523
524 Value::Nothing { .. } => Ok(NuSqlParams::default()),
526
527 _ => Err(ShellError::TypeMismatch {
528 err_message: "Invalid parameters value: expected record or list".to_string(),
529 span: value.span(),
530 }),
531 }
532}
533
534#[derive(Debug)]
535enum SqliteOrShellError {
536 SqliteError(SqliteError),
537 ShellError(ShellError),
538}
539
540impl From<SqliteError> for SqliteOrShellError {
541 fn from(error: SqliteError) -> Self {
542 Self::SqliteError(error)
543 }
544}
545
546impl From<ShellError> for SqliteOrShellError {
547 fn from(error: ShellError) -> Self {
548 Self::ShellError(error)
549 }
550}
551
552impl SqliteOrShellError {
553 fn into_shell_error(self, span: Span, msg: &str) -> ShellError {
554 match self {
555 Self::SqliteError(err) => {
556 ShellError::Generic(GenericError::new(msg.to_string(), err.to_string(), span))
557 }
558 Self::ShellError(err) => err,
559 }
560 }
561}
562
563#[derive(Clone, Copy)]
565pub enum DeclType {
566 Json,
567 Jsonb,
568}
569
570impl DeclType {
571 pub fn from_str(s: &str) -> Option<Self> {
572 match s.to_uppercase().as_str() {
573 "JSON" => Some(DeclType::Json),
574 "JSONB" => Some(DeclType::Jsonb),
575 _ => None, }
577 }
578}
579
580pub struct TypedColumn {
582 pub name: String,
583 pub decl_type: Option<DeclType>,
584}
585
586impl TypedColumn {
587 pub fn from_rusqlite_column(c: &rusqlite::Column) -> Self {
588 Self {
589 name: c.name().to_owned(),
590 decl_type: c.decl_type().and_then(DeclType::from_str),
591 }
592 }
593}
594
595fn prepared_statement_to_nu_list(
596 mut stmt: Statement,
597 params: NuSqlParams,
598 call_span: Span,
599 signals: &Signals,
600 column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
601) -> Result<Value, SqliteOrShellError> {
602 let columns: Vec<TypedColumn> = stmt
603 .columns()
604 .iter()
605 .map(TypedColumn::from_rusqlite_column)
606 .collect();
607
608 fn collect_row_values(
609 row_results: impl IntoIterator<Item = Result<Value, SqliteError>>,
610 signals: &Signals,
611 call_span: Span,
612 ) -> Result<Vec<Value>, SqliteOrShellError> {
613 let mut row_values = vec![];
614
615 for row_result in row_results {
616 signals.check(&call_span)?;
617 if let Ok(row_value) = row_result {
618 row_values.push(row_value);
619 }
620 }
621
622 Ok(row_values)
623 }
624
625 let row_values = match params {
629 NuSqlParams::List(params) => {
630 let refs: Vec<&dyn ToSql> = params.iter().map(|value| &**value).collect();
631
632 let row_results = stmt.query_map(refs.as_slice(), |row| {
633 Ok(convert_sqlite_row_to_nu_value(
634 row,
635 call_span,
636 &columns,
637 column_adapters,
638 ))
639 })?;
640
641 collect_row_values(row_results, signals, call_span)?
642 }
643 NuSqlParams::Named(pairs) => {
644 let refs: Vec<_> = pairs
645 .iter()
646 .map(|(column, value)| (column.as_str(), &**value))
647 .collect();
648
649 let row_results = stmt.query_map(refs.as_slice(), |row| {
650 Ok(convert_sqlite_row_to_nu_value(
651 row,
652 call_span,
653 &columns,
654 column_adapters,
655 ))
656 })?;
657
658 collect_row_values(row_results, signals, call_span)?
659 }
660 };
661
662 Ok(Value::list(row_values, call_span))
663}
664
665fn read_entire_sqlite_db(
666 conn: Connection,
667 call_span: Span,
668 signals: &Signals,
669) -> Result<Value, SqliteOrShellError> {
670 let mut tables = Record::new();
671
672 let mut get_table_names =
673 conn.prepare("SELECT name FROM sqlite_master WHERE type = 'table'")?;
674 let rows = get_table_names.query_map([], |row| row.get(0))?;
675
676 for row in rows {
677 let table_name: String = row?;
678 let table_stmt = conn.prepare(&format!("select * from [{table_name}]"))?;
680 let rows = prepared_statement_to_nu_list(
681 table_stmt,
682 NuSqlParams::default(),
683 call_span,
684 signals,
685 None,
686 )?;
687 tables.push(table_name, rows);
688 }
689
690 Ok(Value::record(tables, call_span))
691}
692
693pub fn convert_sqlite_row_to_nu_value(
694 row: &Row,
695 span: Span,
696 columns: &[TypedColumn],
697 column_adapters: Option<&BTreeMap<String, SQLiteColumnAdapter>>,
698) -> Value {
699 let record = columns
700 .iter()
701 .enumerate()
702 .map(|(i, col)| {
703 let adapter = column_adapters
704 .and_then(|adapters| adapters.get(&col.name))
705 .copied();
706 (
707 col.name.clone(),
708 convert_sqlite_value_to_nu_value_with_adapter(
709 row.get_ref_unwrap(i),
710 col.decl_type,
711 adapter,
712 span,
713 ),
714 )
715 })
716 .collect();
717
718 Value::record(record, span)
719}
720
721#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
722pub enum SQLiteColumnAdapter {
723 UnixMillisToDate,
725 MillisToDuration,
727}
728
729fn convert_sqlite_value_to_nu_value_with_adapter(
730 value: ValueRef,
731 decl_type: Option<DeclType>,
732 adapter: Option<SQLiteColumnAdapter>,
733 span: Span,
734) -> Value {
735 match adapter {
736 Some(SQLiteColumnAdapter::UnixMillisToDate) => match value {
737 ValueRef::Integer(i) => chrono::DateTime::from_timestamp_millis(i)
738 .map(|datetime| Value::date(datetime.into(), span))
739 .unwrap_or_else(|| Value::int(i, span)),
740 _ => convert_sqlite_value_to_nu_value(value, decl_type, span),
741 },
742 Some(SQLiteColumnAdapter::MillisToDuration) => match value {
743 ValueRef::Integer(i) => i
744 .checked_mul(1_000_000)
745 .map(|nanos| Value::duration(nanos, span))
746 .unwrap_or_else(|| Value::int(i, span)),
747 _ => convert_sqlite_value_to_nu_value(value, decl_type, span),
748 },
749 None => convert_sqlite_value_to_nu_value(value, decl_type, span),
750 }
751}
752
753pub fn convert_sqlite_value_to_nu_value(
754 value: ValueRef,
755 decl_type: Option<DeclType>,
756 span: Span,
757) -> Value {
758 match value {
759 ValueRef::Null => Value::nothing(span),
760 ValueRef::Integer(i) => Value::int(i, span),
761 ValueRef::Real(f) => Value::float(f, span),
762 ValueRef::Text(buf) => match (std::str::from_utf8(buf), decl_type) {
763 (Ok(txt), Some(DeclType::Json | DeclType::Jsonb)) => {
764 match crate::try_json_str_to_value(txt, span, false) {
765 Ok(val) => val,
766 Err(err) => Value::error(err, span),
767 }
768 }
769 (Ok(txt), _) => Value::string(txt.to_string(), span),
770 (Err(_), _) => Value::error(ShellError::NonUtf8 { span }, span),
771 },
772 ValueRef::Blob(u) => Value::binary(u.to_vec(), span),
773 }
774}
775
776pub fn open_connection_in_memory_custom() -> Result<Connection, ShellError> {
777 let flags = OpenFlags::default();
778 let conn = Connection::open_with_flags(MEMORY_DB, flags).map_err(|e| {
779 ShellError::Generic(GenericError::new(
780 "Failed to open SQLite custom connection in memory",
781 e.to_string(),
782 Span::test_data(),
783 ))
784 })?;
785 conn.busy_handler(Some(SQLiteDatabase::sleeper))
786 .map_err(|e| {
787 ShellError::Generic(GenericError::new(
788 "Failed to set busy handler for SQLite custom connection in memory",
789 e.to_string(),
790 Span::test_data(),
791 ))
792 })?;
793 Ok(conn)
794}
795
796pub fn open_connection_in_memory() -> Result<Connection, ShellError> {
797 Connection::open_in_memory().map_err(|e| {
798 ShellError::Generic(GenericError::new(
799 "Failed to open SQLite standard connection in memory",
800 e.to_string(),
801 Span::test_data(),
802 ))
803 })
804}
805
806#[derive(Debug, Clone, Serialize, Deserialize)]
809pub struct SQLiteQueryBuilder {
810 pub db_path: PathBuf,
811 pub table_name: String,
812 pub sql_select: Option<String>, pub sql_where: Option<String>, pub sql_params: Vec<String>, pub sql_order_by: Option<String>, pub sql_limit: Option<i64>,
817 #[serde(default)]
818 pub column_adapters: BTreeMap<String, SQLiteColumnAdapter>,
819 #[serde(skip, default = "Signals::empty")]
820 signals: Signals,
821}
822
823impl SQLiteQueryBuilder {
824 pub fn new(db_path: PathBuf, table_name: String, signals: Signals) -> Self {
825 Self {
826 db_path,
827 table_name,
828 sql_select: None,
829 sql_where: None,
830 sql_params: Vec::new(),
831 sql_order_by: None,
832 sql_limit: None,
833 column_adapters: BTreeMap::new(),
834 signals,
835 }
836 }
837
838 pub fn with_select(mut self, select: String) -> Self {
839 self.sql_select = Some(select);
840 self
841 }
842
843 pub fn with_where(mut self, where_clause: String, params: Vec<String>) -> Self {
844 self.sql_where = Some(where_clause);
845 self.sql_params = params;
846 self
847 }
848
849 pub fn with_order_by(mut self, order_by: String) -> Self {
850 self.sql_order_by = Some(order_by);
851 self
852 }
853
854 pub fn with_limit(mut self, limit: i64) -> Self {
855 self.sql_limit = Some(limit);
856 self
857 }
858
859 pub fn with_column_adapter(
860 mut self,
861 column_name: String,
862 adapter: SQLiteColumnAdapter,
863 ) -> Self {
864 self.column_adapters.insert(column_name, adapter);
865 self
866 }
867
868 pub fn with_unix_millis_datetime_column(self, column_name: String) -> Self {
870 self.with_column_adapter(column_name, SQLiteColumnAdapter::UnixMillisToDate)
871 }
872
873 pub fn with_millis_duration_column(self, column_name: String) -> Self {
875 self.with_column_adapter(column_name, SQLiteColumnAdapter::MillisToDuration)
876 }
877
878 pub fn project_output_columns(&self, columns: &[String]) -> Option<Self> {
895 if columns.is_empty() {
896 return Some(self.clone());
897 }
898
899 let new_select = if let Some(select) = &self.sql_select {
900 let current = parse_sql_select_projection(select)?;
903 let mut projected = Vec::with_capacity(columns.len());
904
905 for requested in columns {
906 let expression = current.iter().find_map(|(output_name, expression)| {
908 output_name
909 .eq_ignore_ascii_case(requested)
910 .then_some(expression)
911 })?;
912 projected.push(expression.clone());
913 }
914
915 projected.join(", ")
916 } else {
917 columns.join(", ")
918 };
919
920 Some(self.clone().with_select(new_select))
921 }
922
923 pub fn build_sql(&self) -> String {
924 let select = self.sql_select.as_deref().unwrap_or("*");
925 let mut sql = format!("SELECT {} FROM [{}]", select, self.table_name);
926
927 if let Some(where_clause) = &self.sql_where {
928 write!(sql, " WHERE {}", where_clause).expect("writing to a String is infallible");
929 }
930
931 if let Some(order_by) = &self.sql_order_by {
932 write!(sql, " ORDER BY {}", order_by).expect("writing to a String is infallible");
933 }
934
935 if let Some(limit) = self.sql_limit {
936 write!(sql, " LIMIT {}", limit).expect("writing to a String is infallible");
937 }
938
939 sql
940 }
941
942 pub fn execute(&self, call_span: Span) -> Result<PipelineData, ShellError> {
943 let conn = open_sqlite_db(&self.db_path, call_span)?;
944 let sql = self.build_sql();
945 let params = NuSqlParams::List(Vec::new()); let query = Spanned {
947 item: sql,
948 span: call_span,
949 };
950 run_sql_query(
951 conn,
952 &query,
953 params,
954 &self.signals,
955 (!self.column_adapters.is_empty()).then_some(&self.column_adapters),
956 )
957 .map(IntoPipelineData::into_pipeline_data)
958 .map_err(|e| e.into_shell_error(call_span, "Failed to execute query"))
959 }
960
961 pub fn count(&self, call_span: Span) -> Result<i64, ShellError> {
962 let conn = open_sqlite_db(&self.db_path, call_span)?;
963 let mut sql = format!("SELECT COUNT(*) FROM [{}]", self.table_name);
964 if let Some(where_clause) = &self.sql_where {
965 write!(sql, " WHERE {}", where_clause).expect("writing to a String is infallible");
966 }
967 let mut stmt = conn.prepare(&sql).map_err(|e| {
968 ShellError::Generic(GenericError::new(
969 "Failed to prepare count query",
970 e.to_string(),
971 call_span,
972 ))
973 })?;
974 let params: Vec<Box<dyn ToSql>> = self
975 .sql_params
976 .iter()
977 .map(|s| Box::new(s.clone()) as Box<dyn ToSql>)
978 .collect();
979 let count: i64 = stmt
980 .query_row(rusqlite::params_from_iter(params), |row| row.get(0))
981 .map_err(|e| {
982 ShellError::Generic(GenericError::new(
983 "Failed to execute count query",
984 e.to_string(),
985 call_span,
986 ))
987 })?;
988 Ok(count)
989 }
990}
991
992fn parse_sql_select_projection(select: &str) -> Option<Vec<(String, String)>> {
1002 let projection = split_select_expressions(select)
1003 .into_iter()
1004 .map(|expr| parse_projection_expression(&expr))
1005 .collect::<Option<Vec<_>>>()?;
1006
1007 (!projection.is_empty()).then_some(projection)
1008}
1009
1010fn split_select_expressions(select: &str) -> Vec<String> {
1018 let mut expressions = Vec::new();
1019 let mut current = String::new();
1020 let mut depth = 0usize;
1021 let mut quote = None;
1022
1023 for ch in select.chars() {
1024 match ch {
1025 '\'' | '"' => {
1026 if quote == Some(ch) {
1028 quote = None;
1029 } else if quote.is_none() {
1030 quote = Some(ch);
1031 }
1032 current.push(ch);
1033 }
1034 '(' if quote.is_none() => {
1035 depth = depth.saturating_add(1);
1037 current.push(ch);
1038 }
1039 ')' if quote.is_none() => {
1040 depth = depth.saturating_sub(1);
1041 current.push(ch);
1042 }
1043 ',' if quote.is_none() && depth == 0 => {
1044 let trimmed = current.trim();
1046 if !trimmed.is_empty() {
1047 expressions.push(trimmed.to_string());
1048 }
1049 current.clear();
1050 }
1051 _ => current.push(ch),
1052 }
1053 }
1054
1055 let trimmed = current.trim();
1056 if !trimmed.is_empty() {
1057 expressions.push(trimmed.to_string());
1058 }
1059
1060 expressions
1061}
1062
1063fn parse_projection_expression(expr: &str) -> Option<(String, String)> {
1073 let trimmed = expr.trim();
1074 if trimmed.is_empty() {
1075 return None;
1076 }
1077
1078 if let Some((_lhs, rhs)) = split_alias(trimmed) {
1079 let alias = normalize_identifier(rhs.trim());
1081 if alias.is_empty() {
1082 return None;
1083 }
1084 return Some((alias, trimmed.to_string()));
1085 }
1086
1087 let output_name = normalize_identifier(last_identifier_segment(trimmed));
1088 if output_name.is_empty() {
1089 return None;
1090 }
1091
1092 Some((output_name, trimmed.to_string()))
1093}
1094
1095fn split_alias(expr: &str) -> Option<(&str, &str)> {
1102 let bytes = expr.as_bytes();
1103 for idx in 0..bytes.len().saturating_sub(2) {
1104 if idx > 0
1105 && bytes[idx - 1].is_ascii_whitespace()
1106 && bytes[idx + 2].is_ascii_whitespace()
1107 && bytes[idx].eq_ignore_ascii_case(&b'a')
1108 && bytes[idx + 1].eq_ignore_ascii_case(&b's')
1109 {
1110 let lhs = expr[..idx].trim_end();
1113 let rhs = expr[idx + 2..].trim_start();
1114 if !lhs.is_empty() && !rhs.is_empty() {
1115 return Some((lhs, rhs));
1116 }
1117 }
1118 }
1119
1120 None
1121}
1122
1123fn last_identifier_segment(expr: &str) -> &str {
1124 expr.rsplit('.').next().unwrap_or(expr)
1125}
1126
1127fn normalize_identifier(identifier: &str) -> String {
1133 let trimmed = identifier.trim();
1134 if trimmed.len() >= 2 {
1135 let first = trimmed.as_bytes()[0] as char;
1136 let last = trimmed.as_bytes()[trimmed.len() - 1] as char;
1137 let is_wrapped = matches!((first, last), ('"', '"') | ('`', '`') | ('[', ']'));
1138 if is_wrapped {
1139 return trimmed[1..trimmed.len() - 1].trim().to_string();
1140 }
1141 }
1142
1143 trimmed.to_string()
1144}
1145
1146impl CustomValue for SQLiteQueryBuilder {
1147 fn clone_value(&self, span: Span) -> Value {
1148 Value::custom(Box::new(self.clone()), span)
1149 }
1150
1151 fn type_name(&self) -> String {
1152 "SQLiteQueryBuilder".to_string()
1153 }
1154
1155 fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
1156 self.execute(span).and_then(|pd| pd.into_value(span))
1157 }
1158
1159 fn as_any(&self) -> &dyn std::any::Any {
1160 self
1161 }
1162
1163 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1164 self
1165 }
1166
1167 fn follow_path_int(
1168 &self,
1169 _self_span: Span,
1170 index: usize,
1171 path_span: Span,
1172 optional: bool,
1173 ) -> Result<Value, ShellError> {
1174 let data = self.to_base_value(path_span)?;
1176 data.follow_cell_path(&[ast::PathMember::Int {
1177 val: index,
1178 span: path_span,
1179 optional,
1180 }])
1181 .map(|v| v.into_owned())
1182 }
1183
1184 fn follow_path_string(
1185 &self,
1186 _self_span: Span,
1187 column_name: String,
1188 path_span: Span,
1189 _optional: bool,
1190 _: Casing,
1191 ) -> Result<Value, ShellError> {
1192 let data = self.to_base_value(path_span)?;
1194 data.follow_cell_path(&[ast::PathMember::String {
1195 val: column_name,
1196 span: path_span,
1197 optional: false,
1198 casing: Casing::default(),
1199 }])
1200 .map(|v| v.into_owned())
1201 }
1202
1203 fn typetag_name(&self) -> &'static str {
1204 "SQLiteQueryBuilder"
1205 }
1206
1207 fn typetag_deserialize(&self) {
1208 unimplemented!("typetag_deserialize")
1209 }
1210
1211 fn is_iterable(&self) -> bool {
1212 true
1213 }
1214}
1215
1216#[cfg(test)]
1217mod test {
1218 use super::*;
1219 use nu_protocol::record;
1220
1221 #[test]
1222 fn can_read_empty_db() {
1223 let db = open_connection_in_memory().unwrap();
1224 let converted_db = read_entire_sqlite_db(db, Span::test_data(), &Signals::empty()).unwrap();
1225
1226 let expected = Value::test_record(Record::new());
1227
1228 assert_eq!(converted_db, expected);
1229 }
1230
1231 #[test]
1232 fn can_read_empty_table() {
1233 let db = open_connection_in_memory().unwrap();
1234
1235 db.execute(
1236 "CREATE TABLE person (
1237 id INTEGER PRIMARY KEY,
1238 name TEXT NOT NULL,
1239 data BLOB
1240 )",
1241 [],
1242 )
1243 .unwrap();
1244 let converted_db = read_entire_sqlite_db(db, Span::test_data(), &Signals::empty()).unwrap();
1245
1246 let expected = Value::test_record(record! {
1247 "person" => Value::test_list(vec![]),
1248 });
1249
1250 assert_eq!(converted_db, expected);
1251 }
1252
1253 #[test]
1254 fn can_read_null_and_non_null_data() {
1255 let span = Span::test_data();
1256 let db = open_connection_in_memory().unwrap();
1257
1258 db.execute(
1259 "CREATE TABLE item (
1260 id INTEGER PRIMARY KEY,
1261 name TEXT
1262 )",
1263 [],
1264 )
1265 .unwrap();
1266
1267 db.execute("INSERT INTO item (id, name) VALUES (123, NULL)", [])
1268 .unwrap();
1269
1270 db.execute("INSERT INTO item (id, name) VALUES (456, 'foo bar')", [])
1271 .unwrap();
1272
1273 let converted_db = read_entire_sqlite_db(db, span, &Signals::empty()).unwrap();
1274
1275 let expected = Value::test_record(record! {
1276 "item" => Value::test_list(
1277 vec![
1278 Value::test_record(record! {
1279 "id" => Value::test_int(123),
1280 "name" => Value::nothing(span),
1281 }),
1282 Value::test_record(record! {
1283 "id" => Value::test_int(456),
1284 "name" => Value::test_string("foo bar"),
1285 }),
1286 ]
1287 ),
1288 });
1289
1290 assert_eq!(converted_db, expected);
1291 }
1292
1293 #[test]
1294 fn sqlite_table_build_sql_combined() {
1295 let table = SQLiteQueryBuilder::new(
1296 PathBuf::from(":memory:"),
1297 "test".to_string(),
1298 Signals::empty(),
1299 )
1300 .with_select("col1".to_string())
1301 .with_where("col2 = ?".to_string(), vec!["val".to_string()])
1302 .with_order_by("col1".to_string())
1303 .with_limit(5);
1304 assert_eq!(
1305 table.build_sql(),
1306 "SELECT col1 FROM [test] WHERE col2 = ? ORDER BY col1 LIMIT 5"
1307 );
1308 }
1309
1310 #[test]
1311 fn sqlite_table_count_integration() {
1312 use tempfile::NamedTempFile;
1313
1314 let temp_file = NamedTempFile::new().unwrap();
1315 let db_path = temp_file.path().to_path_buf();
1316 let signals = Signals::empty();
1317
1318 {
1320 let conn = Connection::open(&db_path).unwrap();
1321 conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
1322 .unwrap();
1323 for i in 0..10 {
1324 conn.execute(
1325 "INSERT INTO test (id, name) VALUES (?, ?)",
1326 rusqlite::params![i, format!("name{}", i)],
1327 )
1328 .unwrap();
1329 }
1330 }
1331
1332 let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), signals);
1333 let count = table.count(Span::test_data()).unwrap();
1334 assert_eq!(count, 10);
1335 }
1336
1337 #[test]
1338 fn sqlite_table_execute_integration() {
1339 use tempfile::NamedTempFile;
1340
1341 let temp_file = NamedTempFile::new().unwrap();
1342 let db_path = temp_file.path().to_path_buf();
1343 let signals = Signals::empty();
1344
1345 {
1347 let conn = Connection::open(&db_path).unwrap();
1348 conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
1349 .unwrap();
1350 conn.execute("INSERT INTO test (id, name) VALUES (1, 'first')", [])
1351 .unwrap();
1352 conn.execute("INSERT INTO test (id, name) VALUES (2, 'second')", [])
1353 .unwrap();
1354 }
1355
1356 let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), signals);
1357 let result = table.execute(Span::test_data()).unwrap();
1358 let value = result.into_value(Span::test_data()).unwrap();
1359
1360 if let Value::List { vals, .. } = value {
1361 assert_eq!(vals.len(), 2);
1362 } else {
1363 panic!("Expected list");
1364 }
1365 }
1366
1367 #[test]
1368 fn sqlite_table_first_integration() {
1369 use tempfile::NamedTempFile;
1370
1371 let temp_file = NamedTempFile::new().unwrap();
1372 let db_path = temp_file.path().to_path_buf();
1373 let signals = Signals::empty();
1374
1375 {
1377 let conn = Connection::open(&db_path).unwrap();
1378 conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
1379 .unwrap();
1380 for i in 0..5 {
1381 conn.execute(
1382 "INSERT INTO test (id, name) VALUES (?, ?)",
1383 rusqlite::params![i, format!("name{}", i)],
1384 )
1385 .unwrap();
1386 }
1387 }
1388
1389 let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), signals).with_limit(2);
1390 let result = table.execute(Span::test_data()).unwrap();
1391 let value = result.into_value(Span::test_data()).unwrap();
1392
1393 if let Value::List { vals, .. } = value {
1394 assert_eq!(vals.len(), 2);
1395 if let Value::Record { val: record, .. } = &vals[0] {
1397 assert_eq!(record.get("id"), Some(&Value::int(0, Span::test_data())));
1398 }
1399 } else {
1400 panic!("Expected list");
1401 }
1402 }
1403
1404 #[test]
1405 fn sqlite_table_last_integration() {
1406 use tempfile::NamedTempFile;
1407
1408 let temp_file = NamedTempFile::new().unwrap();
1409 let db_path = temp_file.path().to_path_buf();
1410 let signals = Signals::empty();
1411
1412 {
1414 let conn = Connection::open(&db_path).unwrap();
1415 conn.execute("CREATE TABLE test (id INTEGER, name TEXT)", [])
1416 .unwrap();
1417 for i in 0..5 {
1418 conn.execute(
1419 "INSERT INTO test (id, name) VALUES (?, ?)",
1420 rusqlite::params![i, format!("name{}", i)],
1421 )
1422 .unwrap();
1423 }
1424 }
1425
1426 let table = SQLiteQueryBuilder::new(db_path, "test".to_string(), signals)
1427 .with_order_by("rowid DESC".to_string())
1428 .with_limit(2);
1429 let result = table.execute(Span::test_data()).unwrap();
1430 let value = result.into_value(Span::test_data()).unwrap();
1431
1432 if let Value::List { vals, .. } = value {
1433 assert_eq!(vals.len(), 2);
1434 if let Value::Record { val: record, .. } = &vals[0] {
1436 assert_eq!(record.get("id"), Some(&Value::int(4, Span::test_data())));
1437 }
1438 } else {
1439 panic!("Expected list");
1440 }
1441 }
1442
1443 #[test]
1444 fn sqlite_table_build_sql_with_select() {
1445 let table = SQLiteQueryBuilder::new(
1446 PathBuf::from(":memory:"),
1447 "test".to_string(),
1448 Signals::empty(),
1449 )
1450 .with_select("col1, col2".to_string());
1451 assert_eq!(table.build_sql(), "SELECT col1, col2 FROM [test]");
1452 }
1453
1454 #[test]
1455 fn sqlite_table_build_sql_with_where() {
1456 let table = SQLiteQueryBuilder::new(
1457 PathBuf::from(":memory:"),
1458 "test".to_string(),
1459 Signals::empty(),
1460 )
1461 .with_where("col = ?".to_string(), vec!["val".to_string()]);
1462 assert_eq!(table.build_sql(), "SELECT * FROM [test] WHERE col = ?");
1463 }
1464
1465 #[test]
1466 fn sqlite_table_build_sql_with_order_by() {
1467 let table = SQLiteQueryBuilder::new(
1468 PathBuf::from(":memory:"),
1469 "test".to_string(),
1470 Signals::empty(),
1471 )
1472 .with_order_by("id DESC".to_string());
1473 assert_eq!(table.build_sql(), "SELECT * FROM [test] ORDER BY id DESC");
1474 }
1475
1476 #[test]
1477 fn sqlite_table_build_sql_with_limit() {
1478 let table = SQLiteQueryBuilder::new(
1479 PathBuf::from(":memory:"),
1480 "test".to_string(),
1481 Signals::empty(),
1482 )
1483 .with_limit(10);
1484 assert_eq!(table.build_sql(), "SELECT * FROM [test] LIMIT 10");
1485 }
1486
1487 #[test]
1488 fn sqlite_table_execute_with_column_adapters() {
1489 use tempfile::NamedTempFile;
1490
1491 let temp_file = NamedTempFile::new().unwrap();
1492 let db_path = temp_file.path().to_path_buf();
1493 let signals = Signals::empty();
1494
1495 {
1496 let conn = Connection::open(&db_path).unwrap();
1497 conn.execute(
1498 "CREATE TABLE history (start_timestamp INTEGER, duration INTEGER)",
1499 [],
1500 )
1501 .unwrap();
1502 conn.execute(
1503 "INSERT INTO history (start_timestamp, duration) VALUES (1736041045123, 30002)",
1504 [],
1505 )
1506 .unwrap();
1507 conn.execute(
1508 "INSERT INTO history (start_timestamp, duration) VALUES (NULL, NULL)",
1509 [],
1510 )
1511 .unwrap();
1512 }
1513
1514 let table = SQLiteQueryBuilder::new(db_path, "history".to_string(), signals)
1515 .with_select("start_timestamp, duration".to_string())
1516 .with_unix_millis_datetime_column("start_timestamp".to_string())
1517 .with_millis_duration_column("duration".to_string());
1518
1519 let result = table.execute(Span::test_data()).unwrap();
1520 let value = result.into_value(Span::test_data()).unwrap();
1521
1522 if let Value::List { vals, .. } = value {
1523 assert_eq!(vals.len(), 2);
1524
1525 if let Value::Record { val: first, .. } = &vals[0] {
1526 assert!(matches!(
1527 first.get("start_timestamp"),
1528 Some(Value::Date { .. })
1529 ));
1530 assert!(matches!(
1531 first.get("duration"),
1532 Some(Value::Duration { .. })
1533 ));
1534 } else {
1535 panic!("Expected first row to be a record");
1536 }
1537
1538 if let Value::Record { val: second, .. } = &vals[1] {
1539 assert!(matches!(
1540 second.get("start_timestamp"),
1541 Some(Value::Nothing { .. })
1542 ));
1543 assert!(matches!(
1544 second.get("duration"),
1545 Some(Value::Nothing { .. })
1546 ));
1547 } else {
1548 panic!("Expected second row to be a record");
1549 }
1550 } else {
1551 panic!("Expected list");
1552 }
1553 }
1554
1555 #[test]
1556 fn sqlite_table_project_output_columns_preserves_aliases() {
1557 let table = SQLiteQueryBuilder::new(
1558 PathBuf::from(":memory:"),
1559 "history".to_string(),
1560 Signals::empty(),
1561 )
1562 .with_select(
1563 "start_timestamp, command_line as command, cwd, duration_ms as duration, exit_status"
1564 .to_string(),
1565 );
1566
1567 let projected = table
1568 .project_output_columns(&["command".to_string(), "duration".to_string()])
1569 .expect("projection should succeed");
1570
1571 assert_eq!(
1572 projected.build_sql(),
1573 "SELECT command_line as command, duration_ms as duration FROM [history]"
1574 );
1575 }
1576
1577 #[test]
1578 fn sqlite_table_project_output_columns_returns_none_for_missing_column() {
1579 let table = SQLiteQueryBuilder::new(
1580 PathBuf::from(":memory:"),
1581 "history".to_string(),
1582 Signals::empty(),
1583 )
1584 .with_select("command_line as command".to_string());
1585
1586 assert!(
1587 table
1588 .project_output_columns(&["missing".to_string()])
1589 .is_none()
1590 );
1591 }
1592}