Skip to main content

chainindex_core/
export.rs

//! Data export — write indexed events to JSONL or CSV files for analytics.
//!
//! Supports exporting decoded events with filtering by block range, schema,
//! and address. Designed for feeding data to DuckDB, BigQuery, or Spark.
//!
//! # Supported Formats
//!
//! - **JSONL** (newline-delimited JSON) — one JSON object per line
//! - **CSV** — comma-separated with header row
//!
//! # Example
//!
//! ```rust
//! use chainindex_core::export::{ExportConfig, ExportFormat, export_events};
//! use chainindex_core::handler::DecodedEvent;
//!
//! let events: Vec<DecodedEvent> = vec![];
//! let config = ExportConfig {
//!     format: ExportFormat::Jsonl,
//!     ..Default::default()
//! };
//!
//! let mut buf = Vec::new();
//! let stats = export_events(&events, &config, &mut buf).unwrap();
//! ```
26
27use std::io::Write;
28
29use serde::{Deserialize, Serialize};
30
31use crate::error::IndexerError;
32use crate::handler::DecodedEvent;
33
34// ─── ExportFormat ───────────────────────────────────────────────────────────
35
36/// Supported export formats.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum ExportFormat {
39    /// Newline-delimited JSON (one JSON object per line).
40    Jsonl,
41    /// Comma-separated values with header row.
42    Csv,
43}
44
45// ─── ExportConfig ───────────────────────────────────────────────────────────
46
47/// Configuration for a data export operation.
48#[derive(Debug, Clone)]
49pub struct ExportConfig {
50    /// Output format.
51    pub format: ExportFormat,
52    /// Only export events from blocks >= this number.
53    pub from_block: Option<u64>,
54    /// Only export events from blocks <= this number.
55    pub to_block: Option<u64>,
56    /// Only export events matching these schema names (empty = all).
57    pub schema_filter: Vec<String>,
58    /// Only export events from these addresses (empty = all).
59    pub address_filter: Vec<String>,
60}
61
62impl Default for ExportConfig {
63    fn default() -> Self {
64        Self {
65            format: ExportFormat::Jsonl,
66            from_block: None,
67            to_block: None,
68            schema_filter: Vec::new(),
69            address_filter: Vec::new(),
70        }
71    }
72}
73
74// ─── ExportStats ────────────────────────────────────────────────────────────
75
76/// Statistics from a completed export operation.
77#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct ExportStats {
79    /// Number of events written.
80    pub events_exported: u64,
81    /// Total bytes written.
82    pub bytes_written: u64,
83    /// Number of events skipped by filters.
84    pub events_skipped: u64,
85}
86
87// ─── Exporter Trait ─────────────────────────────────────────────────────────
88
89/// Trait for format-specific exporters.
90pub trait Exporter {
91    /// Write a single event.
92    fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError>;
93    /// Finalize the export and return bytes written.
94    fn finish(&mut self) -> Result<u64, IndexerError>;
95}
96
97// ─── JsonlExporter ──────────────────────────────────────────────────────────
98
/// Exporter that emits newline-delimited JSON (JSONL): one event per line.
pub struct JsonlExporter<W: Write> {
    /// Destination for the encoded lines.
    writer: W,
    /// Running total of bytes successfully handed to `writer`.
    bytes_written: u64,
}
104
105impl<W: Write> JsonlExporter<W> {
106    /// Create a new JSONL exporter writing to the given writer.
107    pub fn new(writer: W) -> Self {
108        Self {
109            writer,
110            bytes_written: 0,
111        }
112    }
113}
114
115impl<W: Write> Exporter for JsonlExporter<W> {
116    fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
117        let json = serde_json::to_string(event)
118            .map_err(|e| IndexerError::Other(format!("JSON serialization error: {e}")))?;
119        let line = format!("{json}\n");
120        self.writer
121            .write_all(line.as_bytes())
122            .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
123        self.bytes_written += line.len() as u64;
124        Ok(())
125    }
126
127    fn finish(&mut self) -> Result<u64, IndexerError> {
128        self.writer
129            .flush()
130            .map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
131        Ok(self.bytes_written)
132    }
133}
134
135// ─── CsvExporter ────────────────────────────────────────────────────────────
136
/// Exporter that emits CSV: a header row followed by one row per event.
///
/// Column order: chain, schema, address, tx_hash, block_number, log_index,
/// fields_json.
pub struct CsvExporter<W: Write> {
    /// Destination for the encoded rows.
    writer: W,
    /// Running total of bytes successfully handed to `writer`, header included.
    bytes_written: u64,
    /// Set once the header row has been written, so it is emitted only once.
    header_written: bool,
}
145
146impl<W: Write> CsvExporter<W> {
147    /// Create a new CSV exporter writing to the given writer.
148    pub fn new(writer: W) -> Self {
149        Self {
150            writer,
151            bytes_written: 0,
152            header_written: false,
153        }
154    }
155
156    fn write_header(&mut self) -> Result<(), IndexerError> {
157        if !self.header_written {
158            let header = "chain,schema,address,tx_hash,block_number,log_index,fields_json\n";
159            self.writer
160                .write_all(header.as_bytes())
161                .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
162            self.bytes_written += header.len() as u64;
163            self.header_written = true;
164        }
165        Ok(())
166    }
167}
168
169impl<W: Write> Exporter for CsvExporter<W> {
170    fn write_event(&mut self, event: &DecodedEvent) -> Result<(), IndexerError> {
171        self.write_header()?;
172
173        let fields_json = serde_json::to_string(&event.fields_json)
174            .map_err(|e| IndexerError::Other(format!("JSON error: {e}")))?;
175
176        // Escape CSV fields that contain commas or quotes
177        let line = format!(
178            "{},{},{},{},{},{},\"{}\"\n",
179            csv_escape(&event.chain),
180            csv_escape(&event.schema),
181            csv_escape(&event.address),
182            csv_escape(&event.tx_hash),
183            event.block_number,
184            event.log_index,
185            fields_json.replace('"', "\"\""),
186        );
187        self.writer
188            .write_all(line.as_bytes())
189            .map_err(|e| IndexerError::Other(format!("Write error: {e}")))?;
190        self.bytes_written += line.len() as u64;
191        Ok(())
192    }
193
194    fn finish(&mut self) -> Result<u64, IndexerError> {
195        self.writer
196            .flush()
197            .map_err(|e| IndexerError::Other(format!("Flush error: {e}")))?;
198        Ok(self.bytes_written)
199    }
200}
201
/// Escapes a single CSV field value.
///
/// The value is wrapped in double quotes, with embedded double quotes
/// doubled, when it contains a comma, a double quote, a line feed, or a
/// carriage return. Any other value is returned unchanged.
fn csv_escape(s: &str) -> String {
    // A bare `\r` is a record separator for many CSV readers, so it must
    // force quoting just like `\n`.
    let needs_quoting = s.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", s.replace('"', "\"\""))
    } else {
        s.to_string()
    }
}
210
211// ─── Filter + Export ────────────────────────────────────────────────────────
212
213/// Check if an event passes the export filters.
214fn passes_filter(event: &DecodedEvent, config: &ExportConfig) -> bool {
215    // Block range filter
216    if let Some(from) = config.from_block {
217        if event.block_number < from {
218            return false;
219        }
220    }
221    if let Some(to) = config.to_block {
222        if event.block_number > to {
223            return false;
224        }
225    }
226    // Schema filter
227    if !config.schema_filter.is_empty()
228        && !config
229            .schema_filter
230            .iter()
231            .any(|s| s.eq_ignore_ascii_case(&event.schema))
232    {
233        return false;
234    }
235    // Address filter
236    if !config.address_filter.is_empty()
237        && !config
238            .address_filter
239            .iter()
240            .any(|a| a.eq_ignore_ascii_case(&event.address))
241    {
242        return false;
243    }
244    true
245}
246
247/// Export a slice of events to a writer using the given config.
248///
249/// Returns statistics about the export operation.
250pub fn export_events<W: Write>(
251    events: &[DecodedEvent],
252    config: &ExportConfig,
253    writer: W,
254) -> Result<ExportStats, IndexerError> {
255    let mut stats = ExportStats::default();
256
257    match config.format {
258        ExportFormat::Jsonl => {
259            let mut exporter = JsonlExporter::new(writer);
260            for event in events {
261                if passes_filter(event, config) {
262                    exporter.write_event(event)?;
263                    stats.events_exported += 1;
264                } else {
265                    stats.events_skipped += 1;
266                }
267            }
268            stats.bytes_written = exporter.finish()?;
269        }
270        ExportFormat::Csv => {
271            let mut exporter = CsvExporter::new(writer);
272            for event in events {
273                if passes_filter(event, config) {
274                    exporter.write_event(event)?;
275                    stats.events_exported += 1;
276                } else {
277                    stats.events_skipped += 1;
278                }
279            }
280            stats.bytes_written = exporter.finish()?;
281        }
282    }
283
284    Ok(stats)
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `ethereum` event with a fixed payload and a block-derived tx hash.
    fn make_event(schema: &str, address: &str, block: u64) -> DecodedEvent {
        let fields_json = serde_json::json!({"from": "0xA", "to": "0xB", "value": 100});
        DecodedEvent {
            chain: "ethereum".into(),
            schema: schema.into(),
            address: address.into(),
            tx_hash: format!("0xtx_{block}"),
            block_number: block,
            log_index: 0,
            fields_json,
        }
    }

    /// Five events: three `Transfer`s, three from `0xToken1`, blocks 100..=200.
    fn test_events() -> Vec<DecodedEvent> {
        [
            ("Transfer", "0xToken1", 100),
            ("Approval", "0xToken1", 101),
            ("Transfer", "0xToken2", 102),
            ("Swap", "0xPool1", 103),
            ("Transfer", "0xToken1", 200),
        ]
        .iter()
        .map(|&(schema, address, block)| make_event(schema, address, block))
        .collect()
    }

    /// Runs an export into memory and returns the stats plus the UTF-8 output.
    fn run(events: &[DecodedEvent], config: &ExportConfig) -> (ExportStats, String) {
        let mut buf = Vec::new();
        let stats = export_events(events, config, &mut buf).unwrap();
        (stats, String::from_utf8(buf).unwrap())
    }

    #[test]
    fn jsonl_export_single_event() {
        let events = [make_event("Transfer", "0xToken", 100)];
        let (stats, output) = run(&events, &ExportConfig::default());

        assert_eq!(stats.events_exported, 1);
        assert!(stats.bytes_written > 0);

        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 1);

        // The single line must round-trip back into a DecodedEvent.
        let _: DecodedEvent = serde_json::from_str(lines[0]).unwrap();
    }

    #[test]
    fn jsonl_export_multiple_events() {
        let (stats, output) = run(&test_events(), &ExportConfig::default());

        assert_eq!(stats.events_exported, 5);
        assert_eq!(output.trim().lines().count(), 5);
    }

    #[test]
    fn csv_export_with_header() {
        let events = [make_event("Transfer", "0xToken", 100)];
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };
        let (stats, output) = run(&events, &config);

        assert_eq!(stats.events_exported, 1);

        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 2); // header + 1 data row
        assert!(lines[0].starts_with("chain,schema,address"));
        assert!(lines[1].starts_with("ethereum,Transfer"));
    }

    #[test]
    fn block_range_filter() {
        let config = ExportConfig {
            from_block: Some(101),
            to_block: Some(103),
            ..Default::default()
        };
        let (stats, _) = run(&test_events(), &config);

        assert_eq!(stats.events_exported, 3); // blocks 101, 102, 103
        assert_eq!(stats.events_skipped, 2); // blocks 100, 200
    }

    #[test]
    fn schema_filter() {
        let config = ExportConfig {
            schema_filter: vec!["Transfer".into()],
            ..Default::default()
        };
        let (stats, _) = run(&test_events(), &config);

        assert_eq!(stats.events_exported, 3); // 3 Transfer events
        assert_eq!(stats.events_skipped, 2); // Approval + Swap
    }

    #[test]
    fn address_filter() {
        let config = ExportConfig {
            address_filter: vec!["0xToken1".into()],
            ..Default::default()
        };
        let (stats, _) = run(&test_events(), &config);

        assert_eq!(stats.events_exported, 3); // 3 events from 0xToken1
        assert_eq!(stats.events_skipped, 2);
    }

    #[test]
    fn combined_filters() {
        let config = ExportConfig {
            schema_filter: vec!["Transfer".into()],
            address_filter: vec!["0xToken1".into()],
            from_block: Some(100),
            to_block: Some(150),
            ..Default::default()
        };
        let (stats, _) = run(&test_events(), &config);

        assert_eq!(stats.events_exported, 1); // only Transfer at 0xToken1 block 100
    }

    #[test]
    fn empty_export() {
        let (stats, _) = run(&[], &ExportConfig::default());

        assert_eq!(stats.events_exported, 0);
        assert_eq!(stats.bytes_written, 0);
    }

    #[test]
    fn export_stats_accurate() {
        let (stats, output) = run(&test_events(), &ExportConfig::default());

        assert_eq!(stats.events_exported, 5);
        assert_eq!(stats.events_skipped, 0);
        assert_eq!(stats.bytes_written, output.len() as u64);
    }
}