1use anyhow::{Context, Result};
4use std::fs::File;
5
6use apache_avro::{
7 types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
8};
9
10use crate::csv_handler::CellRange;
11use crate::helpers::{default_column_names, filter_by_range, max_column_count};
12use crate::traits::{DataReader, DataWriteOptions, DataWriter, FileHandler, SchemaProvider};
13
/// Stateless handler for reading and writing Apache Avro object container files.
///
/// The type carries no data, so it is free to copy and every instance behaves identically.
#[derive(Debug, Clone, Copy, Default)]
pub struct AvroHandler;
17
18impl AvroHandler {
19 pub fn new() -> Self {
20 Self::default()
21 }
22
23 pub fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
25 let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
26
27 let reader = AvroReader::new(file)?;
28 let mut all_rows: Vec<Vec<String>> = Vec::new();
29
30 for value in reader {
31 let value = value?;
32 if let AvroValue::Record(fields) = value {
33 let row: Vec<String> = fields
34 .iter()
35 .map(|(_, v)| self.avro_value_to_string(v))
36 .collect();
37 all_rows.push(row);
38 }
39 }
40
41 Ok(all_rows)
42 }
43
44 pub fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
46 let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
47
48 let reader = AvroReader::new(file)?;
49 let mut all_rows: Vec<Vec<String>> = Vec::new();
50
51 if let AvroSchema::Record(record) = reader.writer_schema() {
53 let headers: Vec<String> = record.fields.iter().map(|f| f.name.clone()).collect();
54 all_rows.push(headers);
55 }
56
57 for value in reader {
58 let value = value?;
59 if let AvroValue::Record(fields) = value {
60 let row: Vec<String> = fields
61 .iter()
62 .map(|(_, v)| self.avro_value_to_string(v))
63 .collect();
64 all_rows.push(row);
65 }
66 }
67
68 Ok(all_rows)
69 }
70
71 pub fn write(
73 &self,
74 path: &str,
75 data: &[Vec<String>],
76 field_names: Option<&[String]>,
77 ) -> Result<()> {
78 let num_cols = match field_names {
79 Some(names) if names.is_empty() => {
80 anyhow::bail!("Field names cannot be empty");
81 }
82 Some(names) => names.len(),
83 None if data.is_empty() => {
84 anyhow::bail!("Cannot write empty data to Avro");
85 }
86 None => max_column_count(data),
87 };
88
89 if num_cols == 0 {
90 anyhow::bail!("Cannot write Avro with zero fields");
91 }
92
93 let names: Vec<String> = field_names
95 .map(|n| n.to_vec())
96 .unwrap_or_else(|| default_column_names(num_cols, "field"));
97
98 let schema_json = format!(
100 r#"{{
101 "type": "record",
102 "name": "Row",
103 "fields": [{}]
104 }}"#,
105 names
106 .iter()
107 .map(|n| format!(r#"{{"name": "{}", "type": ["null", "string"]}}"#, n))
108 .collect::<Vec<_>>()
109 .join(", ")
110 );
111
112 let schema = AvroSchema::parse_str(&schema_json)?;
113
114 let file =
115 File::create(path).with_context(|| format!("Failed to create Avro file: {path}"))?;
116
117 {
118 let mut writer = AvroWriter::new(&schema, file);
119
120 for row in data {
121 let mut record: Vec<(String, AvroValue)> = Vec::new();
122 for (i, name) in names.iter().enumerate() {
123 let value = row
124 .get(i)
125 .map(|s| AvroValue::Union(1, Box::new(AvroValue::String(s.clone()))))
126 .unwrap_or(AvroValue::Union(0, Box::new(AvroValue::Null)));
127 record.push((name.clone(), value));
128 }
129 writer.append(AvroValue::Record(record))?;
130 }
131
132 writer.flush()?;
134 }
136
137 Ok(())
138 }
139
140 pub fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
142 let file = File::open(path).with_context(|| format!("Failed to open Avro file: {path}"))?;
143
144 let reader = AvroReader::new(file)?;
145
146 let fields = if let AvroSchema::Record(record) = reader.writer_schema() {
147 record
148 .fields
149 .iter()
150 .map(|f| (f.name.clone(), format!("{:?}", f.schema)))
151 .collect()
152 } else {
153 Vec::new()
154 };
155
156 Ok(fields)
157 }
158
159 fn avro_value_to_string(&self, value: &AvroValue) -> String {
160 match value {
161 AvroValue::Null => String::new(),
162 AvroValue::Boolean(b) => b.to_string(),
163 AvroValue::Int(i) => i.to_string(),
164 AvroValue::Long(l) => l.to_string(),
165 AvroValue::Float(f) => f.to_string(),
166 AvroValue::Double(d) => d.to_string(),
167 AvroValue::String(s) => s.clone(),
168 AvroValue::Bytes(b) => String::from_utf8_lossy(b).to_string(),
169 AvroValue::Union(_, inner) => self.avro_value_to_string(inner),
170 AvroValue::Array(arr) => {
171 let items: Vec<String> = arr.iter().map(|v| self.avro_value_to_string(v)).collect();
172 format!("[{}]", items.join(", "))
173 }
174 AvroValue::Map(map) => {
175 let items: Vec<String> = map
176 .iter()
177 .map(|(k, v)| format!("{}: {}", k, self.avro_value_to_string(v)))
178 .collect();
179 format!("{{{}}}", items.join(", "))
180 }
181 _ => format!("{:?}", value),
182 }
183 }
184}
185
186impl DataReader for AvroHandler {
187 fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
188 self.read(path)
189 }
190
191 fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
192 self.read_with_headers(path)
193 }
194
195 fn read_range(&self, path: &str, range: &CellRange) -> Result<Vec<Vec<String>>> {
196 let all_data = self.read(path)?;
197 Ok(filter_by_range(&all_data, range))
198 }
199
200 fn read_as_json(&self, path: &str) -> Result<String> {
201 let data = self.read(path)?;
202 serde_json::to_string_pretty(&data).with_context(|| "Failed to serialize to JSON")
203 }
204
205 fn supports_format(&self, path: &str) -> bool {
206 path.to_lowercase().ends_with(".avro")
207 }
208}
209
210impl DataWriter for AvroHandler {
211 fn write(&self, path: &str, data: &[Vec<String>], options: DataWriteOptions) -> Result<()> {
212 if let Some(ref names) = options.column_names {
213 return self.write(path, data, Some(names.as_slice()));
214 }
215 if options.include_headers && !data.is_empty() {
216 let body = data.get(1..).unwrap_or(&[]);
217 return self.write(path, body, Some(&data[0]));
218 }
219 self.write(path, data, None)
220 }
221
222 fn write_range(
223 &self,
224 path: &str,
225 data: &[Vec<String>],
226 _start_row: usize,
227 _start_col: usize,
228 ) -> Result<()> {
229 self.write(path, data, None)
231 }
232
233 fn append(&self, _path: &str, _data: &[Vec<String>]) -> Result<()> {
234 anyhow::bail!("Append operation not supported for Avro files")
235 }
236
237 fn supports_format(&self, path: &str) -> bool {
238 path.to_lowercase().ends_with(".avro")
239 }
240}
241
242impl FileHandler for AvroHandler {
243 fn format_name(&self) -> &'static str {
244 "avro"
245 }
246
247 fn supported_extensions(&self) -> &'static [&'static str] {
248 &["avro"]
249 }
250}
251
252impl SchemaProvider for AvroHandler {
253 fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
254 self.get_schema(path)
255 }
256
257 fn get_column_names(&self, path: &str) -> Result<Vec<String>> {
258 let schema = self.get_schema(path)?;
259 Ok(schema.into_iter().map(|(name, _)| name).collect())
260 }
261
262 fn get_row_count(&self, path: &str) -> Result<usize> {
263 let data = self.read(path)?;
264 Ok(data.len())
265 }
266
267 fn get_column_count(&self, path: &str) -> Result<usize> {
268 let data = self.read(path)?;
269 Ok(data.first().map(|r| r.len()).unwrap_or(0))
270 }
271}