Skip to main content

csv_adapter_store/
lib.rs

1//! SQLite persistence for CSV Adapter seals and anchors
2
3#![allow(missing_docs)]
4#![allow(dead_code)]
5
6use csv_adapter_core::{AnchorRecord, Hash, SealRecord, SealStore, StoreError};
7use rusqlite::{params, Connection};
8use std::sync::Mutex;
9
10/// SQLite-backed seal and anchor store
11pub struct SqliteSealStore {
12    conn: Mutex<Connection>,
13}
14
15impl SqliteSealStore {
16    /// Create or open a SQLite store at the given path
17    pub fn open(path: &str) -> Result<Self, StoreError> {
18        let conn = Connection::open(path).map_err(|e| StoreError::IoError(e.to_string()))?;
19        Self::init_tables(&conn)?;
20        Ok(Self {
21            conn: Mutex::new(conn),
22        })
23    }
24
25    /// Create an in-memory store (for testing)
26    pub fn in_memory() -> Result<Self, StoreError> {
27        let conn = Connection::open_in_memory().map_err(|e| StoreError::IoError(e.to_string()))?;
28        Self::init_tables(&conn)?;
29        Ok(Self {
30            conn: Mutex::new(conn),
31        })
32    }
33
34    fn init_tables(conn: &Connection) -> Result<(), StoreError> {
35        conn.execute_batch(
36            "CREATE TABLE IF NOT EXISTS seals (
37                id INTEGER PRIMARY KEY AUTOINCREMENT,
38                chain TEXT NOT NULL,
39                seal_id BLOB NOT NULL,
40                consumed_at_height INTEGER NOT NULL,
41                commitment_hash BLOB NOT NULL,
42                recorded_at INTEGER NOT NULL,
43                UNIQUE(chain, seal_id)
44            );
45            CREATE INDEX IF NOT EXISTS idx_seals_chain ON seals(chain);
46            CREATE INDEX IF NOT EXISTS idx_seals_height ON seals(chain, consumed_at_height);
47
48            CREATE TABLE IF NOT EXISTS anchors (
49                id INTEGER PRIMARY KEY AUTOINCREMENT,
50                chain TEXT NOT NULL,
51                anchor_id BLOB NOT NULL,
52                block_height INTEGER NOT NULL,
53                commitment_hash BLOB NOT NULL,
54                is_finalized INTEGER NOT NULL DEFAULT 0,
55                confirmations INTEGER NOT NULL DEFAULT 0,
56                recorded_at INTEGER NOT NULL,
57                UNIQUE(chain, anchor_id)
58            );
59            CREATE INDEX IF NOT EXISTS idx_anchors_chain ON anchors(chain);
60            CREATE INDEX IF NOT EXISTS idx_anchors_height ON anchors(chain, block_height);
61            CREATE INDEX IF NOT EXISTS idx_anchors_pending ON anchors(chain, is_finalized);
62            ",
63        )
64        .map_err(|e| StoreError::IoError(e.to_string()))?;
65        Ok(())
66    }
67}
68
69impl SealStore for SqliteSealStore {
70    fn save_seal(&mut self, record: &SealRecord) -> Result<(), StoreError> {
71        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
72        conn.execute(
73            "INSERT OR IGNORE INTO seals (chain, seal_id, consumed_at_height, commitment_hash, recorded_at)
74             VALUES (?1, ?2, ?3, ?4, ?5)",
75            params![
76                record.chain,
77                record.seal_id,
78                record.consumed_at_height as i64,
79                record.commitment_hash.as_bytes(),
80                record.recorded_at as i64,
81            ],
82        ).map_err(|e| StoreError::IoError(e.to_string()))?;
83        Ok(())
84    }
85
86    fn is_seal_consumed(&self, chain: &str, seal_id: &[u8]) -> Result<bool, StoreError> {
87        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
88        let count: i64 = conn
89            .query_row(
90                "SELECT COUNT(*) FROM seals WHERE chain = ?1 AND seal_id = ?2",
91                params![chain, seal_id],
92                |row| row.get(0),
93            )
94            .map_err(|e| StoreError::IoError(e.to_string()))?;
95        Ok(count > 0)
96    }
97
98    fn get_seals(&self, chain: &str) -> Result<Vec<SealRecord>, StoreError> {
99        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
100        let mut stmt = conn.prepare(
101            "SELECT seal_id, consumed_at_height, commitment_hash, recorded_at FROM seals WHERE chain = ?1"
102        ).map_err(|e| StoreError::IoError(e.to_string()))?;
103
104        let seals = stmt
105            .query_map(params![chain], |row| {
106                let seal_id: Vec<u8> = row.get(0)?;
107                let consumed_at_height: i64 = row.get(1)?;
108                let commitment_hash: Vec<u8> = row.get(2)?;
109                let recorded_at: i64 = row.get(3)?;
110                let mut hash_bytes = [0u8; 32];
111                hash_bytes.copy_from_slice(&commitment_hash);
112                Ok(SealRecord {
113                    chain: chain.to_string(),
114                    seal_id,
115                    consumed_at_height: consumed_at_height as u64,
116                    commitment_hash: Hash::new(hash_bytes),
117                    recorded_at: recorded_at as u64,
118                })
119            })
120            .map_err(|e| StoreError::IoError(e.to_string()))?;
121
122        seals
123            .collect::<Result<Vec<_>, _>>()
124            .map_err(|e| StoreError::IoError(e.to_string()))
125    }
126
127    fn remove_seal(&mut self, chain: &str, seal_id: &[u8]) -> Result<(), StoreError> {
128        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
129        conn.execute(
130            "DELETE FROM seals WHERE chain = ?1 AND seal_id = ?2",
131            params![chain, seal_id],
132        )
133        .map_err(|e| StoreError::IoError(e.to_string()))?;
134        Ok(())
135    }
136
137    fn remove_seals_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
138        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
139        let removed = conn
140            .execute(
141                "DELETE FROM seals WHERE chain = ?1 AND consumed_at_height > ?2",
142                params![chain, height as i64],
143            )
144            .map_err(|e| StoreError::IoError(e.to_string()))?;
145        Ok(removed)
146    }
147
148    fn save_anchor(&mut self, record: &AnchorRecord) -> Result<(), StoreError> {
149        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
150        conn.execute(
151            "INSERT OR IGNORE INTO anchors (chain, anchor_id, block_height, commitment_hash, is_finalized, confirmations, recorded_at)
152             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
153            params![
154                record.chain,
155                record.anchor_id,
156                record.block_height as i64,
157                record.commitment_hash.as_bytes(),
158                record.is_finalized as i64,
159                record.confirmations as i64,
160                record.recorded_at as i64,
161            ],
162        ).map_err(|e| StoreError::IoError(e.to_string()))?;
163        Ok(())
164    }
165
166    fn has_anchor(&self, chain: &str, anchor_id: &[u8]) -> Result<bool, StoreError> {
167        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
168        let count: i64 = conn
169            .query_row(
170                "SELECT COUNT(*) FROM anchors WHERE chain = ?1 AND anchor_id = ?2",
171                params![chain, anchor_id],
172                |row| row.get(0),
173            )
174            .map_err(|e| StoreError::IoError(e.to_string()))?;
175        Ok(count > 0)
176    }
177
178    fn finalize_anchor(
179        &mut self,
180        chain: &str,
181        anchor_id: &[u8],
182        confirmations: u64,
183    ) -> Result<(), StoreError> {
184        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
185        conn.execute(
186            "UPDATE anchors SET is_finalized = 1, confirmations = ?3
187             WHERE chain = ?1 AND anchor_id = ?2",
188            params![chain, anchor_id, confirmations as i64],
189        )
190        .map_err(|e| StoreError::IoError(e.to_string()))?;
191        Ok(())
192    }
193
194    fn pending_anchors(&self, chain: &str) -> Result<Vec<AnchorRecord>, StoreError> {
195        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
196        let mut stmt = conn
197            .prepare(
198                "SELECT anchor_id, block_height, commitment_hash, confirmations, recorded_at
199             FROM anchors WHERE chain = ?1 AND is_finalized = 0",
200            )
201            .map_err(|e| StoreError::IoError(e.to_string()))?;
202
203        let anchors = stmt
204            .query_map(params![chain], |row| {
205                let anchor_id: Vec<u8> = row.get(0)?;
206                let block_height: i64 = row.get(1)?;
207                let commitment_hash: Vec<u8> = row.get(2)?;
208                let confirmations: i64 = row.get(3)?;
209                let recorded_at: i64 = row.get(4)?;
210                let mut hash_bytes = [0u8; 32];
211                hash_bytes.copy_from_slice(&commitment_hash);
212                Ok(AnchorRecord {
213                    chain: chain.to_string(),
214                    anchor_id,
215                    block_height: block_height as u64,
216                    commitment_hash: Hash::new(hash_bytes),
217                    is_finalized: false,
218                    confirmations: confirmations as u64,
219                    recorded_at: recorded_at as u64,
220                })
221            })
222            .map_err(|e| StoreError::IoError(e.to_string()))?;
223
224        anchors
225            .collect::<Result<Vec<_>, _>>()
226            .map_err(|e| StoreError::IoError(e.to_string()))
227    }
228
229    fn remove_anchors_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
230        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
231        let removed = conn
232            .execute(
233                "DELETE FROM anchors WHERE chain = ?1 AND block_height > ?2",
234                params![chain, height as i64],
235            )
236            .map_err(|e| StoreError::IoError(e.to_string()))?;
237        Ok(removed)
238    }
239
240    fn highest_block(&self, chain: &str) -> Result<u64, StoreError> {
241        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
242        let max: Option<i64> = conn
243            .query_row(
244                "SELECT MAX(block_height) FROM anchors WHERE chain = ?1",
245                params![chain],
246                |row| row.get(0),
247            )
248            .map_err(|e| StoreError::IoError(e.to_string()))?;
249        Ok(max.unwrap_or(0) as u64)
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use csv_adapter_core::SealRecord;
257
258    fn test_seal_record(chain: &str, height: u64) -> SealRecord {
259        let mut seal_id = vec![0u8; 16];
260        seal_id[0..8].copy_from_slice(&height.to_le_bytes());
261        seal_id[8..].copy_from_slice(chain.as_bytes().get(..8).unwrap_or(&[0u8; 8]));
262        SealRecord {
263            chain: chain.to_string(),
264            seal_id,
265            consumed_at_height: height,
266            commitment_hash: Hash::new([0xAA; 32]),
267            recorded_at: 1700000000,
268        }
269    }
270
271    fn test_anchor_record(chain: &str, height: u64) -> AnchorRecord {
272        let mut anchor_id = vec![0u8; 16];
273        anchor_id[0..8].copy_from_slice(&height.to_le_bytes());
274        AnchorRecord {
275            chain: chain.to_string(),
276            anchor_id,
277            block_height: height,
278            commitment_hash: Hash::new([0xBB; 32]),
279            is_finalized: false,
280            confirmations: 0,
281            recorded_at: 1700000000,
282        }
283    }
284
285    #[test]
286    fn test_sqlite_open_in_memory() {
287        let store = SqliteSealStore::in_memory().unwrap();
288        assert_eq!(store.highest_block("bitcoin").unwrap(), 0);
289    }
290
291    #[test]
292    fn test_sqlite_seal_lifecycle() {
293        let mut store = SqliteSealStore::in_memory().unwrap();
294        let record = test_seal_record("bitcoin", 100);
295        let seal_id = record.seal_id.clone();
296        store.save_seal(&record).unwrap();
297        assert!(store.is_seal_consumed("bitcoin", &seal_id).unwrap());
298        assert!(!store.is_seal_consumed("ethereum", &seal_id).unwrap());
299    }
300
301    #[test]
302    fn test_sqlite_get_seals() {
303        let mut store = SqliteSealStore::in_memory().unwrap();
304        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
305        store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
306        store.save_seal(&test_seal_record("ethereum", 300)).unwrap();
307
308        let btc_seals = store.get_seals("bitcoin").unwrap();
309        assert_eq!(btc_seals.len(), 2);
310
311        let eth_seals = store.get_seals("ethereum").unwrap();
312        assert_eq!(eth_seals.len(), 1);
313    }
314
315    #[test]
316    fn test_sqlite_remove_seal() {
317        let mut store = SqliteSealStore::in_memory().unwrap();
318        let record = test_seal_record("bitcoin", 100);
319        let seal_id = record.seal_id.clone();
320        store.save_seal(&record).unwrap();
321        store.remove_seal("bitcoin", &seal_id).unwrap();
322        assert!(!store.is_seal_consumed("bitcoin", &seal_id).unwrap());
323    }
324
325    #[test]
326    fn test_sqlite_remove_seals_after_height() {
327        let mut store = SqliteSealStore::in_memory().unwrap();
328        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
329        store.save_seal(&test_seal_record("bitcoin", 150)).unwrap();
330        store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
331        let removed = store.remove_seals_after("bitcoin", 150).unwrap();
332        assert_eq!(removed, 1);
333    }
334
335    #[test]
336    fn test_sqlite_anchor_lifecycle() {
337        let mut store = SqliteSealStore::in_memory().unwrap();
338        let anchor = test_anchor_record("bitcoin", 100);
339        let anchor_id = anchor.anchor_id.clone();
340        store.save_anchor(&anchor).unwrap();
341        assert!(store.has_anchor("bitcoin", &anchor_id).unwrap());
342
343        let pending = store.pending_anchors("bitcoin").unwrap();
344        assert_eq!(pending.len(), 1);
345
346        store.finalize_anchor("bitcoin", &anchor_id, 6).unwrap();
347        let pending = store.pending_anchors("bitcoin").unwrap();
348        assert!(pending.is_empty());
349    }
350
351    #[test]
352    fn test_sqlite_remove_anchors_after_height() {
353        let mut store = SqliteSealStore::in_memory().unwrap();
354        store
355            .save_anchor(&test_anchor_record("bitcoin", 100))
356            .unwrap();
357        store
358            .save_anchor(&test_anchor_record("bitcoin", 200))
359            .unwrap();
360        store
361            .save_anchor(&test_anchor_record("bitcoin", 300))
362            .unwrap();
363        let removed = store.remove_anchors_after("bitcoin", 200).unwrap();
364        assert_eq!(removed, 1);
365    }
366
367    #[test]
368    fn test_sqlite_highest_block() {
369        let mut store = SqliteSealStore::in_memory().unwrap();
370        store
371            .save_anchor(&test_anchor_record("bitcoin", 100))
372            .unwrap();
373        store
374            .save_anchor(&test_anchor_record("bitcoin", 300))
375            .unwrap();
376        store
377            .save_anchor(&test_anchor_record("bitcoin", 200))
378            .unwrap();
379        assert_eq!(store.highest_block("bitcoin").unwrap(), 300);
380        assert_eq!(store.highest_block("ethereum").unwrap(), 0);
381    }
382
383    #[test]
384    fn test_sqlite_duplicate_seal_ignored() {
385        let mut store = SqliteSealStore::in_memory().unwrap();
386        let record = test_seal_record("bitcoin", 100);
387        let _seal_id = record.seal_id.clone();
388        store.save_seal(&record).unwrap();
389        // Try to save another seal with the same seal_id but different height
390        let mut dup = record.clone();
391        dup.consumed_at_height = 200;
392        store.save_seal(&dup).unwrap();
393        let seals = store.get_seals("bitcoin").unwrap();
394        // INSERT OR IGNORE means only the first one is stored
395        assert_eq!(seals.len(), 1);
396        assert_eq!(seals[0].consumed_at_height, 100);
397    }
398
399    #[test]
400    fn test_sqlite_multi_chain_isolation() {
401        let mut store = SqliteSealStore::in_memory().unwrap();
402        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
403        store.save_seal(&test_seal_record("ethereum", 200)).unwrap();
404        store
405            .save_anchor(&test_anchor_record("bitcoin", 100))
406            .unwrap();
407        store
408            .save_anchor(&test_anchor_record("ethereum", 200))
409            .unwrap();
410
411        assert_eq!(store.get_seals("bitcoin").unwrap().len(), 1);
412        assert_eq!(store.pending_anchors("bitcoin").unwrap().len(), 1);
413    }
414}