datu 0.3.4

datu - a data file utility
Documentation
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::path::Path;
use std::process::Stdio;
use std::time::Duration;

use cucumber::World;
use cucumber::given;
use cucumber::then;
use cucumber::when;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::SessionContext;
use expectrl::Expect;
use expectrl::session::OsSession;
use gherkin::Step;
use tempfile::NamedTempFile;

#[path = "common/assert_output.rs"]
mod assert_output;
#[path = "common/parquet.rs"]
mod parquet;
#[path = "common/row_count.rs"]
mod row_count;

use assert_output::assert_output_contains;
use parquet::assert_valid_parquet_file;
use row_count::get_row_count;

const TEMPDIR_PLACEHOLDER: &str = "$TEMPDIR";

#[derive(Debug, Default, World)]
pub struct ReplWorld {
    session: Option<OsSession>,
    temp_dir: Option<tempfile::TempDir>,
    last_file: Option<String>,
    last_output: Option<std::process::Output>,
}

fn replace_tempdir(s: &str, temp_path: &str) -> String {
    s.replace(TEMPDIR_PLACEHOLDER, temp_path)
}

fn resolve_path(world: &ReplWorld, path: &str) -> String {
    if let Some(ref temp_dir) = world.temp_dir {
        let temp_path = temp_dir
            .path()
            .to_str()
            .expect("Temp path is not valid UTF-8");
        replace_tempdir(path, temp_path)
    } else {
        path.to_string()
    }
}

fn ensure_temp_dir(world: &mut ReplWorld) -> String {
    if world.temp_dir.is_none() {
        world.temp_dir = Some(tempfile::tempdir().expect("Failed to create temp dir"));
    }
    world
        .temp_dir
        .as_ref()
        .unwrap()
        .path()
        .to_str()
        .expect("Temp path is not valid UTF-8")
        .to_string()
}

#[given(regex = r#"^a Parquet file with the following data:$"#)]
async fn a_parquet_file_with_the_following_data(world: &mut ReplWorld, step: &Step) {
    let docstring = step.docstring.as_ref().expect("Step requires a docstring");
    // Write the docstring to a temporary file
    let csv_file = NamedTempFile::with_suffix(".csv").expect("Failed to create temporary file");
    let csv_file_path = csv_file
        .path()
        .to_str()
        .expect("Failed to get temporary file path");
    std::fs::write(csv_file_path, docstring).expect("Failed to write to temporary file");

    let temp_path = ensure_temp_dir(world);
    let path = Path::new(&temp_path).join("input.parquet");
    let path_str = path.to_str().expect("Temp path is not valid UTF-8");
    let session = SessionContext::new();
    let df = session
        .read_csv(csv_file_path, CsvReadOptions::new())
        .await
        .expect("Failed to read CSV file");
    df.write_parquet(path_str, DataFrameWriteOptions::default(), None)
        .await
        .expect("Failed to write Parquet file");
    world.last_file = Some(path_str.to_string());
}

#[when(regex = r#"^datu is ran without a command$"#)]
fn run_datu_repl(world: &mut ReplWorld) {
    let datu_path = std::env::var("CARGO_BIN_EXE_datu")
        .expect("Environment variable 'CARGO_BIN_EXE_datu' not defined");
    let mut session = expectrl::spawn(datu_path).expect("Failed to spawn REPL");
    session.set_expect_timeout(Some(Duration::from_secs(5)));
    world.session = Some(session);
}

#[when(regex = r#"^the REPL is ran and the user types:$"#)]
fn repl_is_ran_and_user_types(world: &mut ReplWorld, step: &Step) {
    let input = step.docstring.as_ref().expect("Step requires a docstring");
    let temp_path = ensure_temp_dir(world);

    let datu_path = std::env::var("CARGO_BIN_EXE_datu")
        .expect("Environment variable 'CARGO_BIN_EXE_datu' not defined");

    let mut child = std::process::Command::new(datu_path)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("Failed to spawn datu REPL");

    {
        let stdin = child.stdin.as_mut().expect("Failed to open stdin");
        for line in input.trim().lines() {
            let resolved = replace_tempdir(line.trim(), &temp_path);
            writeln!(stdin, "{resolved}").expect("Failed to write to REPL stdin");
        }
    }
    drop(child.stdin.take());

    let output = child.wait_with_output().expect("Failed to wait for datu");
    world.last_output = Some(output);
    let last = world.last_output.as_ref().unwrap();
    assert!(
        last.status.success(),
        "datu REPL exited with error.\nstdout: {}\nstderr: {}",
        String::from_utf8_lossy(&last.stdout),
        String::from_utf8_lossy(&last.stderr)
    );
}

#[then(regex = r#"^the command should succeed$"#)]
fn command_should_succeed(world: &mut ReplWorld) {
    let output = world
        .last_output
        .as_ref()
        .expect("No command output; use 'the REPL is ran and the user types' first");
    assert!(
        output.status.success(),
        "Command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
        output.status.code(),
        String::from_utf8_lossy(&output.stdout),
        String::from_utf8_lossy(&output.stderr)
    );
}

#[then(regex = r#"^the output should contain "(.+)"$"#)]
fn output_should_contain(world: &mut ReplWorld, expected: String) {
    let output = world
        .last_output
        .as_ref()
        .expect("No command output; use 'the REPL is ran and the user types' first");
    assert_output_contains(output, &expected, world.temp_dir.as_ref());
}

#[then(regex = r#"^the output should be:$"#)]
fn the_output_should_be(world: &mut ReplWorld, step: &Step) {
    let expected = step.docstring.as_ref().expect("Step requires a docstring");
    let session = world.session.as_mut().expect("No session running");
    let expected_trimmed = expected.trim();
    session
        .expect(expected_trimmed)
        .unwrap_or_else(|e| panic!("Expected to find '{expected_trimmed}' in output: {e}"));
}

#[then(regex = r#"^the file "(.+)" should exist$"#)]
fn file_should_exist(world: &mut ReplWorld, path: String) {
    let path_resolved = resolve_path(world, &path);
    assert!(
        Path::new(&path_resolved).exists(),
        "Expected file to exist: {path_resolved}"
    );
    world.last_file = Some(path_resolved);
}

#[then(regex = r#"^that file should be a CSV file$"#)]
fn that_file_should_be_csv(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let mut reader = csv::Reader::from_path(path).expect("Failed to open file as CSV");
    let headers = reader.headers().expect("Failed to read CSV headers");
    assert!(
        !headers.is_empty(),
        "Expected CSV file to have headers, but got none"
    );
}

#[then(regex = r#"^the first line of that file should be: "(.+)"$"#)]
fn first_line_of_file_should_be(world: &mut ReplWorld, expected: String) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let file = std::fs::File::open(path).expect("Failed to open file");
    let first_line = std::io::BufReader::new(file)
        .lines()
        .next()
        .expect("File is empty")
        .expect("Failed to read line");
    assert!(
        first_line == expected,
        "Expected first line to be '{expected}', but got: {first_line}"
    );
}

#[then(regex = r#"^the first line of that file should contain "(.+)"$"#)]
fn first_line_of_file_should_contain(world: &mut ReplWorld, expected: String) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let file = std::fs::File::open(path).expect("Failed to open file");
    let first_line = std::io::BufReader::new(file)
        .lines()
        .next()
        .expect("File is empty")
        .expect("Failed to read line");
    assert!(
        first_line.contains(&expected),
        "Expected first line to contain '{expected}', but got: {first_line}"
    );
}

#[then(regex = r#"^that file should be valid JSON$"#)]
fn that_file_should_be_valid_json(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let content = std::fs::read_to_string(path).expect("Failed to read file");
    serde_json::from_str::<serde_json::Value>(content.trim())
        .expect("Expected file to contain valid JSON, but parsing failed");
}

#[then(regex = r#"^that file should be valid YAML$"#)]
fn that_file_should_be_valid_yaml(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let content = std::fs::read_to_string(path).expect("Failed to read file");
    serde_yaml::from_str::<serde_yaml::Value>(content.trim())
        .expect("Expected file to contain valid YAML, but parsing failed");
}

#[then(regex = r#"^that file should be valid Avro$"#)]
fn that_file_should_be_valid_avro(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let file = std::fs::File::open(path).expect("Failed to open file");
    let reader = arrow_avro::reader::ReaderBuilder::new()
        .build(BufReader::new(file))
        .expect("Expected file to be valid Avro, but reading failed");
    let schema = reader.schema();
    assert!(
        !schema.fields().is_empty(),
        "Expected Avro file to have at least one field"
    );
}

#[then(regex = r#"^that file should be valid ORC$"#)]
fn that_file_should_be_valid_orc(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let file = std::fs::File::open(path).expect("Failed to open file");
    let builder = orc_rust::arrow_reader::ArrowReaderBuilder::try_new(file)
        .expect("Expected file to be valid ORC, but reading failed");
    let schema = builder.schema();
    assert!(
        !schema.fields().is_empty(),
        "Expected ORC file to have at least one column"
    );
}

#[then(regex = r#"^that file should be valid XLSX$"#)]
fn that_file_should_be_valid_xlsx(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let bytes = std::fs::read(path).expect("Failed to read file");
    assert!(
        bytes.len() >= 4 && bytes[..4] == [0x50, 0x4B, 0x03, 0x04],
        "Expected file to be a valid XLSX (ZIP archive), but magic bytes did not match"
    );
}

#[then(regex = r#"^that file should be a valid Parquet file$"#)]
fn that_file_should_be_valid_parquet(world: &mut ReplWorld) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    assert_valid_parquet_file(path);
}

#[then(regex = r#"^that file should contain "(.+)"$"#)]
fn that_file_should_contain(world: &mut ReplWorld, expected: String) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let content = std::fs::read_to_string(path).expect("Failed to read file");
    assert!(
        content.contains(&expected),
        "Expected file {path} to contain '{expected}', but it did not"
    );
}

#[then(regex = r#"^that file should have (\d+) lines$"#)]
fn that_file_should_have_n_lines(world: &mut ReplWorld, n: usize) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let file = std::fs::File::open(path).expect("Failed to open file");
    let line_count = std::io::BufReader::new(file)
        .lines()
        .filter(|r| r.as_ref().is_ok_and(|s| !s.trim().is_empty()))
        .count();
    assert!(
        line_count == n,
        "Expected file {path} to have {n} lines, but got {line_count}"
    );
}

#[then(regex = r#"^that file should have (\d+) records$"#)]
fn that_file_should_have_n_records(world: &mut ReplWorld, n: usize) {
    let path = world
        .last_file
        .as_ref()
        .expect("No file has been set; use 'the file \"...\" should exist' first");
    let row_count = get_row_count(path);
    assert!(
        row_count == n,
        "Expected file {path} to have {n} records, but got {row_count}"
    );
}

#[tokio::main]
async fn main() {
    ReplWorld::cucumber()
        .fail_on_skipped()
        .run_and_exit("features/repl")
        .await;
}