use crate::{
MEMORY_DB,
database::values::sqlite::{open_sqlite_db, values_to_sql},
};
use nu_engine::command_prelude::*;
use itertools::Itertools;
use nu_protocol::{Signals, shell_error::generic::GenericError};
use std::{borrow::Cow, path::Path};
/// Table name used when the caller does not pass `--table-name`.
pub const DEFAULT_TABLE_NAME: &str = "main";
/// The `into sqlite` command: stores table or record input in a SQLite database file.
#[derive(Clone)]
pub struct IntoSqliteDb;

impl Command for IntoSqliteDb {
    fn name(&self) -> &str {
        "into sqlite"
    }

    fn description(&self) -> &str {
        "Convert table into a SQLite database."
    }

    fn search_terms(&self) -> Vec<&str> {
        Vec::from(["convert", "database"])
    }

    /// Declares one required positional (`file-name`) and one optional flag
    /// (`--table-name` / `-t`); both tables and records are accepted as input
    /// and nothing is produced as output.
    fn signature(&self) -> Signature {
        let accepted_io = vec![
            (Type::table(), Type::Nothing),
            (Type::record(), Type::Nothing),
        ];

        Signature::build("into sqlite")
            .category(Category::Conversions)
            .input_output_types(accepted_io)
            .allow_variants_without_examples(true)
            .required(
                "file-name",
                SyntaxShape::String,
                "Specify the filename to save the database to.",
            )
            .named(
                "table-name",
                SyntaxShape::String,
                "Specify table name to store the data in.",
                Some('t'),
            )
    }

    fn examples(&self) -> Vec<Example<'_>> {
        // Expected value of the last example: the list column read back as `[1 2 3]`.
        let list_read_back = Value::test_list((1..=3).map(Value::test_int).collect());

        // NOTE: the continuation lines of the multi-line literals below start at
        // column 0 on purpose; indenting them would change the string contents.
        vec![
            Example {
                description: "Convert ls entries into a SQLite database with 'main' as the table name.",
                example: "ls | into sqlite my_ls.db",
                result: None,
            },
            Example {
                description: "Convert ls entries into a SQLite database with 'my_table' as the table name.",
                example: "ls | into sqlite my_ls.db -t my_table",
                result: None,
            },
            Example {
                description: "Convert table literal into a SQLite database with 'main' as the table name.",
                example: "[[name]; [-----] [someone] [=====] [somename] ['(((((']] | into sqlite filename.db",
                result: None,
            },
            Example {
                description: "Insert a single record into a SQLite database.",
                example: "{ foo: bar, baz: quux } | into sqlite filename.db",
                result: None,
            },
            Example {
                description: "Insert data that contains records, lists or tables, that will be stored as JSONB columns
These columns will be automatically turned back into nu objects when read directly via cell-path",
                example: "{a_record: {foo: bar, baz: quux}, a_list: [1 2 3], a_table: [[a b]; [0 1] [2 3]]} | into sqlite filename.db -t my_table
(open filename.db).my_table.0.a_list",
                result: Some(list_read_back),
            },
        ]
    }

    /// Delegates to [`operate`], which parses the arguments and performs the insert.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        operate(engine_state, stack, call, input)
    }
}
/// An open SQLite connection together with the name of the table rows are written to.
struct Table {
    /// Connection to the target database.
    conn: rusqlite::Connection,
    /// Name of the table that receives the inserted rows.
    table_name: String,
    /// Span of the `file-name` argument; attached to errors raised by this type.
    span: Span,
}

impl Table {
    /// Opens the database at `db_path` and records which table to write to.
    ///
    /// `table_name` falls back to [`DEFAULT_TABLE_NAME`] when `None`. When
    /// `db_path` equals `MEMORY_DB` it is handed to `open_sqlite_db` unchanged;
    /// any other value is joined onto the engine's current working directory.
    ///
    /// # Errors
    ///
    /// Fails if the current working directory cannot be determined or if
    /// `open_sqlite_db` cannot open the database.
    pub fn new(
        db_path: &Spanned<String>,
        table_name: Option<Spanned<String>>,
        engine_state: &EngineState,
        stack: &Stack,
    ) -> Result<Self, nu_protocol::ShellError> {
        let table_name = table_name
            .map(|table_name| table_name.item)
            .unwrap_or_else(|| DEFAULT_TABLE_NAME.to_string());

        let span = db_path.span;
        let db_path: Cow<'_, Path> = match db_path.item.as_str() {
            MEMORY_DB => Cow::Borrowed(Path::new(&db_path.item)),
            item => engine_state
                .cwd(Some(stack))?
                .join(item)
                .to_std_path_buf()
                .into(),
        };

        let conn = open_sqlite_db(&db_path, span)?;

        Ok(Self {
            conn,
            table_name,
            span,
        })
    }

    /// Name of the table rows are written to.
    pub fn name(&self) -> &String {
        &self.table_name
    }

    /// Creates the table if it does not exist yet and opens a transaction.
    ///
    /// The schema of a newly created table is derived from `record` (normally
    /// the first row of the input). If that record contains a null value and
    /// the table has to be created, a warning is printed to stderr because the
    /// column type cannot be inferred and `TEXT` is assumed.
    ///
    /// # Errors
    ///
    /// Fails if a value in `record` has no SQLite equivalent, or if the
    /// existence check, the `CREATE TABLE` statement, or opening the
    /// transaction fails.
    fn try_init(
        &mut self,
        record: &Record,
    ) -> Result<rusqlite::Transaction<'_>, nu_protocol::ShellError> {
        let first_row_null = record.values().any(Value::is_nothing);
        let columns = get_columns_with_sqlite_types(record, self.span)?;

        // The table name is user input, so it is bound as a parameter instead of
        // being spliced into the SQL text: a name containing `'` would otherwise
        // terminate the string literal and corrupt (or inject into) the query.
        let table_count: u64 = self
            .conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1;",
                rusqlite::params![self.table_name],
                |row| row.get(0),
            )
            .map_err(|err| {
                ShellError::Generic(GenericError::new(
                    format!("{err:#?}"),
                    format!("{err:#?}"),
                    self.span,
                ))
            })?;

        if table_count == 0 {
            if first_row_null {
                eprintln!(
                    "Warning: The first row contains a null value, which has an \
unknown SQL type. Null values will be assumed to be TEXT columns. \
If this is undesirable, you can create the table first with your desired schema."
                );
            }

            // Column names arrive already quoted from `get_columns_with_sqlite_types`.
            let create_statement = format!(
                "CREATE TABLE [{}] ({})",
                self.table_name,
                columns
                    .into_iter()
                    .map(|(col_name, sql_type)| format!("{col_name} {sql_type}"))
                    .collect::<Vec<_>>()
                    .join(", ")
            );

            self.conn.execute(&create_statement, []).map_err(|err| {
                ShellError::Generic(GenericError::new(
                    "Failed to create table",
                    err.to_string(),
                    self.span,
                ))
            })?;
        }

        self.conn.transaction().map_err(|err| {
            ShellError::Generic(GenericError::new(
                "Failed to open transaction",
                err.to_string(),
                self.span,
            ))
        })
    }
}
/// Reads the command arguments, opens the target database and inserts `input` into it.
///
/// Positional argument 0 is the database file name; the optional `table-name`
/// flag selects the table. The value produced by [`action`] is returned as
/// pipeline data.
fn operate(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    input: PipelineData,
) -> Result<PipelineData, ShellError> {
    let head = call.head;

    let db_file: Spanned<String> = call.req(engine_state, stack, 0)?;
    let requested_table: Option<Spanned<String>> =
        call.get_flag(engine_state, stack, "table-name")?;

    let table = Table::new(&db_file, requested_table, engine_state, stack)?;
    let outcome = action(engine_state, input, table, head, engine_state.signals())?;

    Ok(outcome.into_pipeline_data())
}
/// Dispatches on the shape of `input` and inserts its rows into `table`.
///
/// * A list stream is inserted row by row.
/// * A list value is unpacked and inserted row by row; errors use the list's own span.
/// * Any other single value is treated as a one-row input.
///
/// # Errors
///
/// Returns `OnlySupportsThisInputType` for any remaining kind of pipeline data,
/// and propagates every error from [`insert_in_transaction`].
fn action(
    engine_state: &EngineState,
    input: PipelineData,
    table: Table,
    span: Span,
    signals: &Signals,
) -> Result<Value, ShellError> {
    match input {
        PipelineData::ListStream(stream, _) => {
            insert_in_transaction(engine_state, stream.into_iter(), span, table, signals)
        }
        PipelineData::Value(value @ Value::List { .. }, _) => {
            let span = value.span();
            let vals = value
                .into_list()
                .expect("Value matched as list above, but is not a list");
            insert_in_transaction(engine_state, vals.into_iter(), span, table, signals)
        }
        PipelineData::Value(val, _) => {
            insert_in_transaction(engine_state, std::iter::once(val), span, table, signals)
        }
        // Report what was actually received instead of an empty type name so the
        // error message tells the user what went wrong.
        other => Err(ShellError::OnlySupportsThisInputType {
            exp_input_type: "list".into(),
            wrong_type: other.get_type().to_string(),
            dst_span: span,
            src_span: span,
        }),
    }
}
fn insert_in_transaction(
engine_state: &EngineState,
stream: impl Iterator<Item = Value>,
span: Span,
mut table: Table,
signals: &Signals,
) -> Result<Value, ShellError> {
let mut stream = stream.peekable();
let first_val = match stream.peek() {
None => return Ok(Value::nothing(span)),
Some(val) => val.as_record()?.clone(),
};
if first_val.is_empty() {
Err(ShellError::Generic(GenericError::new(
"Failed to create table",
"Cannot create table without columns",
span,
)))?;
}
let table_name = table.name().clone();
let tx = table.try_init(&first_val)?;
for stream_value in stream {
if let Err(err) = signals.check(&span) {
tx.rollback().map_err(|e| {
ShellError::Generic(GenericError::new_internal(
"Failed to rollback SQLite transaction",
e.to_string(),
))
})?;
return Err(err);
}
let val = stream_value.as_record()?;
let insert_statement = format!(
"INSERT INTO [{}] ({}) VALUES ({})",
table_name,
Itertools::intersperse(val.columns().map(|c| format!("`{c}`")), ", ".to_string())
.collect::<String>(),
Itertools::intersperse(itertools::repeat_n("?", val.len()), ", ").collect::<String>(),
);
let mut insert_statement = tx.prepare(&insert_statement).map_err(|e| {
ShellError::Generic(GenericError::new_internal(
"Failed to prepare SQLite statement",
e.to_string(),
))
})?;
let result = insert_value(engine_state, stream_value, span, &mut insert_statement);
insert_statement.finalize().map_err(|e| {
ShellError::Generic(GenericError::new_internal(
"Failed to finalize SQLite prepared statement",
e.to_string(),
))
})?;
result?
}
tx.commit().map_err(|e| {
ShellError::Generic(GenericError::new_internal(
"Failed to commit SQLite transaction",
e.to_string(),
))
})?;
Ok(Value::nothing(span))
}
/// Executes the prepared `insert_statement` with the values of one record.
///
/// The record's values are converted with `values_to_sql` and bound
/// positionally, in the record's own column order.
///
/// # Errors
///
/// Returns `OnlySupportsThisInputType` if `stream_value` is not a record, and
/// propagates conversion or statement-execution failures.
fn insert_value(
    engine_state: &EngineState,
    stream_value: Value,
    call_span: Span,
    insert_statement: &mut rusqlite::Statement<'_>,
) -> Result<(), ShellError> {
    let record = match stream_value {
        Value::Record { val, .. } => val,
        other => {
            return Err(ShellError::OnlySupportsThisInputType {
                exp_input_type: "record".into(),
                wrong_type: other.get_type().to_string(),
                dst_span: call_span,
                src_span: other.span(),
            });
        }
    };

    let bound_values = values_to_sql(engine_state, record.values().cloned(), call_span)?;
    let bound_params = rusqlite::params_from_iter(bound_values);

    match insert_statement.execute(bound_params) {
        Ok(_) => Ok(()),
        Err(e) => Err(ShellError::Generic(GenericError::new(
            "Failed to execute SQLite statement",
            e.to_string(),
            call_span,
        ))),
    }
}
/// Maps the type of a nushell value to the SQLite column type used in `CREATE TABLE`.
///
/// Null values map to `TEXT`; lists, records and tables map to `JSONB`.
///
/// # Errors
///
/// Returns `OnlySupportsThisInputType` for types that have no column mapping.
fn nu_value_to_sqlite_type(val: &Value, dst_span: Span) -> Result<&'static str, ShellError> {
    // Every variant is listed explicitly (no `_` arm) so that adding a new
    // `Type` variant forces a decision here at compile time.
    let sql_type = match val.get_type() {
        Type::String | Type::Nothing => "TEXT",
        Type::Int | Type::Filesize => "INTEGER",
        Type::Float | Type::Number => "REAL",
        Type::Binary => "BLOB",
        Type::Bool => "BOOLEAN",
        Type::Date => "DATETIME",
        Type::Duration => "BIGINT",
        Type::List(_) | Type::Record(_) | Type::Table(_) => "JSONB",
        unsupported @ (Type::Any
        | Type::Block
        | Type::CellPath
        | Type::Closure
        | Type::OneOf(_)
        | Type::Custom(_)
        | Type::Error
        | Type::Range
        | Type::Glob) => {
            return Err(ShellError::OnlySupportsThisInputType {
                exp_input_type: "sql".into(),
                wrong_type: unsupported.to_string(),
                dst_span,
                src_span: val.span(),
            });
        }
    };

    Ok(sql_type)
}
/// Builds the `(quoted column name, SQLite type)` list used to create a table from `record`.
///
/// Column names are wrapped in backticks. If the same column name occurs more
/// than once, only its first occurrence is kept.
///
/// # Errors
///
/// Propagates the error from [`nu_value_to_sqlite_type`] when a kept column
/// holds a value with no SQLite column type.
fn get_columns_with_sqlite_types(
    record: &Record,
    dst_span: Span,
) -> Result<Vec<(String, &'static str)>, ShellError> {
    let mut columns: Vec<(String, &'static str)> = Vec::with_capacity(record.len());

    for (name, value) in record {
        // Stored names are already quoted, so the duplicate check has to compare
        // quoted against quoted. Re-quoting the stored name and comparing it with
        // the raw column name can never match, which silently disables the check.
        let quoted = format!("`{name}`");
        if columns.iter().any(|(seen, _)| *seen == quoted) {
            continue;
        }
        columns.push((quoted, nu_value_to_sqlite_type(value, dst_span)?));
    }

    Ok(columns)
}