csv-adapter-store 0.1.1

SQLite persistence layer for CSV Adapter seal and anchor storage
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
//! SQLite persistence for CSV Adapter seals and anchors

#![allow(missing_docs)]
#![allow(dead_code)]

use csv_adapter_core::{AnchorRecord, Hash, SealRecord, SealStore, StoreError};
use rusqlite::{params, Connection};
use std::sync::Mutex;

/// SQLite-backed seal and anchor store
pub struct SqliteSealStore {
    conn: Mutex<Connection>,
}

impl SqliteSealStore {
    /// Create or open a SQLite store at the given path
    pub fn open(path: &str) -> Result<Self, StoreError> {
        let conn = Connection::open(path).map_err(|e| StoreError::IoError(e.to_string()))?;
        Self::init_tables(&conn)?;
        Ok(Self {
            conn: Mutex::new(conn),
        })
    }

    /// Create an in-memory store (for testing)
    pub fn in_memory() -> Result<Self, StoreError> {
        let conn = Connection::open_in_memory().map_err(|e| StoreError::IoError(e.to_string()))?;
        Self::init_tables(&conn)?;
        Ok(Self {
            conn: Mutex::new(conn),
        })
    }

    fn init_tables(conn: &Connection) -> Result<(), StoreError> {
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS seals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chain TEXT NOT NULL,
                seal_id BLOB NOT NULL,
                consumed_at_height INTEGER NOT NULL,
                commitment_hash BLOB NOT NULL,
                recorded_at INTEGER NOT NULL,
                UNIQUE(chain, seal_id)
            );
            CREATE INDEX IF NOT EXISTS idx_seals_chain ON seals(chain);
            CREATE INDEX IF NOT EXISTS idx_seals_height ON seals(chain, consumed_at_height);

            CREATE TABLE IF NOT EXISTS anchors (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chain TEXT NOT NULL,
                anchor_id BLOB NOT NULL,
                block_height INTEGER NOT NULL,
                commitment_hash BLOB NOT NULL,
                is_finalized INTEGER NOT NULL DEFAULT 0,
                confirmations INTEGER NOT NULL DEFAULT 0,
                recorded_at INTEGER NOT NULL,
                UNIQUE(chain, anchor_id)
            );
            CREATE INDEX IF NOT EXISTS idx_anchors_chain ON anchors(chain);
            CREATE INDEX IF NOT EXISTS idx_anchors_height ON anchors(chain, block_height);
            CREATE INDEX IF NOT EXISTS idx_anchors_pending ON anchors(chain, is_finalized);
            ",
        )
        .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(())
    }
}

impl SealStore for SqliteSealStore {
    fn save_seal(&mut self, record: &SealRecord) -> Result<(), StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        conn.execute(
            "INSERT OR IGNORE INTO seals (chain, seal_id, consumed_at_height, commitment_hash, recorded_at)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                record.chain,
                record.seal_id,
                record.consumed_at_height as i64,
                record.commitment_hash.as_bytes(),
                record.recorded_at as i64,
            ],
        ).map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(())
    }

    fn is_seal_consumed(&self, chain: &str, seal_id: &[u8]) -> Result<bool, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM seals WHERE chain = ?1 AND seal_id = ?2",
                params![chain, seal_id],
                |row| row.get(0),
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(count > 0)
    }

    fn get_seals(&self, chain: &str) -> Result<Vec<SealRecord>, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let mut stmt = conn.prepare(
            "SELECT seal_id, consumed_at_height, commitment_hash, recorded_at FROM seals WHERE chain = ?1"
        ).map_err(|e| StoreError::IoError(e.to_string()))?;

        let seals = stmt
            .query_map(params![chain], |row| {
                let seal_id: Vec<u8> = row.get(0)?;
                let consumed_at_height: i64 = row.get(1)?;
                let commitment_hash: Vec<u8> = row.get(2)?;
                let recorded_at: i64 = row.get(3)?;
                let mut hash_bytes = [0u8; 32];
                hash_bytes.copy_from_slice(&commitment_hash);
                Ok(SealRecord {
                    chain: chain.to_string(),
                    seal_id,
                    consumed_at_height: consumed_at_height as u64,
                    commitment_hash: Hash::new(hash_bytes),
                    recorded_at: recorded_at as u64,
                })
            })
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        seals
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    fn remove_seal(&mut self, chain: &str, seal_id: &[u8]) -> Result<(), StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        conn.execute(
            "DELETE FROM seals WHERE chain = ?1 AND seal_id = ?2",
            params![chain, seal_id],
        )
        .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(())
    }

    fn remove_seals_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let removed = conn
            .execute(
                "DELETE FROM seals WHERE chain = ?1 AND consumed_at_height > ?2",
                params![chain, height as i64],
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(removed)
    }

    fn save_anchor(&mut self, record: &AnchorRecord) -> Result<(), StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        conn.execute(
            "INSERT OR IGNORE INTO anchors (chain, anchor_id, block_height, commitment_hash, is_finalized, confirmations, recorded_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                record.chain,
                record.anchor_id,
                record.block_height as i64,
                record.commitment_hash.as_bytes(),
                record.is_finalized as i64,
                record.confirmations as i64,
                record.recorded_at as i64,
            ],
        ).map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(())
    }

    fn has_anchor(&self, chain: &str, anchor_id: &[u8]) -> Result<bool, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM anchors WHERE chain = ?1 AND anchor_id = ?2",
                params![chain, anchor_id],
                |row| row.get(0),
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(count > 0)
    }

    fn finalize_anchor(
        &mut self,
        chain: &str,
        anchor_id: &[u8],
        confirmations: u64,
    ) -> Result<(), StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        conn.execute(
            "UPDATE anchors SET is_finalized = 1, confirmations = ?3
             WHERE chain = ?1 AND anchor_id = ?2",
            params![chain, anchor_id, confirmations as i64],
        )
        .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(())
    }

    fn pending_anchors(&self, chain: &str) -> Result<Vec<AnchorRecord>, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let mut stmt = conn
            .prepare(
                "SELECT anchor_id, block_height, commitment_hash, confirmations, recorded_at
             FROM anchors WHERE chain = ?1 AND is_finalized = 0",
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        let anchors = stmt
            .query_map(params![chain], |row| {
                let anchor_id: Vec<u8> = row.get(0)?;
                let block_height: i64 = row.get(1)?;
                let commitment_hash: Vec<u8> = row.get(2)?;
                let confirmations: i64 = row.get(3)?;
                let recorded_at: i64 = row.get(4)?;
                let mut hash_bytes = [0u8; 32];
                hash_bytes.copy_from_slice(&commitment_hash);
                Ok(AnchorRecord {
                    chain: chain.to_string(),
                    anchor_id,
                    block_height: block_height as u64,
                    commitment_hash: Hash::new(hash_bytes),
                    is_finalized: false,
                    confirmations: confirmations as u64,
                    recorded_at: recorded_at as u64,
                })
            })
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        anchors
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    fn remove_anchors_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let removed = conn
            .execute(
                "DELETE FROM anchors WHERE chain = ?1 AND block_height > ?2",
                params![chain, height as i64],
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(removed)
    }

    fn highest_block(&self, chain: &str) -> Result<u64, StoreError> {
        let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
        let max: Option<i64> = conn
            .query_row(
                "SELECT MAX(block_height) FROM anchors WHERE chain = ?1",
                params![chain],
                |row| row.get(0),
            )
            .map_err(|e| StoreError::IoError(e.to_string()))?;
        Ok(max.unwrap_or(0) as u64)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use csv_adapter_core::SealRecord;

    fn test_seal_record(chain: &str, height: u64) -> SealRecord {
        let mut seal_id = vec![0u8; 16];
        seal_id[0..8].copy_from_slice(&height.to_le_bytes());
        seal_id[8..].copy_from_slice(chain.as_bytes().get(..8).unwrap_or(&[0u8; 8]));
        SealRecord {
            chain: chain.to_string(),
            seal_id,
            consumed_at_height: height,
            commitment_hash: Hash::new([0xAA; 32]),
            recorded_at: 1700000000,
        }
    }

    fn test_anchor_record(chain: &str, height: u64) -> AnchorRecord {
        let mut anchor_id = vec![0u8; 16];
        anchor_id[0..8].copy_from_slice(&height.to_le_bytes());
        AnchorRecord {
            chain: chain.to_string(),
            anchor_id,
            block_height: height,
            commitment_hash: Hash::new([0xBB; 32]),
            is_finalized: false,
            confirmations: 0,
            recorded_at: 1700000000,
        }
    }

    #[test]
    fn test_sqlite_open_in_memory() {
        let store = SqliteSealStore::in_memory().unwrap();
        assert_eq!(store.highest_block("bitcoin").unwrap(), 0);
    }

    #[test]
    fn test_sqlite_seal_lifecycle() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        let record = test_seal_record("bitcoin", 100);
        let seal_id = record.seal_id.clone();
        store.save_seal(&record).unwrap();
        assert!(store.is_seal_consumed("bitcoin", &seal_id).unwrap());
        assert!(!store.is_seal_consumed("ethereum", &seal_id).unwrap());
    }

    #[test]
    fn test_sqlite_get_seals() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
        store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
        store.save_seal(&test_seal_record("ethereum", 300)).unwrap();

        let btc_seals = store.get_seals("bitcoin").unwrap();
        assert_eq!(btc_seals.len(), 2);

        let eth_seals = store.get_seals("ethereum").unwrap();
        assert_eq!(eth_seals.len(), 1);
    }

    #[test]
    fn test_sqlite_remove_seal() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        let record = test_seal_record("bitcoin", 100);
        let seal_id = record.seal_id.clone();
        store.save_seal(&record).unwrap();
        store.remove_seal("bitcoin", &seal_id).unwrap();
        assert!(!store.is_seal_consumed("bitcoin", &seal_id).unwrap());
    }

    #[test]
    fn test_sqlite_remove_seals_after_height() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
        store.save_seal(&test_seal_record("bitcoin", 150)).unwrap();
        store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
        let removed = store.remove_seals_after("bitcoin", 150).unwrap();
        assert_eq!(removed, 1);
    }

    #[test]
    fn test_sqlite_anchor_lifecycle() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        let anchor = test_anchor_record("bitcoin", 100);
        let anchor_id = anchor.anchor_id.clone();
        store.save_anchor(&anchor).unwrap();
        assert!(store.has_anchor("bitcoin", &anchor_id).unwrap());

        let pending = store.pending_anchors("bitcoin").unwrap();
        assert_eq!(pending.len(), 1);

        store.finalize_anchor("bitcoin", &anchor_id, 6).unwrap();
        let pending = store.pending_anchors("bitcoin").unwrap();
        assert!(pending.is_empty());
    }

    #[test]
    fn test_sqlite_remove_anchors_after_height() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 100))
            .unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 200))
            .unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 300))
            .unwrap();
        let removed = store.remove_anchors_after("bitcoin", 200).unwrap();
        assert_eq!(removed, 1);
    }

    #[test]
    fn test_sqlite_highest_block() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 100))
            .unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 300))
            .unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 200))
            .unwrap();
        assert_eq!(store.highest_block("bitcoin").unwrap(), 300);
        assert_eq!(store.highest_block("ethereum").unwrap(), 0);
    }

    #[test]
    fn test_sqlite_duplicate_seal_ignored() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        let record = test_seal_record("bitcoin", 100);
        let _seal_id = record.seal_id.clone();
        store.save_seal(&record).unwrap();
        // Try to save another seal with the same seal_id but different height
        let mut dup = record.clone();
        dup.consumed_at_height = 200;
        store.save_seal(&dup).unwrap();
        let seals = store.get_seals("bitcoin").unwrap();
        // INSERT OR IGNORE means only the first one is stored
        assert_eq!(seals.len(), 1);
        assert_eq!(seals[0].consumed_at_height, 100);
    }

    #[test]
    fn test_sqlite_multi_chain_isolation() {
        let mut store = SqliteSealStore::in_memory().unwrap();
        store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
        store.save_seal(&test_seal_record("ethereum", 200)).unwrap();
        store
            .save_anchor(&test_anchor_record("bitcoin", 100))
            .unwrap();
        store
            .save_anchor(&test_anchor_record("ethereum", 200))
            .unwrap();

        assert_eq!(store.get_seals("bitcoin").unwrap().len(), 1);
        assert_eq!(store.pending_anchors("bitcoin").unwrap().len(), 1);
    }
}