// avz 0.1.0
//
// Blistering-fast Avro CLI tool — a modern replacement for avro-tools and fastavro
// Documentation
use apache_avro::{Reader, Schema, schema_compatibility::SchemaCompatibility};
use aws_sdk_s3::Client as S3Client;
use std::fs;

use crate::error::{AvzError, Result};
use crate::io::{self, AvroInput};

/// Entry point for the validation command.
///
/// When `reader_schema_path` is `Some`, the whole call is delegated to
/// `check_compatibility`. Otherwise every path produced by
/// `io::resolve_files` is opened and fully read through an Avro `Reader`,
/// and `"<path>: OK"` is printed to stdout for each file that passes.
///
/// # Errors
///
/// Stops at, and returns, the first error raised while opening or
/// validating a file (or the error returned by `check_compatibility`).
pub async fn execute(
    files: &[String],
    s3_client: &Option<S3Client>,
    reader_schema_path: Option<&str>,
) -> Result<()> {
    match reader_schema_path {
        // A reader schema switches the command into compatibility-check mode.
        Some(rs_path) => check_compatibility(files, s3_client, rs_path).await,
        None => {
            let resolved = io::resolve_files(files, s3_client).await;
            for target in &resolved {
                // Both input variants implement `Read`; only the concrete type differs.
                let outcome = match io::open_input(target, s3_client).await? {
                    AvroInput::Local(file) => validate_file(Reader::new(file), target),
                    AvroInput::Memory(cursor) => validate_file(Reader::new(cursor), target),
                };
                outcome?;
                println!("{}: OK", target);
            }
            Ok(())
        }
    }
}

/// Reads every record from an Avro reader to confirm that the file decodes cleanly.
///
/// `reader_result` is the outcome of `Reader::new(..)`; a failure there means
/// the input could not be opened as an Avro file. `path` is used only for
/// messages. On success the number of validated records is reported on stderr.
///
/// # Errors
///
/// Returns `AvzError::User` if the reader could not be constructed, or on the
/// first record that fails to decode (records are numbered from 1 in the message).
fn validate_file(
    reader_result: std::result::Result<Reader<impl std::io::Read>, apache_avro::Error>,
    path: &str,
) -> Result<()> {
    let reader = reader_result
        .map_err(|e| AvzError::User(format!("Not a valid Avro file {}: {}", path, e)))?;

    // Explicitly 64-bit: an untyped `0` would infer as `i32`, which overflows
    // (panic in debug, wrap-around in release) past 2_147_483_647 records.
    let mut count: u64 = 0;
    for record in reader {
        // Only decodability matters here; the decoded value itself is discarded.
        record.map_err(|e| {
            AvzError::User(format!("Invalid record #{} in {}: {}", count + 1, path, e))
        })?;
        count += 1;
    }
    eprintln!("Validated {} records in {}", count, path);
    Ok(())
}

/// Checks that the schema stored at `reader_schema_path` can read each input file.
///
/// The reader schema is loaded from the local filesystem and parsed once.
/// For every path produced by `io::resolve_files`, the file's writer schema
/// is extracted and tested with `SchemaCompatibility::can_read`. Each file's
/// verdict (`COMPATIBLE` / `INCOMPATIBLE`) is printed to stdout.
///
/// # Errors
///
/// Returns an error if the schema file cannot be read or parsed, if an input
/// cannot be opened or is not a valid Avro file, or — after printing
/// `INCOMPATIBLE` — at the first file whose writer schema is not readable
/// with the reader schema. Later files are not checked in that case.
async fn check_compatibility(
    files: &[String],
    s3_client: &Option<S3Client>,
    reader_schema_path: &str,
) -> Result<()> {
    let raw_schema = match fs::read_to_string(reader_schema_path) {
        Ok(text) => text,
        Err(e) => {
            return Err(AvzError::User(format!(
                "Cannot read schema file {}: {}",
                reader_schema_path, e
            )));
        }
    };
    let reader_schema = Schema::parse_str(&raw_schema)?;

    let resolved = io::resolve_files(files, s3_client).await;
    for target in &resolved {
        let writer_schema = match io::open_input(target, s3_client).await? {
            AvroInput::Local(file) => get_schema(Reader::new(file), target)?,
            AvroInput::Memory(cursor) => get_schema(Reader::new(cursor), target)?,
        };

        // Fail fast: report the offending file and abort on the first mismatch.
        if let Err(e) = SchemaCompatibility::can_read(&writer_schema, &reader_schema) {
            println!("{}: INCOMPATIBLE", target);
            return Err(AvzError::User(format!(
                "Reader schema is not compatible with writer schema in {}: {}",
                target, e
            )));
        }
        println!("{}: COMPATIBLE", target);
    }
    Ok(())
}

/// Extracts an owned copy of the writer schema from an Avro reader.
///
/// `reader_result` is the outcome of `Reader::new(..)`; `path` is used only
/// in the error message.
///
/// # Errors
///
/// Returns `AvzError::User` if the reader could not be constructed.
fn get_schema(
    reader_result: std::result::Result<Reader<impl std::io::Read>, apache_avro::Error>,
    path: &str,
) -> Result<Schema> {
    match reader_result {
        // The schema is cloned so it can outlive the reader, which is dropped here.
        Ok(reader) => Ok(reader.writer_schema().clone()),
        Err(e) => Err(AvzError::User(format!(
            "Not a valid Avro file {}: {}",
            path, e
        ))),
    }
}