//! sqlitepipe 0.1.2
//!
//! A simple tool for piping the output of a command into sqlite databases.
use std::{collections::HashSet, error::Error, path::PathBuf, process::ExitCode, str::FromStr};

use clap::{self, Parser};
use rusqlite::Connection;
use snafu::{ResultExt, Snafu, Whatever, whatever};
use sqlitepipe::{
    column::Column, insert_blob, insert_lines, insert_row, prepare_db,
};

/// File-local result alias: every fallible function here reports its failure as a snafu
/// [`Whatever`] error.
type Result<T> = std::result::Result<T, Whatever>;

/// Fallback values used when the corresponding command-line option is omitted.
mod defaults {
    /// Name of the column that receives stdin when it is ingested as a blob.
    pub const BLOB_COLUMN: &str = "blob";
    /// Name of the column that receives stdin when it is ingested as lines.
    pub const LINE_COLUMN: &str = "line";
    /// Name of the table the data is written to.
    pub const TABLE_NAME: &str = "data";
    /// Path of the database file used when none is given.
    pub const DATABASE_PATH: &str = "stdin.data.sqlite3";
}

/// Error returned when a string cannot be parsed into a [`KeyValueArg`].
///
/// Its `Display` output is exactly the stored `message`.
#[derive(Debug, Snafu)]
#[snafu(display("{message}"))]
struct KeyValueParseError {
    /// Human-readable description of what was wrong with the input.
    message: String,
}

/// One `<key>=<value>` pair taken from the command line.
#[derive(Debug, Clone)]
struct KeyValueArg {
    /// Name of the column the value is written to.
    pub key: String,
    /// Raw value stored in that column.
    pub value: String,
}

/// Builds an owned pair from borrowed `(key, value)` string slices.
impl From<(&str, &str)> for KeyValueArg {
    fn from((key, value): (&str, &str)) -> Self {
        Self {
            key: key.to_owned(),
            value: value.to_owned(),
        }
    }
}

/// Parses `<key>=<value>`, splitting at the first `=` only, so the value may itself contain
/// further `=` characters.
impl FromStr for KeyValueArg {
    type Err = KeyValueParseError;

    /// Returns the parsed pair, or a [`KeyValueParseError`] when `s` contains no `=`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.split_once('=') {
            Some(pair) => Ok(Self::from(pair)),
            None => Err(KeyValueParseError {
                message: String::from("invalid key value pair; missing '='"),
            }),
        }
    }
}

// Switches selecting how stdin is ingested; the argument group allows at most one of them.
//
// NOTE: plain `//` comments are used for maintainer notes in this struct on purpose, because
// `///` doc comments on clap-derived items are rendered into the CLI help output.
#[derive(Debug, clap::Args)]
#[group(multiple = false)]
struct StdinArgs {
    /// Don't ingest stdin. [cannot be used in combination with --stdin-blob or --stdin-lines]
    #[arg(short = 'n', long)]
    stdin_none: bool,

    /// Ingest stdin as a single blob and store in a single row. [cannot be used in combination with --stdin-none or --stdin-lines] [default: blob]
    // The optional value is the name of the target column and has to be attached with `=`;
    // giving the flag without a value selects `defaults::BLOB_COLUMN`.
    #[arg(
        short = 'b',
        long,
        default_missing_value = defaults::BLOB_COLUMN,
        require_equals = true,
        num_args = 0..=1,
    )]
    stdin_blob: Option<String>,

    /// Ingest stdin as a document with lines. Store each line in a separate row. [cannot be used in combination with --stdin-none or --stdin-blob] [default: line]
    // The optional value is the name of the target column and has to be attached with `=`;
    // giving the flag without a value selects `defaults::LINE_COLUMN`.
    #[arg(
        short = 'l',
        long,
        default_missing_value = defaults::LINE_COLUMN,
        require_equals = true,
        num_args = 0..=1,
    )]
    stdin_lines: Option<String>,
}

impl StdinArgs {
    /// Resolves the parsed flags into a single [`StdinMode`].
    ///
    /// A blob column takes precedence over a lines column. `stdin_none` is not inspected:
    /// [`StdinMode::None`] is returned whenever neither column option is set.
    fn mode(&self) -> StdinMode<'_> {
        match (self.stdin_blob.as_deref(), self.stdin_lines.as_deref()) {
            (Some(column), _) => StdinMode::Blob(column),
            (None, Some(column)) => StdinMode::Lines(column),
            (None, None) => StdinMode::None,
        }
    }
}

/// How stdin is handled for this run, as resolved by `StdinArgs::mode`.
#[derive(Debug, PartialEq, Eq)]
enum StdinMode<'a> {
    /// Neither a blob column nor a lines column was selected.
    None,
    /// Stdin is ingested as a blob; carries the name of the blob column.
    Blob(&'a str),
    /// Stdin is ingested as lines; carries the name of the line column.
    Lines(&'a str),
}

/// Ingest stdin and write to a database.
// NOTE: the `///` comments in this struct are rendered by clap as the CLI help text, so
// maintainer notes are written as plain `//` comments.
#[derive(Debug, clap::Parser)]
struct Args {
    /// Path to the database. [default: stdin.data.sqlite3]
    // The default is applied in `main_with_error` (see `defaults::DATABASE_PATH`), not by clap.
    #[arg(short, long)]
    output_db: Option<PathBuf>,

    /// Name of table where to put the data. [default: data]
    // The default is applied in `main_with_error` (see `defaults::TABLE_NAME`), not by clap.
    #[arg(short, long)]
    table_name: Option<String>,

    /// If set, delete existing data from the given table. Note: Other tables are left unaffected.
    #[arg(short, long)]
    reset: bool,

    /// Raw values to insert, given as a `<column_name>=<value>` tuple. Can be given multiple times.
    /// Any column that doesn't exist yet will be added automatically.
    #[arg(short, long)]
    value: Vec<KeyValueArg>,

    // Stdin handling flags (`--stdin-none`, `--stdin-blob`, `--stdin-lines`).
    #[clap(flatten)]
    stdin_args: StdinArgs,

    /// Verbose output
    #[arg(short = 'V', long)]
    verbose: bool,
}

/// Assembles the complete list of columns for this run.
///
/// Every entry of `columns` becomes a raw column, in the order given. If `stdin_args` selects a
/// blob or a lines column, that column is appended after the raw columns.
///
/// # Errors
///
/// Fails when the selected stdin column has the same name as one of the raw columns.
fn get_all_columns(columns: &[KeyValueArg], stdin_args: &StdinArgs) -> Result<Vec<Column>> {
    let mut result: Vec<_> = columns
        .iter()
        .map(|kv| Column::raw_column(&kv.key, &kv.value))
        .collect();

    // Names already claimed by the raw columns; the stdin column must not reuse any of them.
    let unique_names: HashSet<_> = result.iter().map(|col| col.name()).collect();

    let mut stdin_col = None;

    if let Some(stdin_blob_column) = &stdin_args.stdin_blob {
        if unique_names.contains(&stdin_blob_column.as_str()) {
            whatever!("name clash with column name and blob column: {stdin_blob_column}");
        }
        stdin_col = Some(Column::blob_column(stdin_blob_column));
    }

    if let Some(stdin_lines_column) = &stdin_args.stdin_lines {
        if unique_names.contains(&stdin_lines_column.as_str()) {
            whatever!("name clash with column name and line column: {stdin_lines_column}");
        }
        stdin_col = Some(Column::line_column(stdin_lines_column));
    }

    if let Some(col) = stdin_col {
        result.push(col);
    }

    Ok(result)
}

/// Parses the command line and applies the default stdin mode.
///
/// When none of the stdin options was given, stdin is ingested as a blob into the
/// `defaults::BLOB_COLUMN` column.
fn get_args() -> Args {
    let mut args = Args::parse();

    let stdin = &mut args.stdin_args;
    let mode_given = stdin.stdin_none || stdin.stdin_blob.is_some() || stdin.stdin_lines.is_some();
    if !mode_given {
        // `stdin_none` is already false and `stdin_lines` already `None` on this path, so
        // filling in the blob column is all that is needed.
        stdin.stdin_blob = Some(defaults::BLOB_COLUMN.to_owned());
    }

    args
}

fn main_with_error(args: Args) -> Result<()> {
    if args.verbose {
        eprintln!("{args:?}")
    }

    let raw_table_name = args
        .table_name
        .as_ref()
        .map(|v| v.as_str())
        .unwrap_or_else(|| defaults::TABLE_NAME);

    let output_path = args
        .output_db
        .as_ref()
        .map(|v| v.to_owned())
        .unwrap_or_else(|| defaults::DATABASE_PATH.into());

    let columns = get_all_columns(&args.value, &args.stdin_args)?;

    if columns.is_empty() {
        whatever!("No data to insert.");
    }

    let mut conn = Connection::open(output_path).whatever_context("sqlite")?;

    let mut tx = conn.transaction().whatever_context("sqlite")?;

    prepare_db(&mut tx, &raw_table_name, args.reset, &columns).whatever_context("prepare_db")?;

    match args.stdin_args.mode() {
        StdinMode::None => {
            insert_row(&mut tx, &raw_table_name, &columns).whatever_context("insert_row")?;
        }
        StdinMode::Blob(_) => {
            let mut stdin = std::io::stdin().lock();
            insert_blob(&mut tx, &raw_table_name, &columns, &mut stdin)
                .whatever_context("insert_blob")?;
        }
        StdinMode::Lines(_) => {
            let stdin = std::io::stdin().lock();
            insert_lines(&mut tx, &raw_table_name, &columns, stdin)
                .whatever_context("insert_lines")?;
        }
    }

    tx.commit().whatever_context("sqlite")?;

    Ok(())
}

/// Program entry point: parses arguments, runs the tool, and reports any failure on stderr.
///
/// With `--verbose` the error is printed in `Debug` form. Otherwise the underlying source error
/// is printed if there is one, else the error itself. Returns `ExitCode::FAILURE` on error and
/// `ExitCode::SUCCESS` otherwise.
fn main() -> ExitCode {
    let args = get_args();
    // Remember the flag now: `args` is moved into `main_with_error`.
    let verbose = args.verbose;

    let Err(e) = main_with_error(args) else {
        return ExitCode::SUCCESS;
    };

    if verbose {
        eprintln!("Error: {e:?}");
    } else if let Some(source) = e.source() {
        eprintln!("{source}");
    } else {
        eprintln!("Error: {e}");
    }

    ExitCode::FAILURE
}