datu 0.3.4

datu - a data file utility
Documentation
use super::DataframeSelect;
use super::DataframeTail;
use crate::FileType;
use crate::pipeline::DataframeParquetReader;
use crate::pipeline::DataframeToRecordBatch;
use crate::pipeline::RecordBatchAvroWriter;
use crate::pipeline::Step;
use crate::pipeline::csv::DataframeCsvWriter;
use crate::pipeline::read::ReadArgs;
use crate::pipeline::write::WriteArgs;

/// Returns the location of `name` inside the temporary directory `dir` as an
/// owned `String`.
///
/// # Panics
///
/// Panics with "Failed to convert path to string" when the joined path cannot
/// be viewed as a `&str`.
fn temp_path(dir: &tempfile::TempDir, name: &str) -> String {
    let joined = dir.path().join(name);
    let as_text = joined
        .to_str()
        .expect("Failed to convert path to string");
    String::from(as_text)
}

/// Chains the Parquet reader, select, tail and CSV writer steps over
/// `fixtures/table.parquet` and checks that the CSV output file gets created.
#[tokio::test(flavor = "multi_thread")]
async fn test_dataframe_steps_parquet_tail_to_csv() {
    // Stage 1: load the Parquet fixture through the reader step.
    let frame = DataframeParquetReader {
        args: ReadArgs::new("fixtures/table.parquet", FileType::Parquet),
    }
    .execute(())
    .await
    .unwrap();

    // Stage 2: run the select step with no selection configured.
    let selected = DataframeSelect { select: None }
        .execute(frame)
        .await
        .unwrap();

    // Stage 3: tail step with n = 2 (presumably keeps the last two rows;
    // confirm against DataframeTail).
    let tail = DataframeTail {
        input_path: "fixtures/table.parquet".to_string(),
        input_file_type: FileType::Parquet,
        n: 2,
    }
    .execute(selected)
    .await
    .unwrap();

    // Stage 4: write the result as CSV into a fresh temporary directory.
    let scratch = tempfile::tempdir().unwrap();
    let csv_path = temp_path(&scratch, "out.csv");
    DataframeCsvWriter {
        args: WriteArgs {
            path: csv_path.clone(),
            file_type: FileType::Csv,
            sparse: None,
            pretty: None,
        },
    }
    .execute(Box::new(tail))
    .await
    .unwrap();

    // Only the existence of the output file is verified, not its contents.
    assert!(std::path::Path::new(&csv_path).exists());
}

/// Reads `fixtures/table.parquet`, wraps the result with
/// `DataframeToRecordBatch`, hands it to `RecordBatchAvroWriter`, and checks
/// that the Avro output file gets created.
#[tokio::test(flavor = "multi_thread")]
async fn test_dataframe_to_record_batch_record_batch_avro_writer() {
    // Load the Parquet fixture through the reader step.
    let frame = DataframeParquetReader {
        args: ReadArgs::new("fixtures/table.parquet", FileType::Parquet),
    }
    .execute(())
    .await
    .unwrap();

    // Adapt the reader output into the input the Avro writer step accepts.
    let batches = DataframeToRecordBatch::try_new(frame).await.unwrap();

    // Write the result as Avro into a fresh temporary directory.
    let scratch = tempfile::tempdir().unwrap();
    let avro_path = temp_path(&scratch, "out.avro");
    RecordBatchAvroWriter {
        args: WriteArgs {
            path: avro_path.clone(),
            file_type: FileType::Avro,
            sparse: None,
            pretty: None,
        },
    }
    .execute(Box::new(batches))
    .await
    .unwrap();

    // Only the existence of the output file is verified, not its contents.
    assert!(std::path::Path::new(&avro_path).exists());
}