1use crate::abi::{DecodedLog, DecodedValue};
4use crate::error::{OutputError, Result};
5use crate::fetcher::{FetchLogs, FetchResult};
6use crate::output::OutputWriter;
7use alloy::rpc::types::Log;
8use rusqlite::{params, Connection};
9use std::path::Path;
10
11use std::collections::HashMap as StdHashMap;
12
13pub struct SqliteWriter {
15 conn: Connection,
17 columns: Vec<String>,
19 column_name_map: StdHashMap<String, String>,
21 sanitized_names: std::collections::HashSet<String>,
23 table_created: bool,
25 buffer: Vec<DecodedLog>,
27 batch_size: usize,
29}
30
31impl SqliteWriter {
32 pub fn new(path: &Path) -> Result<Self> {
34 let conn = Connection::open(path).map_err(OutputError::Sqlite)?;
35
36 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
38 .map_err(OutputError::Sqlite)?;
39
40 Ok(Self {
41 conn,
42 columns: Vec::new(),
43 column_name_map: StdHashMap::new(),
44 sanitized_names: std::collections::HashSet::new(),
45 table_created: false,
46 buffer: Vec::new(),
47 batch_size: 1000,
48 })
49 }
50
51 fn create_table(&mut self) -> Result<()> {
53 let mut create_sql = String::from(
55 "CREATE TABLE IF NOT EXISTS events (
56 id INTEGER PRIMARY KEY AUTOINCREMENT,
57 block_number INTEGER NOT NULL,
58 transaction_hash TEXT NOT NULL,
59 log_index INTEGER NOT NULL,
60 address TEXT NOT NULL,
61 event_name TEXT NOT NULL,
62 event_signature TEXT NOT NULL,
63 topics TEXT,
64 data BLOB",
65 );
66
67 for col in &self.columns.clone() {
69 let safe_col = self.get_sanitized_column_name(col);
70 create_sql.push_str(&format!(",\n {} TEXT", safe_col));
71 }
72
73 create_sql.push_str(
74 "
75 );
76 CREATE INDEX IF NOT EXISTS idx_block ON events(block_number);
77 CREATE INDEX IF NOT EXISTS idx_address ON events(address);
78 CREATE INDEX IF NOT EXISTS idx_event ON events(event_name);",
79 );
80
81 self.conn
82 .execute_batch(&create_sql)
83 .map_err(OutputError::Sqlite)?;
84 self.table_created = true;
85
86 Ok(())
87 }
88
89 fn sanitize_column_name_basic(name: &str) -> String {
91 let sanitized: String = name
93 .chars()
94 .map(|c| if c.is_alphanumeric() { c } else { '_' })
95 .collect();
96 format!("param_{}", sanitized)
97 }
98
99 fn get_sanitized_column_name(&mut self, original_name: &str) -> String {
102 if let Some(sanitized) = self.column_name_map.get(original_name) {
104 return sanitized.clone();
105 }
106
107 let base_sanitized = Self::sanitize_column_name_basic(original_name);
109
110 let unique_name = if self.sanitized_names.contains(&base_sanitized) {
112 let mut suffix = 1u32;
114 loop {
115 let candidate = format!("{}_{}", base_sanitized, suffix);
116 if !self.sanitized_names.contains(&candidate) {
117 tracing::warn!(
118 "Column name collision detected: '{}' and another column both sanitize to '{}'. \
119 Using '{}' for '{}'.",
120 original_name,
121 base_sanitized,
122 candidate,
123 original_name
124 );
125 break candidate;
126 }
127 suffix += 1;
128 }
129 } else {
130 base_sanitized
131 };
132
133 self.sanitized_names.insert(unique_name.clone());
135 self.column_name_map
136 .insert(original_name.to_string(), unique_name.clone());
137
138 unique_name
139 }
140
141 fn ensure_column(&mut self, name: &str) -> Result<()> {
143 if self.columns.contains(&name.to_string()) {
144 return Ok(());
145 }
146
147 let safe_col = self.get_sanitized_column_name(name);
148
149 if self.table_created {
150 self.conn
152 .execute(
153 &format!("ALTER TABLE events ADD COLUMN {} TEXT", safe_col),
154 [],
155 )
156 .map_err(OutputError::Sqlite)?;
157 }
158
159 self.columns.push(name.to_string());
160 Ok(())
161 }
162
163 fn collect_columns(&mut self, logs: &[DecodedLog]) {
165 for log in logs {
166 for key in log.params.keys() {
167 if !self.columns.contains(key) {
168 self.columns.push(key.clone());
169 }
170 }
171 }
172 }
173
174 fn insert_batch(&mut self, logs: Vec<DecodedLog>) -> Result<()> {
176 if logs.is_empty() {
177 return Ok(());
178 }
179
180 for log in &logs {
182 for key in log.params.keys() {
183 self.ensure_column(key)?;
184 }
185 }
186
187 let mut cols = vec![
189 "block_number",
190 "transaction_hash",
191 "log_index",
192 "address",
193 "event_name",
194 "event_signature",
195 "topics",
196 "data",
197 ];
198
199 let param_cols: Vec<String> = self
201 .columns
202 .iter()
203 .map(|c| {
204 self.column_name_map
205 .get(c)
206 .cloned()
207 .unwrap_or_else(|| Self::sanitize_column_name_basic(c))
208 })
209 .collect();
210
211 for col in ¶m_cols {
212 cols.push(col);
213 }
214
215 let placeholders: Vec<&str> = (0..cols.len()).map(|_| "?").collect();
216 let sql = format!(
217 "INSERT INTO events ({}) VALUES ({})",
218 cols.join(", "),
219 placeholders.join(", ")
220 );
221
222 let tx = self.conn.transaction().map_err(OutputError::Sqlite)?;
223
224 {
225 let mut stmt = tx.prepare(&sql).map_err(OutputError::Sqlite)?;
226
227 for log in &logs {
228 let topics_json = match serde_json::to_string(&log.topics) {
229 Ok(json) => json,
230 Err(e) => {
231 tracing::warn!(
232 "Failed to serialize topics for log in tx {:#x}: {}",
233 log.transaction_hash,
234 e
235 );
236 "[]".to_string()
237 }
238 };
239
240 let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![
241 Box::new(log.block_number as i64),
242 Box::new(format!("{:#x}", log.transaction_hash)),
243 Box::new(log.log_index as i64),
244 Box::new(format!("{:#x}", log.address)),
245 Box::new(log.event_name.clone()),
246 Box::new(log.event_signature.clone()),
247 Box::new(topics_json),
248 Box::new(log.data.clone()),
249 ];
250
251 for col_name in &self.columns {
253 let value = log
254 .params
255 .get(col_name)
256 .map(Self::value_to_string)
257 .unwrap_or_default();
258 values.push(Box::new(value));
259 }
260
261 let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|v| v.as_ref()).collect();
262 stmt.execute(params.as_slice())
263 .map_err(OutputError::Sqlite)?;
264 }
265 }
266
267 tx.commit().map_err(OutputError::Sqlite)?;
268 Ok(())
269 }
270
271 fn value_to_string(value: &DecodedValue) -> String {
273 match value {
274 DecodedValue::Address(s) => s.clone(),
275 DecodedValue::Uint(s) => s.clone(),
276 DecodedValue::Int(s) => s.clone(),
277 DecodedValue::Bool(b) => b.to_string(),
278 DecodedValue::Bytes(s) => s.clone(),
279 DecodedValue::String(s) => s.clone(),
280 DecodedValue::Array(arr) => serde_json::to_string(arr).unwrap_or_else(|e| {
281 tracing::warn!("Failed to serialize array value: {}", e);
282 "[serialization error]".to_string()
283 }),
284 DecodedValue::Tuple(arr) => serde_json::to_string(arr).unwrap_or_else(|e| {
285 tracing::warn!("Failed to serialize tuple value: {}", e);
286 "[serialization error]".to_string()
287 }),
288 }
289 }
290
291 fn write_raw_logs(&mut self, logs: &[Log]) -> Result<()> {
293 self.conn
295 .execute_batch(
296 "CREATE TABLE IF NOT EXISTS raw_logs (
297 id INTEGER PRIMARY KEY AUTOINCREMENT,
298 block_number INTEGER,
299 transaction_hash TEXT,
300 log_index INTEGER,
301 address TEXT NOT NULL,
302 topic0 TEXT,
303 topic1 TEXT,
304 topic2 TEXT,
305 topic3 TEXT,
306 data BLOB
307 );
308 CREATE INDEX IF NOT EXISTS idx_raw_block ON raw_logs(block_number);
309 CREATE INDEX IF NOT EXISTS idx_raw_address ON raw_logs(address);",
310 )
311 .map_err(OutputError::Sqlite)?;
312
313 let tx = self.conn.transaction().map_err(OutputError::Sqlite)?;
314
315 {
316 let mut stmt = tx.prepare(
317 "INSERT INTO raw_logs (block_number, transaction_hash, log_index, address, topic0, topic1, topic2, topic3, data)
318 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
319 ).map_err(OutputError::Sqlite)?;
320
321 for log in logs {
322 let topics = log.topics();
323 stmt.execute(params![
324 log.block_number.map(|n| n as i64),
325 log.transaction_hash.map(|h| format!("{:#x}", h)),
326 log.log_index.map(|i| i as i64),
327 format!("{:#x}", log.address()),
328 topics.first().map(|t| format!("{:#x}", t)),
329 topics.get(1).map(|t| format!("{:#x}", t)),
330 topics.get(2).map(|t| format!("{:#x}", t)),
331 topics.get(3).map(|t| format!("{:#x}", t)),
332 log.data().data.to_vec(),
333 ])
334 .map_err(OutputError::Sqlite)?;
335 }
336 }
337
338 tx.commit().map_err(OutputError::Sqlite)?;
339 Ok(())
340 }
341}
342
343impl OutputWriter for SqliteWriter {
344 fn write_logs(&mut self, result: &FetchResult) -> Result<()> {
345 match &result.logs {
346 FetchLogs::Decoded(logs) => {
347 if !self.table_created {
349 self.collect_columns(logs);
350 self.create_table()?;
351 }
352
353 self.buffer.extend(logs.iter().cloned());
355
356 if self.buffer.len() >= self.batch_size {
358 let batch = std::mem::take(&mut self.buffer);
359 self.insert_batch(batch)?;
360 }
361 }
362 FetchLogs::Raw(logs) => {
363 self.write_raw_logs(logs)?;
364 }
365 }
366 Ok(())
367 }
368
369 fn finalize(&mut self) -> Result<()> {
370 if !self.buffer.is_empty() {
372 let batch = std::mem::take(&mut self.buffer);
374 if !self.table_created {
375 self.collect_columns(&batch);
376 self.create_table()?;
377 }
378 self.insert_batch(batch)?;
379 }
380
381 self.conn
383 .execute_batch("PRAGMA optimize;")
384 .map_err(OutputError::Sqlite)?;
385
386 Ok(())
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic sanitization prefixes the name and replaces separators with `_`.
    #[test]
    fn test_sanitize_column_name_basic() {
        assert_eq!(
            SqliteWriter::sanitize_column_name_basic("from"),
            "param_from"
        );
        assert_eq!(
            SqliteWriter::sanitize_column_name_basic("token-id"),
            "param_token_id"
        );
    }

    /// Two names that sanitize identically must still get distinct, stable
    /// column names from the writer.
    #[test]
    fn test_column_name_collision_handling() {
        let name1 = "a-b";
        let name2 = "a_b";

        // Precondition: basic sanitization alone cannot tell these apart.
        let sanitized1 = SqliteWriter::sanitize_column_name_basic(name1);
        let sanitized2 = SqliteWriter::sanitize_column_name_basic(name2);
        assert_eq!(sanitized1, sanitized2);

        // The writer resolves the collision by suffixing the later name.
        let mut writer = SqliteWriter::new(std::path::Path::new(":memory:"))
            .expect("in-memory SQLite database should open");
        let first = writer.get_sanitized_column_name(name1);
        let second = writer.get_sanitized_column_name(name2);
        assert_eq!(first, "param_a_b");
        assert_eq!(second, "param_a_b_1");
        assert_ne!(first, second);

        // Assignments are remembered: asking again returns the same names.
        assert_eq!(writer.get_sanitized_column_name(name1), first);
        assert_eq!(writer.get_sanitized_column_name(name2), second);
    }

    /// Address values are stored verbatim.
    #[test]
    fn test_value_to_string_address() {
        let value = DecodedValue::Address("0x1234567890abcdef".to_string());
        assert_eq!(SqliteWriter::value_to_string(&value), "0x1234567890abcdef");
    }

    /// Unsigned integers keep their decimal string form, even beyond `u64`.
    #[test]
    fn test_value_to_string_uint() {
        let value = DecodedValue::Uint("12345678901234567890".to_string());
        assert_eq!(
            SqliteWriter::value_to_string(&value),
            "12345678901234567890"
        );
    }

    /// Booleans render as `true` / `false`.
    #[test]
    fn test_value_to_string_bool() {
        let value_true = DecodedValue::Bool(true);
        let value_false = DecodedValue::Bool(false);
        assert_eq!(SqliteWriter::value_to_string(&value_true), "true");
        assert_eq!(SqliteWriter::value_to_string(&value_false), "false");
    }

    /// Arrays are serialized as JSON arrays.
    #[test]
    fn test_value_to_string_array() {
        let value = DecodedValue::Array(vec![
            DecodedValue::Uint("1".to_string()),
            DecodedValue::Uint("2".to_string()),
        ]);
        let result = SqliteWriter::value_to_string(&value);
        assert!(result.starts_with('['));
        assert!(result.contains("\"1\""));
        assert!(result.contains("\"2\""));
    }

    /// Tuples are serialized as JSON arrays as well.
    #[test]
    fn test_value_to_string_tuple() {
        let value = DecodedValue::Tuple(vec![
            DecodedValue::Address("0xabc".to_string()),
            DecodedValue::Uint("123".to_string()),
        ]);
        let result = SqliteWriter::value_to_string(&value);
        assert!(result.starts_with('['));
        assert!(result.contains("0xabc"));
    }

    /// Nested containers keep their inner values in the serialized output.
    #[test]
    fn test_value_to_string_nested() {
        let value = DecodedValue::Array(vec![DecodedValue::Tuple(vec![
            DecodedValue::Address("0x123".to_string()),
            DecodedValue::Bool(true),
        ])]);
        let result = SqliteWriter::value_to_string(&value);
        assert!(result.contains("0x123"));
        assert!(result.contains("true"));
    }
}