use std::io::Write;
use arrow::array::RecordBatchReader;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use saphyr::Yaml;
use saphyr::YamlEmitter;
use crate::Error;
use crate::Result;
use crate::cli::DisplayOutputFormat;
use crate::pipeline::record_batch::RecordBatchSelect;
/// Replaces Unicode line separator (U+2028) and paragraph separator (U+2029)
/// with a plain `\n` so the emitted YAML scalar stays readable and portable.
fn normalize_yaml_string_value(s: String) -> String {
    s.chars()
        .map(|c| match c {
            '\u{2028}' | '\u{2029}' => '\n',
            other => other,
        })
        .collect()
}
use crate::pipeline::RecordBatchReaderSource;
use crate::pipeline::Step;
use crate::pipeline::json::RecordBatchJsonWriter;
/// Converts every row of `batch` into a YAML mapping of column name to the
/// column value rendered as a string.
///
/// When `sparse` is true, null cells are omitted from the row's mapping
/// instead of being rendered.
fn record_batch_to_yaml_rows(batch: &RecordBatch, sparse: bool) -> Vec<Yaml<'static>> {
    let schema = batch.schema();
    let mut rows = Vec::with_capacity(batch.num_rows());
    for row_idx in 0..batch.num_rows() {
        let mut fields = hashlink::LinkedHashMap::new();
        for (col_idx, field) in schema.fields().iter().enumerate() {
            let column = batch.column(col_idx);
            if sparse && column.is_null(row_idx) {
                continue;
            }
            // Fall back to "-" for values Arrow cannot render as a string.
            let rendered =
                arrow::util::display::array_value_to_string(column.as_ref(), row_idx)
                    .unwrap_or_else(|_| "-".to_string());
            fields.insert(
                Yaml::scalar_from_string(field.name().clone()),
                Yaml::scalar_from_string(normalize_yaml_string_value(rendered)),
            );
        }
        rows.push(Yaml::Mapping(fields));
    }
    rows
}
pub fn write_record_batches_as_csv<W>(
reader: &mut dyn RecordBatchReader,
w: W,
headers: bool,
) -> Result<()>
where
W: Write,
{
let builder = arrow::csv::WriterBuilder::new().with_header(headers);
let mut writer = builder.build(w);
for batch in reader {
let batch = batch.map_err(Error::ArrowError)?;
writer.write(&batch).map_err(Error::ArrowError)?;
}
Ok(())
}
/// Renders every record batch from `reader` as a single YAML sequence of
/// row mappings and writes it to `w`.
///
/// When `sparse` is true, null cells are omitted from each row. The leading
/// `---` document-start marker produced by the emitter is stripped.
///
/// # Errors
/// Fails on a batch read error, a YAML emission error, or an I/O error.
pub fn write_record_batches_as_yaml<W>(
    reader: &mut dyn RecordBatchReader,
    mut w: W,
    sparse: bool,
) -> Result<()>
where
    W: Write,
{
    // Accumulate all rows first: the YAML emitter needs the full document.
    let mut rows: Vec<Yaml<'static>> = Vec::new();
    for batch in reader {
        let batch = batch?;
        rows.extend(record_batch_to_yaml_rows(&batch, sparse));
    }
    let document = Yaml::Sequence(rows);
    let mut buffer = String::new();
    YamlEmitter::new(&mut buffer)
        .dump(&document)
        .map_err(|e| Error::GenericError(format!("Failed to emit YAML: {e}")))?;
    // Drop the "---" document-start marker the emitter always prepends.
    let body = buffer.strip_prefix("---\n").unwrap_or(&buffer);
    w.write_all(body.as_bytes())?;
    Ok(())
}
/// Terminal pipeline step that writes all record batches from its input
/// source to stdout in the configured display format.
pub struct DisplayWriterStep {
    /// Output encoding: CSV, compact JSON, pretty JSON, or YAML.
    pub output_format: DisplayOutputFormat,
    /// When true, omit null-valued cells (applies to the JSON and YAML writers).
    pub sparse: bool,
    /// When true, emit a header row (applies to the CSV writer).
    pub headers: bool,
}
#[async_trait(?Send)]
impl Step for DisplayWriterStep {
    type Input = RecordBatchReaderSource;
    type Output = ();

    /// Obtains a record-batch reader from the input source and streams it
    /// to stdout using the writer selected by `output_format`.
    async fn execute(self, mut input: Self::Input) -> Result<Self::Output> {
        let mut reader = input.get().await?;
        match self.output_format {
            DisplayOutputFormat::Csv => {
                write_record_batches_as_csv(&mut *reader, std::io::stdout(), self.headers)
            }
            DisplayOutputFormat::Json | DisplayOutputFormat::JsonPretty => {
                // The two JSON variants differ only in pretty-printing.
                let pretty = matches!(self.output_format, DisplayOutputFormat::JsonPretty);
                RecordBatchJsonWriter::new(self.sparse, pretty)
                    .write(&mut *reader, std::io::stdout())
            }
            DisplayOutputFormat::Yaml => {
                write_record_batches_as_yaml(&mut *reader, std::io::stdout(), self.sparse)
            }
        }
    }
}
/// Optionally applies a column-select step to `reader`, then displays the
/// result on stdout in `output_format`.
///
/// `sparse` and `headers` are forwarded to [`DisplayWriterStep`].
///
/// # Errors
/// Propagates any error from the select step or the display writer.
pub async fn apply_select_and_display(
    reader: RecordBatchReaderSource,
    select_step: Option<RecordBatchSelect>,
    output_format: DisplayOutputFormat,
    sparse: bool,
    headers: bool,
) -> Result<()> {
    let source = match select_step {
        Some(step) => step.execute(reader).await?,
        None => reader,
    };
    DisplayWriterStep {
        output_format,
        sparse,
        headers,
    }
    .execute(source)
    .await
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::array::StringArray;
    use arrow::datatypes::DataType;
    use arrow::datatypes::Field;
    use arrow::datatypes::Schema;
    use arrow::record_batch::RecordBatch;

    use super::write_record_batches_as_csv;
    use super::write_record_batches_as_yaml;
    use crate::pipeline::Producer as _;
    use crate::pipeline::VecRecordBatchReaderSource;

    /// Builds a two-row batch with an `id` (Int32) and a `name` (Utf8) column.
    fn make_test_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec!["alice", "bob"]);
        RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap()
    }

    /// Renders the test batch as CSV and returns the output as a string.
    async fn render_csv(headers: bool) -> String {
        let mut source = VecRecordBatchReaderSource::new(vec![make_test_batch()]);
        let mut reader = source.get().await.unwrap();
        let mut buf = Vec::new();
        write_record_batches_as_csv(&mut *reader, &mut buf, headers).unwrap();
        String::from_utf8(buf).unwrap()
    }

    #[tokio::test]
    async fn test_write_record_batches_as_csv() {
        let rendered = render_csv(true).await;
        assert!(rendered.contains("id,name"));
        assert!(rendered.contains("1,alice"));
        assert!(rendered.contains("2,bob"));
    }

    #[tokio::test]
    async fn test_write_record_batches_as_csv_no_headers() {
        let rendered = render_csv(false).await;
        assert!(!rendered.contains("id,name"));
        assert!(rendered.contains("1,alice"));
        assert!(rendered.contains("2,bob"));
    }

    #[tokio::test]
    async fn test_write_record_batches_as_yaml() {
        let mut source = VecRecordBatchReaderSource::new(vec![make_test_batch()]);
        let mut reader = source.get().await.unwrap();
        let mut buf = Vec::new();
        write_record_batches_as_yaml(&mut *reader, &mut buf, true).unwrap();
        let rendered = String::from_utf8(buf).unwrap();
        assert!(
            !rendered.starts_with("---\n"),
            "YAML output should not include document start marker"
        );
        for needle in ["id:", "name:", "1", "alice", "bob"] {
            assert!(rendered.contains(needle), "missing {needle:?} in output");
        }
    }
}