1use std::io::Write;
28
29use serde::{Deserialize, Serialize};
30
31use crate::error::IndexerError;
32use crate::handler::DecodedEvent;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum ExportFormat {
39 Jsonl,
41 Csv,
43}
44
45#[derive(Debug, Clone)]
49pub struct ExportConfig {
50 pub format: ExportFormat,
52 pub from_block: Option<u64>,
54 pub to_block: Option<u64>,
56 pub schema_filter: Vec<String>,
58 pub address_filter: Vec<String>,
60}
61
62impl Default for ExportConfig {
63 fn default() -> Self {
64 Self {
65 format: ExportFormat::Jsonl,
66 from_block: None,
67 to_block: None,
68 schema_filter: Vec::new(),
69 address_filter: Vec::new(),
70 }
71 }
72}
73
74#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct ExportStats {
79 pub events_exported: u64,
81 pub bytes_written: u64,
83 pub events_skipped: u64,
85}
86
87pub trait Exporter {
91 fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError>;
93 fn finish(&mut self) -> Result<u64, IndexerError>;
95}
96
/// [`Exporter`] that writes JSON Lines: each event becomes one compact JSON
/// object followed by `\n`.
pub struct JsonlExporter<W>
where
    W: Write,
{
    /// Destination for the serialized lines.
    writer: W,
    /// Running total of bytes handed to `writer`.
    bytes_written: u64,
}
104
105impl<W: Write> JsonlExporter<W> {
106 pub fn new(writer: W) -> Self {
108 Self {
109 writer,
110 bytes_written: 0,
111 }
112 }
113}
114
115impl<W: Write> Exporter for JsonlExporter<W> {
116 fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
117 let json = serde_json::to_string(event)
118 .map_err(|e| IndexerError::Other(format!("JSON serialization error: {e}")))?;
119 let line = format!("{json}\n");
120 self.writer
121 .write_all(line.as_bytes())
122 .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
123 self.bytes_written += line.len() as u64;
124 Ok(())
125 }
126
127 fn finish(&mut self) -> Result<u64, IndexerError> {
128 self.writer
129 .flush()
130 .map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
131 Ok(self.bytes_written)
132 }
133}
134
/// [`Exporter`] that writes events as CSV rows, emitting a header row
/// immediately before the first event.
pub struct CsvExporter<W>
where
    W: Write,
{
    /// Destination for the CSV text.
    writer: W,
    /// Running total of bytes handed to `writer`, header row included.
    bytes_written: u64,
    /// Whether the header row has already been emitted.
    header_written: bool,
}
145
146impl<W: Write> CsvExporter<W> {
147 pub fn new(writer: W) -> Self {
149 Self {
150 writer,
151 bytes_written: 0,
152 header_written: false,
153 }
154 }
155
156 fn write_header(&mut self) -> Result<(), IndexerError> {
157 if !self.header_written {
158 let header = "chain,schema,address,tx_hash,block_number,log_index,fields_json\n";
159 self.writer
160 .write_all(header.as_bytes())
161 .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
162 self.bytes_written += header.len() as u64;
163 self.header_written = true;
164 }
165 Ok(())
166 }
167}
168
169impl<W: Write> Exporter for CsvExporter<W> {
170 fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
171 self.write_header()?;
172
173 let fields_json = serde_json::to_string(&event.fields_json)
174 .map_err(|e| IndexerError::Other(format!("JSON error: {e}")))?;
175
176 let line = format!(
178 "{},{},{},{},{},{},\"{}\"\n",
179 csv_escape(&event.chain),
180 csv_escape(&event.schema),
181 csv_escape(&event.address),
182 csv_escape(&event.tx_hash),
183 event.block_number,
184 event.log_index,
185 fields_json.replace('"', "\"\""),
186 );
187 self.writer
188 .write_all(line.as_bytes())
189 .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
190 self.bytes_written += line.len() as u64;
191 Ok(())
192 }
193
194 fn finish(&mut self) -> Result<u64, IndexerError> {
195 self.writer
196 .flush()
197 .map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
198 Ok(self.bytes_written)
199 }
200}
201
/// Quotes a single CSV field when required by RFC 4180.
///
/// A field containing a comma, double quote, carriage return, or line feed is
/// wrapped in double quotes, with embedded double quotes doubled. Any other
/// field is returned unchanged.
fn csv_escape(s: &str) -> String {
    // `\r` must trigger quoting as well as `\n`: many CSV readers treat a bare
    // carriage return as a record separator, which would split the row.
    let needs_quoting = s.chars().any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", s.replace('"', "\"\""))
    } else {
        s.to_string()
    }
}
210
211fn passes_filter(event: &DecodedEvent, config: &ExportConfig) -> bool {
215 if let Some(from) = config.from_block {
217 if event.block_number < from {
218 return false;
219 }
220 }
221 if let Some(to) = config.to_block {
222 if event.block_number > to {
223 return false;
224 }
225 }
226 if !config.schema_filter.is_empty()
228 && !config
229 .schema_filter
230 .iter()
231 .any(|s| s.eq_ignore_ascii_case(&event.schema))
232 {
233 return false;
234 }
235 if !config.address_filter.is_empty()
237 && !config
238 .address_filter
239 .iter()
240 .any(|a| a.eq_ignore_ascii_case(&event.address))
241 {
242 return false;
243 }
244 true
245}
246
247pub fn export_events<W: Write>(
251 events: &[DecodedEvent],
252 config: &ExportConfig,
253 writer: W,
254) -> Result<ExportStats, IndexerError> {
255 let mut stats = ExportStats::default();
256
257 match config.format {
258 ExportFormat::Jsonl => {
259 let mut exporter = JsonlExporter::new(writer);
260 for event in events {
261 if passes_filter(event, config) {
262 exporter.write_event(event)?;
263 stats.events_exported += 1;
264 } else {
265 stats.events_skipped += 1;
266 }
267 }
268 stats.bytes_written = exporter.finish()?;
269 }
270 ExportFormat::Csv => {
271 let mut exporter = CsvExporter::new(writer);
272 for event in events {
273 if passes_filter(event, config) {
274 exporter.write_event(event)?;
275 stats.events_exported += 1;
276 } else {
277 stats.events_skipped += 1;
278 }
279 }
280 stats.bytes_written = exporter.finish()?;
281 }
282 }
283
284 Ok(stats)
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event on "ethereum" with a fixed `fields_json` payload.
    fn make_event(schema: &str, address: &str, block: u64) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: schema.into(),
            address: address.into(),
            tx_hash: format!("0xtx_{block}"),
            block_number: block,
            log_index: 0,
            fields_json: serde_json::json!({"from": "0xA", "to": "0xB", "value": 100}),
        }
    }

    /// Five events spanning three schemas, three addresses, and blocks 100..=200.
    fn test_events() -> Vec<DecodedEvent> {
        vec![
            make_event("Transfer", "0xToken1", 100),
            make_event("Approval", "0xToken1", 101),
            make_event("Transfer", "0xToken2", 102),
            make_event("Swap", "0xPool1", 103),
            make_event("Transfer", "0xToken1", 200),
        ]
    }

    #[test]
    fn jsonl_export_single_event() {
        let events = vec![make_event("Transfer", "0xToken", 100)];
        let mut buf = Vec::new();
        let config = ExportConfig::default();

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 1);
        assert!(stats.bytes_written > 0);

        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 1);

        // Each line must round-trip back into a DecodedEvent.
        let _: DecodedEvent = serde_json::from_str(lines[0]).unwrap();
    }

    #[test]
    fn jsonl_export_multiple_events() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig::default();

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 5);

        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 5);
    }

    #[test]
    fn csv_export_with_header() {
        let events = vec![make_event("Transfer", "0xToken", 100)];
        let mut buf = Vec::new();
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 1);

        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].starts_with("chain,schema,address"));
        assert!(lines[1].starts_with("ethereum,Transfer"));
    }

    #[test]
    fn csv_export_quotes_fields() {
        let events = vec![make_event("Transfer", "0xTok,en", 100)];
        let mut buf = Vec::new();
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };

        export_events(&events, &config, &mut buf).unwrap();

        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 2);
        // A comma inside a string column forces quoting.
        assert!(lines[1].starts_with("ethereum,Transfer,\"0xTok,en\",0xtx_100,100,0,"));
        // fields_json is always quoted with its inner quotes doubled.
        assert!(lines[1].contains("\"\"from\"\":\"\"0xA\"\""));
    }

    #[test]
    fn csv_export_stats_accurate() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 5);
        assert_eq!(stats.events_skipped, 0);
        // The header row must be included in the byte count.
        assert_eq!(stats.bytes_written, buf.len() as u64);
    }

    #[test]
    fn block_range_filter() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            from_block: Some(101),
            to_block: Some(103),
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 3);
        assert_eq!(stats.events_skipped, 2);
    }

    #[test]
    fn schema_filter() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            schema_filter: vec!["Transfer".into()],
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 3);
        assert_eq!(stats.events_skipped, 2);
    }

    #[test]
    fn address_filter() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            address_filter: vec!["0xToken1".into()],
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 3);
        assert_eq!(stats.events_skipped, 2);
    }

    #[test]
    fn address_filter_is_case_insensitive() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            address_filter: vec!["0xTOKEN1".into()],
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 3);
        assert_eq!(stats.events_skipped, 2);
    }

    #[test]
    fn combined_filters() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig {
            schema_filter: vec!["Transfer".into()],
            address_filter: vec!["0xToken1".into()],
            from_block: Some(100),
            to_block: Some(150),
            ..Default::default()
        };

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 1);
        assert_eq!(stats.events_skipped, 4);
    }

    #[test]
    fn empty_export() {
        let events: Vec<DecodedEvent> = vec![];
        let mut buf = Vec::new();
        let config = ExportConfig::default();

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 0);
        assert_eq!(stats.bytes_written, 0);
    }

    #[test]
    fn export_stats_accurate() {
        let events = test_events();
        let mut buf = Vec::new();
        let config = ExportConfig::default();

        let stats = export_events(&events, &config, &mut buf).unwrap();
        assert_eq!(stats.events_exported, 5);
        assert_eq!(stats.events_skipped, 0);
        assert_eq!(stats.bytes_written, buf.len() as u64);
    }
}