orc-rust 0.8.0

Implementation of Apache ORC file format using Apache Arrow in-memory format
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Export ORC data to CSV or JSON format.
//!
//! This subcommand unifies the functionality of the former `orc-export`
//! and `orc-read` commands.

use std::fs::File;
use std::io::{self, Read};
use std::num::NonZeroUsize;
use std::path::PathBuf;

use anyhow::{Context, Result};
use arrow::datatypes::DataType;
use bytes::Bytes;
use clap::Parser;
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::metadata::read_metadata;
use orc_rust::reader::ChunkReader;
use orc_rust::ArrowReaderBuilder;

use crate::common::{create_csv_writer, create_json_writer, OutputFormat, OutputWriter};

#[derive(Debug, Parser)]
#[command(about = "Export ORC data to CSV or JSON format")]
pub struct Args {
    /// Path to an ORC file or "-" to read from stdin
    file: String,

    /// Output file. If not provided, output goes to stdout
    #[arg(short, long)]
    output: Option<PathBuf>,

    /// Output format
    #[arg(value_enum, short, long, default_value_t = OutputFormat::Csv)]
    format: OutputFormat,

    /// Export only the first N records (omit to export all)
    #[arg(short, long, value_name = "N")]
    num_rows: Option<NonZeroUsize>,

    /// Export only the specified columns (comma-separated list)
    #[arg(short, long, value_delimiter = ',')]
    columns: Option<Vec<String>>,

    /// Batch size to use when reading
    #[arg(long, default_value_t = 8192)]
    batch_size: usize,
}

pub fn run(args: Args) -> Result<()> {
    // Prepare output writer
    let output: Box<dyn io::Write> = if let Some(ref path) = args.output {
        Box::new(File::create(path).with_context(|| format!("creating {}", path.display()))?)
    } else {
        Box::new(io::stdout().lock())
    };

    if args.file == "-" {
        // Read from stdin
        let mut buf = Vec::new();
        io::stdin().read_to_end(&mut buf).context("reading stdin")?;
        let bytes = Bytes::from(buf);
        run_export(bytes, &args, output)
    } else {
        // Read from file
        let file = File::open(&args.file).with_context(|| format!("opening {}", args.file))?;
        run_export(file, &args, output)
    }
}

fn run_export<R: ChunkReader>(
    mut source: R,
    args: &Args,
    output: Box<dyn io::Write>,
) -> Result<()> {
    // Build projection mask if columns are specified
    let projection = if let Some(selected) = &args.columns {
        // Need to read metadata to build projection
        let metadata = read_metadata(&mut source)?;

        let root_children = metadata.root_data_type().children();
        let mut missing: Vec<&str> = selected
            .iter()
            .filter(|name| !root_children.iter().any(|c| c.name() == name.as_str()))
            .map(|name| name.as_str())
            .collect();
        if !missing.is_empty() {
            missing.sort();
            anyhow::bail!("unknown column(s): {}", missing.join(", "));
        }

        let cols: Vec<usize> = root_children
            .iter()
            .filter(|nc| {
                // Check if column is selected
                let is_selected = selected.iter().any(|c| nc.name().eq(c));
                if !is_selected {
                    return false;
                }

                // Filter out unsupported types for CSV/JSON export.
                match nc.data_type().to_arrow_data_type() {
                    DataType::Binary => false,
                    DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
                        matches!(args.format, OutputFormat::Csv)
                    }
                    _ => true,
                }
            })
            .map(|nc| nc.data_type().column_index())
            .collect();

        Some(ProjectionMask::roots(metadata.root_data_type(), cols))
    } else {
        None
    };

    // Build reader
    let mut builder = ArrowReaderBuilder::try_new(source)?.with_batch_size(args.batch_size);

    if let Some(proj) = projection {
        builder = builder.with_projection(proj);
    }

    let reader = builder.build();

    // Create appropriate writer based on format
    let mut writer: OutputWriter<Box<dyn io::Write>, _> = match args.format {
        OutputFormat::Json => OutputWriter::Json(create_json_writer(output)),
        OutputFormat::Csv => OutputWriter::Csv(create_csv_writer(output)),
    };

    // Read and write data
    let mut remaining = args.num_rows.map(NonZeroUsize::get).unwrap_or(usize::MAX);

    for batch in reader {
        if remaining == 0 {
            break;
        }
        let mut batch = batch?;
        if remaining < batch.num_rows() {
            batch = batch.slice(0, remaining);
        }
        writer.write(&batch)?;
        remaining = remaining.saturating_sub(batch.num_rows());
    }

    writer.finish().context("closing writer")?;
    Ok(())
}