Skip to main content

xls_rs/columnar/
avro.rs

1//! Avro file handling
2
3use anyhow::{Context, Result};
4use std::fs::File;
5
6use apache_avro::{
7    types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
8};
9
10use crate::csv_handler::CellRange;
11use crate::helpers::{default_column_names, filter_by_range, max_column_count};
12use crate::traits::{DataReader, DataWriteOptions, DataWriter, FileHandler, SchemaProvider};
13
/// Handler for Avro files.
///
/// The handler is a stateless unit struct: every operation takes the file
/// path as an argument, so a single value can be copied and reused freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct AvroHandler;
17
18impl AvroHandler {
19    pub fn new() -> Self {
20        Self::default()
21    }
22
23    /// Read Avro file into `Vec<Vec<String>>`
24    pub fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
25        let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
26
27        let reader = AvroReader::new(file)?;
28        let mut all_rows: Vec<Vec<String>> = Vec::new();
29
30        for value in reader {
31            let value = value?;
32            if let AvroValue::Record(fields) = value {
33                let row: Vec<String> = fields
34                    .iter()
35                    .map(|(_, v)| self.avro_value_to_string(v))
36                    .collect();
37                all_rows.push(row);
38            }
39        }
40
41        Ok(all_rows)
42    }
43
44    /// Read Avro file with field names as first row
45    pub fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
46        let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
47
48        let reader = AvroReader::new(file)?;
49        let mut all_rows: Vec<Vec<String>> = Vec::new();
50
51        // Get field names from schema
52        if let AvroSchema::Record(record) = reader.writer_schema() {
53            let headers: Vec<String> = record.fields.iter().map(|f| f.name.clone()).collect();
54            all_rows.push(headers);
55        }
56
57        for value in reader {
58            let value = value?;
59            if let AvroValue::Record(fields) = value {
60                let row: Vec<String> = fields
61                    .iter()
62                    .map(|(_, v)| self.avro_value_to_string(v))
63                    .collect();
64                all_rows.push(row);
65            }
66        }
67
68        Ok(all_rows)
69    }
70
71    /// Write data to Avro file (all fields as strings)
72    pub fn write(
73        &self,
74        path: &str,
75        data: &[Vec<String>],
76        field_names: Option<&[String]>,
77    ) -> Result<()> {
78        let num_cols = match field_names {
79            Some(names) if names.is_empty() => {
80                anyhow::bail!("Field names cannot be empty");
81            }
82            Some(names) => names.len(),
83            None if data.is_empty() => {
84                anyhow::bail!("Cannot write empty data to Avro");
85            }
86            None => max_column_count(data),
87        };
88
89        if num_cols == 0 {
90            anyhow::bail!("Cannot write Avro with zero fields");
91        }
92
93        // Generate field names if not provided
94        let names: Vec<String> = field_names
95            .map(|n| n.to_vec())
96            .unwrap_or_else(|| default_column_names(num_cols, "field"));
97
98        // Build Avro schema
99        let schema_json = format!(
100            r#"{{
101                "type": "record",
102                "name": "Row",
103                "fields": [{}]
104            }}"#,
105            names
106                .iter()
107                .map(|n| format!(r#"{{"name": "{}", "type": ["null", "string"]}}"#, n))
108                .collect::<Vec<_>>()
109                .join(", ")
110        );
111
112        let schema = AvroSchema::parse_str(&schema_json)?;
113
114        let file =
115            File::create(path).with_context(|| format!("Failed to create Avro file: {path}"))?;
116
117        {
118            let mut writer = AvroWriter::new(&schema, file);
119
120            for row in data {
121                let mut record: Vec<(String, AvroValue)> = Vec::new();
122                for (i, name) in names.iter().enumerate() {
123                    let value = row
124                        .get(i)
125                        .map(|s| AvroValue::Union(1, Box::new(AvroValue::String(s.clone()))))
126                        .unwrap_or(AvroValue::Union(0, Box::new(AvroValue::Null)));
127                    record.push((name.clone(), value));
128                }
129                writer.append(AvroValue::Record(record))?;
130            }
131
132            // Flush and finalize the writer - this is critical for Avro format
133            writer.flush()?;
134            // Writer is dropped here, which finalizes the Avro file
135        }
136
137        Ok(())
138    }
139
140    /// Get schema information from Avro file
141    pub fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
142        let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
143
144        let reader = AvroReader::new(file)?;
145
146        let fields = if let AvroSchema::Record(record) = reader.writer_schema() {
147            record
148                .fields
149                .iter()
150                .map(|f| (f.name.clone(), format!("{:?}", f.schema)))
151                .collect()
152        } else {
153            Vec::new()
154        };
155
156        Ok(fields)
157    }
158
159    fn avro_value_to_string(&self, value: &AvroValue) -> String {
160        match value {
161            AvroValue::Null => String::new(),
162            AvroValue::Boolean(b) => b.to_string(),
163            AvroValue::Int(i) => i.to_string(),
164            AvroValue::Long(l) => l.to_string(),
165            AvroValue::Float(f) => f.to_string(),
166            AvroValue::Double(d) => d.to_string(),
167            AvroValue::String(s) => s.clone(),
168            AvroValue::Bytes(b) => String::from_utf8_lossy(b).to_string(),
169            AvroValue::Union(_, inner) => self.avro_value_to_string(inner),
170            AvroValue::Array(arr) => {
171                let items: Vec<String> = arr.iter().map(|v| self.avro_value_to_string(v)).collect();
172                format!("[{}]", items.join(", "))
173            }
174            AvroValue::Map(map) => {
175                let items: Vec<String> = map
176                    .iter()
177                    .map(|(k, v)| format!("{}: {}", k, self.avro_value_to_string(v)))
178                    .collect();
179                format!("{{{}}}", items.join(", "))
180            }
181            _ => format!("{:?}", value),
182        }
183    }
184}
185
186impl DataReader for AvroHandler {
187    fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
188        self.read(path)
189    }
190
191    fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
192        self.read_with_headers(path)
193    }
194
195    fn read_range(&self, path: &str, range: &CellRange) -> Result<Vec<Vec<String>>> {
196        let all_data = self.read(path)?;
197        Ok(filter_by_range(&all_data, range))
198    }
199
200    fn read_as_json(&self, path: &str) -> Result<String> {
201        let data = self.read(path)?;
202        serde_json::to_string_pretty(&data).with_context(|| "Failed to serialize to JSON")
203    }
204
205    fn supports_format(&self, path: &str) -> bool {
206        path.to_lowercase().ends_with(".avro")
207    }
208}
209
210impl DataWriter for AvroHandler {
211    fn write(&self, path: &str, data: &[Vec<String>], options: DataWriteOptions) -> Result<()> {
212        if let Some(ref names) = options.column_names {
213            return self.write(path, data, Some(names.as_slice()));
214        }
215        if options.include_headers && !data.is_empty() {
216            let body = data.get(1..).unwrap_or(&[]);
217            return self.write(path, body, Some(&data[0]));
218        }
219        self.write(path, data, None)
220    }
221
222    fn write_range(
223        &self,
224        path: &str,
225        data: &[Vec<String>],
226        _start_row: usize,
227        _start_col: usize,
228    ) -> Result<()> {
229        // For Avro, we write the entire dataset
230        self.write(path, data, None)
231    }
232
233    fn append(&self, _path: &str, _data: &[Vec<String>]) -> Result<()> {
234        anyhow::bail!("Append operation not supported for Avro files")
235    }
236
237    fn supports_format(&self, path: &str) -> bool {
238        path.to_lowercase().ends_with(".avro")
239    }
240}
241
242impl FileHandler for AvroHandler {
243    fn format_name(&self) -> &'static str {
244        "avro"
245    }
246
247    fn supported_extensions(&self) -> &'static [&'static str] {
248        &["avro"]
249    }
250}
251
252impl SchemaProvider for AvroHandler {
253    fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
254        self.get_schema(path)
255    }
256
257    fn get_column_names(&self, path: &str) -> Result<Vec<String>> {
258        let schema = self.get_schema(path)?;
259        Ok(schema.into_iter().map(|(name, _)| name).collect())
260    }
261
262    fn get_row_count(&self, path: &str) -> Result<usize> {
263        let data = self.read(path)?;
264        Ok(data.len())
265    }
266
267    fn get_column_count(&self, path: &str) -> Result<usize> {
268        let data = self.read(path)?;
269        Ok(data.first().map(|r| r.len()).unwrap_or(0))
270    }
271}