use std::io::Write;
use serde::{Deserialize, Serialize};
use crate::error::IndexerError;
use crate::handler::DecodedEvent;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExportFormat {
Jsonl,
Csv,
}
#[derive(Debug, Clone)]
pub struct ExportConfig {
pub format: ExportFormat,
pub from_block: Option<u64>,
pub to_block: Option<u64>,
pub schema_filter: Vec<String>,
pub address_filter: Vec<String>,
}
impl Default for ExportConfig {
fn default() -> Self {
Self {
format: ExportFormat::Jsonl,
from_block: None,
to_block: None,
schema_filter: Vec::new(),
address_filter: Vec::new(),
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExportStats {
pub events_exported: u64,
pub bytes_written: u64,
pub events_skipped: u64,
}
pub trait Exporter {
fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError>;
fn finish(&mut self) -> Result<u64, IndexerError>;
}
pub struct JsonlExporter<W: Write> {
writer: W,
bytes_written: u64,
}
impl<W: Write> JsonlExporter<W> {
pub fn new(writer: W) -> Self {
Self {
writer,
bytes_written: 0,
}
}
}
impl<W: Write> Exporter for JsonlExporter<W> {
fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
let json = serde_json::to_string(event)
.map_err(|e| IndexerError::Other(format!("JSON serialization error: {e}")))?;
let line = format!("{json}\n");
self.writer
.write_all(line.as_bytes())
.map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
self.bytes_written += line.len() as u64;
Ok(())
}
fn finish(&mut self) -> Result<u64, IndexerError> {
self.writer
.flush()
.map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
Ok(self.bytes_written)
}
}
pub struct CsvExporter<W: Write> {
writer: W,
bytes_written: u64,
header_written: bool,
}
impl<W: Write> CsvExporter<W> {
pub fn new(writer: W) -> Self {
Self {
writer,
bytes_written: 0,
header_written: false,
}
}
fn write_header(&mut self) -> Result<(), IndexerError> {
if !self.header_written {
let header = "chain,schema,address,tx_hash,block_number,log_index,fields_json\n";
self.writer
.write_all(header.as_bytes())
.map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
self.bytes_written += header.len() as u64;
self.header_written = true;
}
Ok(())
}
}
impl<W: Write> Exporter for CsvExporter<W> {
fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
self.write_header()?;
let fields_json = serde_json::to_string(&event.fields_json)
.map_err(|e| IndexerError::Other(format!("JSON error: {e}")))?;
let line = format!(
"{},{},{},{},{},{},\"{}\"\n",
csv_escape(&event.chain),
csv_escape(&event.schema),
csv_escape(&event.address),
csv_escape(&event.tx_hash),
event.block_number,
event.log_index,
fields_json.replace('"', "\"\""),
);
self.writer
.write_all(line.as_bytes())
.map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
self.bytes_written += line.len() as u64;
Ok(())
}
fn finish(&mut self) -> Result<u64, IndexerError> {
self.writer
.flush()
.map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
Ok(self.bytes_written)
}
}
fn csv_escape(s: &str) -> String {
if s.contains(',') || s.contains('"') || s.contains('\n') {
format!("\"{}\"", s.replace('"', "\"\""))
} else {
s.to_string()
}
}
fn passes_filter(event: &DecodedEvent, config: &ExportConfig) -> bool {
if let Some(from) = config.from_block {
if event.block_number < from {
return false;
}
}
if let Some(to) = config.to_block {
if event.block_number > to {
return false;
}
}
if !config.schema_filter.is_empty()
&& !config
.schema_filter
.iter()
.any(|s| s.eq_ignore_ascii_case(&event.schema))
{
return false;
}
if !config.address_filter.is_empty()
&& !config
.address_filter
.iter()
.any(|a| a.eq_ignore_ascii_case(&event.address))
{
return false;
}
true
}
pub fn export_events<W: Write>(
events: &[DecodedEvent],
config: &ExportConfig,
writer: W,
) -> Result<ExportStats, IndexerError> {
let mut stats = ExportStats::default();
match config.format {
ExportFormat::Jsonl => {
let mut exporter = JsonlExporter::new(writer);
for event in events {
if passes_filter(event, config) {
exporter.write_event(event)?;
stats.events_exported += 1;
} else {
stats.events_skipped += 1;
}
}
stats.bytes_written = exporter.finish()?;
}
ExportFormat::Csv => {
let mut exporter = CsvExporter::new(writer);
for event in events {
if passes_filter(event, config) {
exporter.write_event(event)?;
stats.events_exported += 1;
} else {
stats.events_skipped += 1;
}
}
stats.bytes_written = exporter.finish()?;
}
}
Ok(stats)
}
#[cfg(test)]
mod tests {
use super::*;
fn make_event(schema: &str, address: &str, block: u64) -> DecodedEvent {
DecodedEvent {
chain: "ethereum".into(),
schema: schema.into(),
address: address.into(),
tx_hash: format!("0xtx_{block}"),
block_number: block,
log_index: 0,
fields_json: serde_json::json!({"from": "0xA", "to": "0xB", "value": 100}),
}
}
fn test_events() -> Vec<DecodedEvent> {
vec![
make_event("Transfer", "0xToken1", 100),
make_event("Approval", "0xToken1", 101),
make_event("Transfer", "0xToken2", 102),
make_event("Swap", "0xPool1", 103),
make_event("Transfer", "0xToken1", 200),
]
}
#[test]
fn jsonl_export_single_event() {
let events = vec![make_event("Transfer", "0xToken", 100)];
let mut buf = Vec::new();
let config = ExportConfig::default();
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 1);
assert!(stats.bytes_written > 0);
let output = String::from_utf8(buf).unwrap();
let lines: Vec<&str> = output.trim().lines().collect();
assert_eq!(lines.len(), 1);
let _: DecodedEvent = serde_json::from_str(lines[0]).unwrap();
}
#[test]
fn jsonl_export_multiple_events() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig::default();
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 5);
let output = String::from_utf8(buf).unwrap();
let lines: Vec<&str> = output.trim().lines().collect();
assert_eq!(lines.len(), 5);
}
#[test]
fn csv_export_with_header() {
let events = vec![make_event("Transfer", "0xToken", 100)];
let mut buf = Vec::new();
let config = ExportConfig {
format: ExportFormat::Csv,
..Default::default()
};
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 1);
let output = String::from_utf8(buf).unwrap();
let lines: Vec<&str> = output.trim().lines().collect();
assert_eq!(lines.len(), 2); assert!(lines[0].starts_with("chain,schema,address"));
assert!(lines[1].starts_with("ethereum,Transfer"));
}
#[test]
fn block_range_filter() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig {
from_block: Some(101),
to_block: Some(103),
..Default::default()
};
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 3); assert_eq!(stats.events_skipped, 2); }
#[test]
fn schema_filter() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig {
schema_filter: vec!["Transfer".into()],
..Default::default()
};
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 3); assert_eq!(stats.events_skipped, 2); }
#[test]
fn address_filter() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig {
address_filter: vec!["0xToken1".into()],
..Default::default()
};
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 3); assert_eq!(stats.events_skipped, 2);
}
#[test]
fn combined_filters() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig {
schema_filter: vec!["Transfer".into()],
address_filter: vec!["0xToken1".into()],
from_block: Some(100),
to_block: Some(150),
..Default::default()
};
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 1); }
#[test]
fn empty_export() {
let events: Vec<DecodedEvent> = vec![];
let mut buf = Vec::new();
let config = ExportConfig::default();
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 0);
assert_eq!(stats.bytes_written, 0);
}
#[test]
fn export_stats_accurate() {
let events = test_events();
let mut buf = Vec::new();
let config = ExportConfig::default();
let stats = export_events(&events, &config, &mut buf).unwrap();
assert_eq!(stats.events_exported, 5);
assert_eq!(stats.events_skipped, 0);
assert_eq!(stats.bytes_written, buf.len() as u64);
}
}