use std::io::Write;

use arrow::array::RecordBatchReader;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use saphyr::Yaml;
use saphyr::YamlEmitter;

use crate::Error;
use crate::Result;
use crate::cli::DisplayOutputFormat;
use crate::pipeline::RecordBatchReaderSource;
use crate::pipeline::Step;
use crate::pipeline::json::RecordBatchJsonWriter;
use crate::pipeline::record_batch::RecordBatchSelect;

/// Normalizes string values for YAML emission. Unicode line/paragraph separators (U+2028, U+2029)
/// are replaced with newlines so that saphyr's emitter will quote and escape them, producing
/// valid YAML that parses correctly.
fn normalize_yaml_string_value(s: String) -> String {
    s.replace(['\u{2028}', '\u{2029}'], "\n")
}

/// Converts a record batch into YAML row objects; when `sparse` is true, omits null values.
fn record_batch_to_yaml_rows(batch: &RecordBatch, sparse: bool) -> Vec<Yaml<'static>> {
    let schema = batch.schema();
    let num_rows = batch.num_rows();
    (0..num_rows)
        .map(|row_idx| {
            let mut map = hashlink::LinkedHashMap::new();
            for (col_idx, field) in schema.fields().iter().enumerate() {
                let array = batch.column(col_idx);
                if sparse && array.is_null(row_idx) {
                    continue;
                }
                let col_name = field.name().clone();
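                // Render the cell via arrow's display utilities, falling back
                // to "-" if the value cannot be formatted.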
                let value_str =
                    arrow::util::display::array_value_to_string(array.as_ref(), row_idx)
                        .unwrap_or_else(|_| "-".to_string());
                let value_str = normalize_yaml_string_value(value_str);
                map.insert(
                    Yaml::scalar_from_string(col_name),
                    Yaml::scalar_from_string(value_str),
                );
            }
            Yaml::Mapping(map)
        })
        .collect()
}

/// Write record batches from a reader to the given writer as CSV.
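///
/// # Example
///
/// A minimal usage sketch; `reader` is assumed to come from an upstream
/// source (the tests below build one from a `VecRecordBatchReaderSource`),
/// so the snippet is marked `ignore`:
///
/// ```ignore
/// let mut out = Vec::new();
/// write_record_batches_as_csv(&mut *reader, &mut out, true)?;
/// let csv = String::from_utf8(out)?;
/// ```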
pub fn write_record_batches_as_csv<W>(
    reader: &mut dyn RecordBatchReader,
    w: W,
    headers: bool,
) -> Result<()>
where
    W: Write,
{
    let builder = arrow::csv::WriterBuilder::new().with_header(headers);
    let mut writer = builder.build(w);
    for batch in reader {
        let batch = batch.map_err(Error::ArrowError)?;
        writer.write(&batch).map_err(Error::ArrowError)?;
    }
    Ok(())
}

/// Write record batches from a reader to the given writer as YAML.
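///
/// # Example
///
/// A minimal usage sketch (`reader` is assumed, so the snippet is marked
/// `ignore`); note that the leading `---` document marker is stripped from
/// the output:
///
/// ```ignore
/// let mut out = Vec::new();
/// write_record_batches_as_yaml(&mut *reader, &mut out, true)?;
/// ```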
pub fn write_record_batches_as_yaml<W>(
    reader: &mut dyn RecordBatchReader,
    mut w: W,
    sparse: bool,
) -> Result<()>
where
    W: Write,
{
    let batches: Vec<RecordBatch> =
        reader.collect::<std::result::Result<Vec<_>, arrow::error::ArrowError>>()?;
    let yaml_rows: Vec<Yaml<'static>> = batches
        .iter()
        .flat_map(|batch| record_batch_to_yaml_rows(batch, sparse))
        .collect();
    let doc = Yaml::Sequence(yaml_rows);
    let mut out = String::new();
    let mut emitter = YamlEmitter::new(&mut out);
    emitter
        .dump(&doc)
        .map_err(|e| Error::GenericError(format!("Failed to emit YAML: {e}")))?;
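    // Drop the leading document-start marker ("---") that the emitter adds, if present.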
    let to_write = out.strip_prefix("---\n").unwrap_or(&out);
    write!(w, "{to_write}")?;
    Ok(())
}

/// Pipeline step that writes record batches to stdout as CSV, JSON, or YAML.
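///
/// # Example
///
/// A minimal usage sketch; `source` is assumed to be a
/// `RecordBatchReaderSource` produced by an earlier pipeline step, so the
/// snippet is marked `ignore`:
///
/// ```ignore
/// DisplayWriterStep {
///     output_format: DisplayOutputFormat::Yaml,
///     sparse: true,
///     headers: true,
/// }
/// .execute(source)
/// .await?;
/// ```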
pub struct DisplayWriterStep {
    pub output_format: DisplayOutputFormat,
    pub sparse: bool,
    pub headers: bool,
}

#[async_trait(?Send)]
impl Step for DisplayWriterStep {
    type Input = RecordBatchReaderSource;
    type Output = ();

    async fn execute(self, mut input: Self::Input) -> Result<Self::Output> {
        let mut reader = input.get().await?;
        match self.output_format {
            DisplayOutputFormat::Csv => {
                write_record_batches_as_csv(&mut *reader, std::io::stdout(), self.headers)?;
            }
            DisplayOutputFormat::Json => {
                RecordBatchJsonWriter::new(self.sparse, false)
                    .write(&mut *reader, std::io::stdout())?;
            }
            DisplayOutputFormat::JsonPretty => {
                RecordBatchJsonWriter::new(self.sparse, true)
                    .write(&mut *reader, std::io::stdout())?;
            }
            DisplayOutputFormat::Yaml => {
                write_record_batches_as_yaml(&mut *reader, std::io::stdout(), self.sparse)?;
            }
        }
        Ok(())
    }
}

/// Applies optional column selection and writes record batches to stdout.
/// If `select_step` is `Some`, filters to the specified columns before display.
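///
/// # Example
///
/// A minimal usage sketch; `source` is assumed to be a
/// `RecordBatchReaderSource` from earlier pipeline setup, so the snippet is
/// marked `ignore`:
///
/// ```ignore
/// apply_select_and_display(source, None, DisplayOutputFormat::Csv, false, true).await?;
/// ```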
pub async fn apply_select_and_display(
    mut reader: RecordBatchReaderSource,
    select_step: Option<RecordBatchSelect>,
    output_format: DisplayOutputFormat,
    sparse: bool,
    headers: bool,
) -> Result<()> {
    if let Some(step) = select_step {
        reader = step.execute(reader).await?;
    }
    let display_step = DisplayWriterStep {
        output_format,
        sparse,
        headers,
    };
    display_step.execute(reader).await
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::array::StringArray;
    use arrow::datatypes::DataType;
    use arrow::datatypes::Field;
    use arrow::datatypes::Schema;
    use arrow::record_batch::RecordBatch;

    use super::write_record_batches_as_csv;
    use super::write_record_batches_as_yaml;
    use crate::pipeline::Producer as _;
    use crate::pipeline::VecRecordBatchReaderSource;

    fn make_test_batch() -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]);
        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_write_record_batches_as_csv() {
        let batch = make_test_batch();
        let mut source = VecRecordBatchReaderSource::new(vec![batch]);
        let mut reader = source.get().await.unwrap();
        let mut out = Vec::new();
        write_record_batches_as_csv(&mut *reader, &mut out, true).unwrap();
        let s = String::from_utf8(out).unwrap();
        assert!(s.contains("id,name"));
        assert!(s.contains("1,alice"));
        assert!(s.contains("2,bob"));
    }

    #[tokio::test]
    async fn test_write_record_batches_as_csv_no_headers() {
        let batch = make_test_batch();
        let mut source = VecRecordBatchReaderSource::new(vec![batch]);
        let mut reader = source.get().await.unwrap();
        let mut out = Vec::new();
        write_record_batches_as_csv(&mut *reader, &mut out, false).unwrap();
        let s = String::from_utf8(out).unwrap();
        assert!(!s.contains("id,name"));
        assert!(s.contains("1,alice"));
        assert!(s.contains("2,bob"));
    }
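
    // Verifies the sparse path of `record_batch_to_yaml_rows`: null cells are
    // dropped from a row's mapping, so the row with a null `name` keeps only
    // its `id` entry while the fully populated row keeps both.
    #[test]
    fn test_record_batch_to_yaml_rows_sparse_omits_nulls() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec![Some("alice"), None])),
            ],
        )
        .unwrap();
        let rows = super::record_batch_to_yaml_rows(&batch, true);
        assert_eq!(rows.len(), 2);
        match (&rows[0], &rows[1]) {
            (saphyr::Yaml::Mapping(full), saphyr::Yaml::Mapping(sparse_row)) => {
                assert_eq!(full.len(), 2);
                assert_eq!(sparse_row.len(), 1);
            }
            _ => panic!("expected mapping rows"),
        }
    }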

    #[tokio::test]
    async fn test_write_record_batches_as_yaml() {
        let batch = make_test_batch();
        let mut source = VecRecordBatchReaderSource::new(vec![batch]);
        let mut reader = source.get().await.unwrap();
        let mut out = Vec::new();
        write_record_batches_as_yaml(&mut *reader, &mut out, true).unwrap();
        let s = String::from_utf8(out).unwrap();
        assert!(
            !s.starts_with("---\n"),
            "YAML output should not include document start marker"
        );
        assert!(s.contains("id:"));
        assert!(s.contains("name:"));
        assert!(s.contains("1"));
        assert!(s.contains("alice"));
        assert!(s.contains("bob"));
    }
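
    // Confirms `normalize_yaml_string_value` rewrites the Unicode line
    // separator (U+2028) and paragraph separator (U+2029) to plain newlines
    // and leaves ordinary text untouched.
    #[test]
    fn test_normalize_yaml_string_value() {
        assert_eq!(
            super::normalize_yaml_string_value("a\u{2028}b\u{2029}c".to_string()),
            "a\nb\nc"
        );
        assert_eq!(
            super::normalize_yaml_string_value("plain".to_string()),
            "plain"
        );
    }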
}