use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
use crate::errors::AvroError;
use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
use arrow_schema::Schema;
use rand::RngCore;
use std::fmt::Debug;
use std::io::Write;
/// Describes how an Avro output stream is framed.
///
/// Each implementation decides what (if anything) is written at the start of
/// a stream and whether the stream carries a sync marker.
pub trait AvroFormat: Debug + Default {
    /// Per-format flag. Among the implementations in this file only
    /// `AvroSoeFormat` sets it to `true`.
    ///
    /// NOTE(review): presumably tells the writer whether each record must be
    /// preceded by a prefix; the consumer of this constant is not in this
    /// file, so confirm there.
    const NEEDS_PREFIX: bool;

    /// Begins a new stream on `writer` for data described by `schema`.
    ///
    /// `compression` is the codec requested by the caller, or `None` for no
    /// compression.
    ///
    /// # Errors
    ///
    /// Returns an [`AvroError`] when the format cannot start the stream with
    /// the given arguments.
    fn start_stream<W>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError>
    where
        W: Write;

    /// Returns the format's 16-byte sync marker, or `None` when the format
    /// has no such marker.
    fn sync_marker(&self) -> Option<&[u8; 16]>;
}
/// Format state for streams that start with an Avro Object Container File
/// style header (magic bytes, metadata map, sync marker).
///
/// Constructed through `Default`, so the marker is all zeros until
/// `start_stream` fills it with random bytes.
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
/// 16-byte marker regenerated by every `start_stream` call and written as
/// the last element of the header.
sync_marker: [u8; 16],
}
impl AvroFormat for AvroOcfFormat {
    const NEEDS_PREFIX: bool = false;

    /// Writes the container header to `writer`.
    ///
    /// The bytes are emitted in this order:
    /// 1. the magic `Obj\x01`,
    /// 2. a metadata map holding two entries (the schema JSON and the codec
    ///    name), closed by a zero block count,
    /// 3. the 16-byte sync marker.
    ///
    /// The sync marker is re-randomized on every call, before anything else
    /// happens, so it changes even when the call later fails.
    ///
    /// # Errors
    ///
    /// Returns `AvroError::SchemaError` when the Arrow schema cannot be
    /// converted to an Avro schema, and propagates any failure reported while
    /// writing to `writer`.
    fn start_stream<W: Write>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        // A fresh random marker for every stream that is started.
        rand::rng().fill_bytes(&mut self.sync_marker);

        let schema_options = AvroSchemaOptions {
            null_order: None,
            strip_metadata: true,
        };
        let avro_schema = match AvroSchema::from_arrow_with_options(schema, Some(schema_options)) {
            Ok(converted) => converted,
            Err(cause) => return Err(AvroError::SchemaError(format!("{:?}", cause))),
        };

        // Magic bytes identifying the container format.
        writer.write_all(b"Obj\x01")?;

        // Name under which the codec is advertised in the header metadata.
        let codec_name = match compression {
            None => "null",
            Some(CompressionCodec::Deflate) => "deflate",
            Some(CompressionCodec::Snappy) => "snappy",
            Some(CompressionCodec::ZStandard) => "zstandard",
            Some(CompressionCodec::Bzip2) => "bzip2",
            Some(CompressionCodec::Xz) => "xz",
        };

        // Metadata map: one block with every entry, then a zero count that
        // terminates the map.
        let metadata: [(&str, &[u8]); 2] = [
            (SCHEMA_METADATA_KEY, avro_schema.json_string.as_bytes()),
            (CODEC_METADATA_KEY, codec_name.as_bytes()),
        ];
        write_long(writer, metadata.len() as i64)?;
        for (key, value) in metadata {
            write_string(writer, key)?;
            write_bytes(writer, value)?;
        }
        write_long(writer, 0)?;

        // The sync marker closes the header.
        writer.write_all(&self.sync_marker)?;
        Ok(())
    }

    /// Returns the marker most recently generated by `start_stream` (all
    /// zeros if no stream has been started on this value yet).
    fn sync_marker(&self) -> Option<&[u8; 16]> {
        Some(&self.sync_marker)
    }
}
/// Stateless format for Avro SOE streaming (SOE presumably stands for Avro's
/// single-object encoding — confirm against the crate documentation).
///
/// Its `start_stream` writes nothing and rejects any compression codec.
#[derive(Debug, Default)]
pub struct AvroSoeFormat {}
impl AvroFormat for AvroSoeFormat {
    const NEEDS_PREFIX: bool = true;

    /// Validates the arguments for an SOE stream; nothing is written to the
    /// writer and the schema is not inspected.
    ///
    /// # Errors
    ///
    /// Returns `AvroError::InvalidArgument` when a compression codec is
    /// requested.
    fn start_stream<W: Write>(
        &mut self,
        _writer: &mut W,
        _schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        match compression {
            None => Ok(()),
            Some(_) => Err(AvroError::InvalidArgument(String::from(
                "Compression not supported for Avro SOE streaming",
            ))),
        }
    }

    /// This format has no sync marker.
    fn sync_marker(&self) -> Option<&[u8; 16]> {
        None
    }
}
/// Stateless format for Avro binary streaming.
///
/// Its `start_stream` writes nothing and rejects any compression codec.
#[derive(Debug, Default)]
pub struct AvroBinaryFormat;
impl AvroFormat for AvroBinaryFormat {
    const NEEDS_PREFIX: bool = false;

    /// Validates the arguments for a binary stream; nothing is written to
    /// the writer and the schema is not inspected.
    ///
    /// # Errors
    ///
    /// Returns `AvroError::InvalidArgument` when a compression codec is
    /// requested.
    fn start_stream<W: Write>(
        &mut self,
        _writer: &mut W,
        _schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        // Without a codec there is nothing to check and nothing to write.
        if compression.is_none() {
            return Ok(());
        }
        Err(AvroError::InvalidArgument(String::from(
            "Compression not supported for Avro binary streaming",
        )))
    }

    /// This format has no sync marker.
    fn sync_marker(&self) -> Option<&[u8; 16]> {
        None
    }
}
/// Writes `s` as a length-prefixed sequence of its UTF-8 bytes.
///
/// Delegates to `write_bytes`, so the length prefix is the byte length of
/// `s`, not its character count.
///
/// # Errors
///
/// Propagates any error returned by `write_bytes`.
#[inline]
fn write_string<W>(writer: &mut W, s: &str) -> Result<(), AvroError>
where
    W: Write,
{
    let utf8 = s.as_bytes();
    write_bytes(writer, utf8)
}
#[inline]
fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
write_long(writer, bytes.len() as i64)?;
writer.write_all(bytes)?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Minimal schema shared by the tests: one non-nullable `Int32` column
    /// named `x`.
    fn test_schema() -> Schema {
        let column = Field::new("x", DataType::Int32, false);
        Schema::new(vec![column])
    }

    /// Starts a stream on `format` with Snappy compression requested and
    /// returns the text of the resulting error.
    ///
    /// Panics if `start_stream` unexpectedly succeeds.
    fn snappy_rejection_message<F: AvroFormat>(mut format: F) -> String {
        let schema = test_schema();
        let mut sink: Vec<u8> = Vec::new();
        format
            .start_stream(&mut sink, &schema, Some(CompressionCodec::Snappy))
            .unwrap_err()
            .to_string()
    }

    #[test]
    fn avro_binary_format_rejects_compression() {
        let message = snappy_rejection_message(AvroBinaryFormat);
        assert!(message.contains("Compression not supported for Avro binary streaming"));
    }

    #[test]
    fn avro_soe_format_rejects_compression() {
        let message = snappy_rejection_message(AvroSoeFormat::default());
        assert!(message.contains("Compression not supported for Avro SOE streaming"));
    }
}