Skip to main content

ethcli/output/
sqlite.rs

1//! SQLite output writer
2
3use crate::abi::{DecodedLog, DecodedValue};
4use crate::error::{OutputError, Result};
5use crate::fetcher::{FetchLogs, FetchResult};
6use crate::output::OutputWriter;
7use alloy::rpc::types::Log;
8use rusqlite::{params, Connection};
9use std::path::Path;
10
11use std::collections::HashMap as StdHashMap;
12
13/// SQLite output writer
14pub struct SqliteWriter {
15    /// Database connection
16    conn: Connection,
17    /// Known columns (original names)
18    columns: Vec<String>,
19    /// Mapping from original column name to sanitized column name (handles collisions)
20    column_name_map: StdHashMap<String, String>,
21    /// Set of sanitized names in use (to detect collisions)
22    sanitized_names: std::collections::HashSet<String>,
23    /// Table created
24    table_created: bool,
25    /// Batch buffer
26    buffer: Vec<DecodedLog>,
27    /// Batch size for inserts
28    batch_size: usize,
29}
30
31impl SqliteWriter {
32    /// Create a new SQLite writer
33    pub fn new(path: &Path) -> Result<Self> {
34        let conn = Connection::open(path).map_err(OutputError::Sqlite)?;
35
36        // Enable WAL mode for better performance
37        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
38            .map_err(OutputError::Sqlite)?;
39
40        Ok(Self {
41            conn,
42            columns: Vec::new(),
43            column_name_map: StdHashMap::new(),
44            sanitized_names: std::collections::HashSet::new(),
45            table_created: false,
46            buffer: Vec::new(),
47            batch_size: 1000,
48        })
49    }
50
51    /// Create the events table
52    fn create_table(&mut self) -> Result<()> {
53        // Base columns
54        let mut create_sql = String::from(
55            "CREATE TABLE IF NOT EXISTS events (
56                id INTEGER PRIMARY KEY AUTOINCREMENT,
57                block_number INTEGER NOT NULL,
58                transaction_hash TEXT NOT NULL,
59                log_index INTEGER NOT NULL,
60                address TEXT NOT NULL,
61                event_name TEXT NOT NULL,
62                event_signature TEXT NOT NULL,
63                topics TEXT,
64                data BLOB",
65        );
66
67        // Add dynamic columns with collision-safe names
68        for col in &self.columns.clone() {
69            let safe_col = self.get_sanitized_column_name(col);
70            create_sql.push_str(&format!(",\n                {} TEXT", safe_col));
71        }
72
73        create_sql.push_str(
74            "
75            );
76            CREATE INDEX IF NOT EXISTS idx_block ON events(block_number);
77            CREATE INDEX IF NOT EXISTS idx_address ON events(address);
78            CREATE INDEX IF NOT EXISTS idx_event ON events(event_name);",
79        );
80
81        self.conn
82            .execute_batch(&create_sql)
83            .map_err(OutputError::Sqlite)?;
84        self.table_created = true;
85
86        Ok(())
87    }
88
89    /// Sanitize column name for SQL (static version for basic sanitization)
90    fn sanitize_column_name_basic(name: &str) -> String {
91        // Replace non-alphanumeric with underscore, prefix with param_ to avoid reserved words
92        let sanitized: String = name
93            .chars()
94            .map(|c| if c.is_alphanumeric() { c } else { '_' })
95            .collect();
96        format!("param_{}", sanitized)
97    }
98
99    /// Get or create a unique sanitized column name for an original name
100    /// Handles collisions by appending a numeric suffix
101    fn get_sanitized_column_name(&mut self, original_name: &str) -> String {
102        // Return existing mapping if we have one
103        if let Some(sanitized) = self.column_name_map.get(original_name) {
104            return sanitized.clone();
105        }
106
107        // Generate base sanitized name
108        let base_sanitized = Self::sanitize_column_name_basic(original_name);
109
110        // Check for collision and find unique name
111        let unique_name = if self.sanitized_names.contains(&base_sanitized) {
112            // Collision detected - find unique suffix
113            let mut suffix = 1u32;
114            loop {
115                let candidate = format!("{}_{}", base_sanitized, suffix);
116                if !self.sanitized_names.contains(&candidate) {
117                    tracing::warn!(
118                        "Column name collision detected: '{}' and another column both sanitize to '{}'. \
119                         Using '{}' for '{}'.",
120                        original_name,
121                        base_sanitized,
122                        candidate,
123                        original_name
124                    );
125                    break candidate;
126                }
127                suffix += 1;
128            }
129        } else {
130            base_sanitized
131        };
132
133        // Register the mapping
134        self.sanitized_names.insert(unique_name.clone());
135        self.column_name_map
136            .insert(original_name.to_string(), unique_name.clone());
137
138        unique_name
139    }
140
141    /// Add a column if it doesn't exist
142    fn ensure_column(&mut self, name: &str) -> Result<()> {
143        if self.columns.contains(&name.to_string()) {
144            return Ok(());
145        }
146
147        let safe_col = self.get_sanitized_column_name(name);
148
149        if self.table_created {
150            // ALTER TABLE to add column
151            self.conn
152                .execute(
153                    &format!("ALTER TABLE events ADD COLUMN {} TEXT", safe_col),
154                    [],
155                )
156                .map_err(OutputError::Sqlite)?;
157        }
158
159        self.columns.push(name.to_string());
160        Ok(())
161    }
162
163    /// Collect columns from logs
164    fn collect_columns(&mut self, logs: &[DecodedLog]) {
165        for log in logs {
166            for key in log.params.keys() {
167                if !self.columns.contains(key) {
168                    self.columns.push(key.clone());
169                }
170            }
171        }
172    }
173
174    /// Insert a batch of logs
175    fn insert_batch(&mut self, logs: Vec<DecodedLog>) -> Result<()> {
176        if logs.is_empty() {
177            return Ok(());
178        }
179
180        // Ensure all columns exist
181        for log in &logs {
182            for key in log.params.keys() {
183                self.ensure_column(key)?;
184            }
185        }
186
187        // Build INSERT statement
188        let mut cols = vec![
189            "block_number",
190            "transaction_hash",
191            "log_index",
192            "address",
193            "event_name",
194            "event_signature",
195            "topics",
196            "data",
197        ];
198
199        // Use the collision-safe column name mapping
200        let param_cols: Vec<String> = self
201            .columns
202            .iter()
203            .map(|c| {
204                self.column_name_map
205                    .get(c)
206                    .cloned()
207                    .unwrap_or_else(|| Self::sanitize_column_name_basic(c))
208            })
209            .collect();
210
211        for col in &param_cols {
212            cols.push(col);
213        }
214
215        let placeholders: Vec<&str> = (0..cols.len()).map(|_| "?").collect();
216        let sql = format!(
217            "INSERT INTO events ({}) VALUES ({})",
218            cols.join(", "),
219            placeholders.join(", ")
220        );
221
222        let tx = self.conn.transaction().map_err(OutputError::Sqlite)?;
223
224        {
225            let mut stmt = tx.prepare(&sql).map_err(OutputError::Sqlite)?;
226
227            for log in &logs {
228                let topics_json = match serde_json::to_string(&log.topics) {
229                    Ok(json) => json,
230                    Err(e) => {
231                        tracing::warn!(
232                            "Failed to serialize topics for log in tx {:#x}: {}",
233                            log.transaction_hash,
234                            e
235                        );
236                        "[]".to_string()
237                    }
238                };
239
240                let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![
241                    Box::new(log.block_number as i64),
242                    Box::new(format!("{:#x}", log.transaction_hash)),
243                    Box::new(log.log_index as i64),
244                    Box::new(format!("{:#x}", log.address)),
245                    Box::new(log.event_name.clone()),
246                    Box::new(log.event_signature.clone()),
247                    Box::new(topics_json),
248                    Box::new(log.data.clone()),
249                ];
250
251                // Add parameter values
252                for col_name in &self.columns {
253                    let value = log
254                        .params
255                        .get(col_name)
256                        .map(Self::value_to_string)
257                        .unwrap_or_default();
258                    values.push(Box::new(value));
259                }
260
261                let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|v| v.as_ref()).collect();
262                stmt.execute(params.as_slice())
263                    .map_err(OutputError::Sqlite)?;
264            }
265        }
266
267        tx.commit().map_err(OutputError::Sqlite)?;
268        Ok(())
269    }
270
271    /// Convert value to string for storage
272    fn value_to_string(value: &DecodedValue) -> String {
273        match value {
274            DecodedValue::Address(s) => s.clone(),
275            DecodedValue::Uint(s) => s.clone(),
276            DecodedValue::Int(s) => s.clone(),
277            DecodedValue::Bool(b) => b.to_string(),
278            DecodedValue::Bytes(s) => s.clone(),
279            DecodedValue::String(s) => s.clone(),
280            DecodedValue::Array(arr) => serde_json::to_string(arr).unwrap_or_else(|e| {
281                tracing::warn!("Failed to serialize array value: {}", e);
282                "[serialization error]".to_string()
283            }),
284            DecodedValue::Tuple(arr) => serde_json::to_string(arr).unwrap_or_else(|e| {
285                tracing::warn!("Failed to serialize tuple value: {}", e);
286                "[serialization error]".to_string()
287            }),
288        }
289    }
290
291    /// Write raw logs to a simpler table
292    fn write_raw_logs(&mut self, logs: &[Log]) -> Result<()> {
293        // Create raw logs table
294        self.conn
295            .execute_batch(
296                "CREATE TABLE IF NOT EXISTS raw_logs (
297                id INTEGER PRIMARY KEY AUTOINCREMENT,
298                block_number INTEGER,
299                transaction_hash TEXT,
300                log_index INTEGER,
301                address TEXT NOT NULL,
302                topic0 TEXT,
303                topic1 TEXT,
304                topic2 TEXT,
305                topic3 TEXT,
306                data BLOB
307            );
308            CREATE INDEX IF NOT EXISTS idx_raw_block ON raw_logs(block_number);
309            CREATE INDEX IF NOT EXISTS idx_raw_address ON raw_logs(address);",
310            )
311            .map_err(OutputError::Sqlite)?;
312
313        let tx = self.conn.transaction().map_err(OutputError::Sqlite)?;
314
315        {
316            let mut stmt = tx.prepare(
317                "INSERT INTO raw_logs (block_number, transaction_hash, log_index, address, topic0, topic1, topic2, topic3, data)
318                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
319            ).map_err(OutputError::Sqlite)?;
320
321            for log in logs {
322                let topics = log.topics();
323                stmt.execute(params![
324                    log.block_number.map(|n| n as i64),
325                    log.transaction_hash.map(|h| format!("{:#x}", h)),
326                    log.log_index.map(|i| i as i64),
327                    format!("{:#x}", log.address()),
328                    topics.first().map(|t| format!("{:#x}", t)),
329                    topics.get(1).map(|t| format!("{:#x}", t)),
330                    topics.get(2).map(|t| format!("{:#x}", t)),
331                    topics.get(3).map(|t| format!("{:#x}", t)),
332                    log.data().data.to_vec(),
333                ])
334                .map_err(OutputError::Sqlite)?;
335            }
336        }
337
338        tx.commit().map_err(OutputError::Sqlite)?;
339        Ok(())
340    }
341}
342
343impl OutputWriter for SqliteWriter {
344    fn write_logs(&mut self, result: &FetchResult) -> Result<()> {
345        match &result.logs {
346            FetchLogs::Decoded(logs) => {
347                // Collect columns from first batch
348                if !self.table_created {
349                    self.collect_columns(logs);
350                    self.create_table()?;
351                }
352
353                // Buffer logs
354                self.buffer.extend(logs.iter().cloned());
355
356                // Flush if batch size reached
357                if self.buffer.len() >= self.batch_size {
358                    let batch = std::mem::take(&mut self.buffer);
359                    self.insert_batch(batch)?;
360                }
361            }
362            FetchLogs::Raw(logs) => {
363                self.write_raw_logs(logs)?;
364            }
365        }
366        Ok(())
367    }
368
369    fn finalize(&mut self) -> Result<()> {
370        // Flush remaining buffer
371        if !self.buffer.is_empty() {
372            // Take buffer first to avoid borrow conflict
373            let batch = std::mem::take(&mut self.buffer);
374            if !self.table_created {
375                self.collect_columns(&batch);
376                self.create_table()?;
377            }
378            self.insert_batch(batch)?;
379        }
380
381        // Optimize database
382        self.conn
383            .execute_batch("PRAGMA optimize;")
384            .map_err(OutputError::Sqlite)?;
385
386        Ok(())
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a writer backed by an in-memory database so tests touch no files.
    fn in_memory_writer() -> SqliteWriter {
        match SqliteWriter::new(std::path::Path::new(":memory:")) {
            Ok(writer) => writer,
            Err(_) => panic!("failed to open in-memory SQLite database"),
        }
    }

    #[test]
    fn test_sanitize_column_name_basic() {
        assert_eq!(
            SqliteWriter::sanitize_column_name_basic("from"),
            "param_from"
        );
        assert_eq!(
            SqliteWriter::sanitize_column_name_basic("token-id"),
            "param_token_id"
        );
    }

    #[test]
    fn test_column_name_collision_handling() {
        // "a-b" and "a_b" sanitize to the same base name...
        assert_eq!(
            SqliteWriter::sanitize_column_name_basic("a-b"),
            SqliteWriter::sanitize_column_name_basic("a_b")
        );

        // ...so the writer must hand out distinct column names for them.
        let mut writer = in_memory_writer();
        let first = writer.get_sanitized_column_name("a-b");
        let second = writer.get_sanitized_column_name("a_b");
        assert_eq!(first, "param_a_b");
        assert_eq!(second, "param_a_b_1");

        // Asking again returns the name already assigned.
        assert_eq!(writer.get_sanitized_column_name("a-b"), first);
        assert_eq!(writer.get_sanitized_column_name("a_b"), second);

        // A name whose base form equals an already-issued suffixed name
        // must not reuse it.
        let third = writer.get_sanitized_column_name("a_b_1");
        assert_ne!(third, first);
        assert_ne!(third, second);
    }

    #[test]
    fn test_value_to_string_address() {
        let value = DecodedValue::Address("0x1234567890abcdef".to_string());
        assert_eq!(SqliteWriter::value_to_string(&value), "0x1234567890abcdef");
    }

    #[test]
    fn test_value_to_string_uint() {
        let value = DecodedValue::Uint("12345678901234567890".to_string());
        assert_eq!(
            SqliteWriter::value_to_string(&value),
            "12345678901234567890"
        );
    }

    #[test]
    fn test_value_to_string_bool() {
        let value_true = DecodedValue::Bool(true);
        let value_false = DecodedValue::Bool(false);
        assert_eq!(SqliteWriter::value_to_string(&value_true), "true");
        assert_eq!(SqliteWriter::value_to_string(&value_false), "false");
    }

    #[test]
    fn test_value_to_string_array() {
        let value = DecodedValue::Array(vec![
            DecodedValue::Uint("1".to_string()),
            DecodedValue::Uint("2".to_string()),
        ]);
        let result = SqliteWriter::value_to_string(&value);
        // Should be valid JSON
        assert!(result.starts_with('['));
        assert!(result.contains("\"1\""));
        assert!(result.contains("\"2\""));
    }

    #[test]
    fn test_value_to_string_tuple() {
        let value = DecodedValue::Tuple(vec![
            DecodedValue::Address("0xabc".to_string()),
            DecodedValue::Uint("123".to_string()),
        ]);
        let result = SqliteWriter::value_to_string(&value);
        // Should be valid JSON
        assert!(result.starts_with('['));
        assert!(result.contains("0xabc"));
    }

    #[test]
    fn test_value_to_string_nested() {
        let value = DecodedValue::Array(vec![DecodedValue::Tuple(vec![
            DecodedValue::Address("0x123".to_string()),
            DecodedValue::Bool(true),
        ])]);
        let result = SqliteWriter::value_to_string(&value);
        // Should be valid JSON with nested structure
        assert!(result.contains("0x123"));
        assert!(result.contains("true"));
    }
}