1#![allow(missing_docs)]
4#![allow(dead_code)]
5
6use csv_adapter_core::{AnchorRecord, Hash, SealRecord, SealStore, StoreError};
7use rusqlite::{params, Connection};
8use std::sync::Mutex;
9
10pub struct SqliteSealStore {
12 conn: Mutex<Connection>,
13}
14
15impl SqliteSealStore {
16 pub fn open(path: &str) -> Result<Self, StoreError> {
18 let conn = Connection::open(path).map_err(|e| StoreError::IoError(e.to_string()))?;
19 Self::init_tables(&conn)?;
20 Ok(Self {
21 conn: Mutex::new(conn),
22 })
23 }
24
25 pub fn in_memory() -> Result<Self, StoreError> {
27 let conn = Connection::open_in_memory().map_err(|e| StoreError::IoError(e.to_string()))?;
28 Self::init_tables(&conn)?;
29 Ok(Self {
30 conn: Mutex::new(conn),
31 })
32 }
33
34 fn init_tables(conn: &Connection) -> Result<(), StoreError> {
35 conn.execute_batch(
36 "CREATE TABLE IF NOT EXISTS seals (
37 id INTEGER PRIMARY KEY AUTOINCREMENT,
38 chain TEXT NOT NULL,
39 seal_id BLOB NOT NULL,
40 consumed_at_height INTEGER NOT NULL,
41 commitment_hash BLOB NOT NULL,
42 recorded_at INTEGER NOT NULL,
43 UNIQUE(chain, seal_id)
44 );
45 CREATE INDEX IF NOT EXISTS idx_seals_chain ON seals(chain);
46 CREATE INDEX IF NOT EXISTS idx_seals_height ON seals(chain, consumed_at_height);
47
48 CREATE TABLE IF NOT EXISTS anchors (
49 id INTEGER PRIMARY KEY AUTOINCREMENT,
50 chain TEXT NOT NULL,
51 anchor_id BLOB NOT NULL,
52 block_height INTEGER NOT NULL,
53 commitment_hash BLOB NOT NULL,
54 is_finalized INTEGER NOT NULL DEFAULT 0,
55 confirmations INTEGER NOT NULL DEFAULT 0,
56 recorded_at INTEGER NOT NULL,
57 UNIQUE(chain, anchor_id)
58 );
59 CREATE INDEX IF NOT EXISTS idx_anchors_chain ON anchors(chain);
60 CREATE INDEX IF NOT EXISTS idx_anchors_height ON anchors(chain, block_height);
61 CREATE INDEX IF NOT EXISTS idx_anchors_pending ON anchors(chain, is_finalized);
62 ",
63 )
64 .map_err(|e| StoreError::IoError(e.to_string()))?;
65 Ok(())
66 }
67}
68
69impl SealStore for SqliteSealStore {
70 fn save_seal(&mut self, record: &SealRecord) -> Result<(), StoreError> {
71 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
72 conn.execute(
73 "INSERT OR IGNORE INTO seals (chain, seal_id, consumed_at_height, commitment_hash, recorded_at)
74 VALUES (?1, ?2, ?3, ?4, ?5)",
75 params![
76 record.chain,
77 record.seal_id,
78 record.consumed_at_height as i64,
79 record.commitment_hash.as_bytes(),
80 record.recorded_at as i64,
81 ],
82 ).map_err(|e| StoreError::IoError(e.to_string()))?;
83 Ok(())
84 }
85
86 fn is_seal_consumed(&self, chain: &str, seal_id: &[u8]) -> Result<bool, StoreError> {
87 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
88 let count: i64 = conn
89 .query_row(
90 "SELECT COUNT(*) FROM seals WHERE chain = ?1 AND seal_id = ?2",
91 params![chain, seal_id],
92 |row| row.get(0),
93 )
94 .map_err(|e| StoreError::IoError(e.to_string()))?;
95 Ok(count > 0)
96 }
97
98 fn get_seals(&self, chain: &str) -> Result<Vec<SealRecord>, StoreError> {
99 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
100 let mut stmt = conn.prepare(
101 "SELECT seal_id, consumed_at_height, commitment_hash, recorded_at FROM seals WHERE chain = ?1"
102 ).map_err(|e| StoreError::IoError(e.to_string()))?;
103
104 let seals = stmt
105 .query_map(params![chain], |row| {
106 let seal_id: Vec<u8> = row.get(0)?;
107 let consumed_at_height: i64 = row.get(1)?;
108 let commitment_hash: Vec<u8> = row.get(2)?;
109 let recorded_at: i64 = row.get(3)?;
110 let mut hash_bytes = [0u8; 32];
111 hash_bytes.copy_from_slice(&commitment_hash);
112 Ok(SealRecord {
113 chain: chain.to_string(),
114 seal_id,
115 consumed_at_height: consumed_at_height as u64,
116 commitment_hash: Hash::new(hash_bytes),
117 recorded_at: recorded_at as u64,
118 })
119 })
120 .map_err(|e| StoreError::IoError(e.to_string()))?;
121
122 seals
123 .collect::<Result<Vec<_>, _>>()
124 .map_err(|e| StoreError::IoError(e.to_string()))
125 }
126
127 fn remove_seal(&mut self, chain: &str, seal_id: &[u8]) -> Result<(), StoreError> {
128 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
129 conn.execute(
130 "DELETE FROM seals WHERE chain = ?1 AND seal_id = ?2",
131 params![chain, seal_id],
132 )
133 .map_err(|e| StoreError::IoError(e.to_string()))?;
134 Ok(())
135 }
136
137 fn remove_seals_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
138 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
139 let removed = conn
140 .execute(
141 "DELETE FROM seals WHERE chain = ?1 AND consumed_at_height > ?2",
142 params![chain, height as i64],
143 )
144 .map_err(|e| StoreError::IoError(e.to_string()))?;
145 Ok(removed)
146 }
147
148 fn save_anchor(&mut self, record: &AnchorRecord) -> Result<(), StoreError> {
149 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
150 conn.execute(
151 "INSERT OR IGNORE INTO anchors (chain, anchor_id, block_height, commitment_hash, is_finalized, confirmations, recorded_at)
152 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
153 params![
154 record.chain,
155 record.anchor_id,
156 record.block_height as i64,
157 record.commitment_hash.as_bytes(),
158 record.is_finalized as i64,
159 record.confirmations as i64,
160 record.recorded_at as i64,
161 ],
162 ).map_err(|e| StoreError::IoError(e.to_string()))?;
163 Ok(())
164 }
165
166 fn has_anchor(&self, chain: &str, anchor_id: &[u8]) -> Result<bool, StoreError> {
167 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
168 let count: i64 = conn
169 .query_row(
170 "SELECT COUNT(*) FROM anchors WHERE chain = ?1 AND anchor_id = ?2",
171 params![chain, anchor_id],
172 |row| row.get(0),
173 )
174 .map_err(|e| StoreError::IoError(e.to_string()))?;
175 Ok(count > 0)
176 }
177
178 fn finalize_anchor(
179 &mut self,
180 chain: &str,
181 anchor_id: &[u8],
182 confirmations: u64,
183 ) -> Result<(), StoreError> {
184 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
185 conn.execute(
186 "UPDATE anchors SET is_finalized = 1, confirmations = ?3
187 WHERE chain = ?1 AND anchor_id = ?2",
188 params![chain, anchor_id, confirmations as i64],
189 )
190 .map_err(|e| StoreError::IoError(e.to_string()))?;
191 Ok(())
192 }
193
194 fn pending_anchors(&self, chain: &str) -> Result<Vec<AnchorRecord>, StoreError> {
195 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
196 let mut stmt = conn
197 .prepare(
198 "SELECT anchor_id, block_height, commitment_hash, confirmations, recorded_at
199 FROM anchors WHERE chain = ?1 AND is_finalized = 0",
200 )
201 .map_err(|e| StoreError::IoError(e.to_string()))?;
202
203 let anchors = stmt
204 .query_map(params![chain], |row| {
205 let anchor_id: Vec<u8> = row.get(0)?;
206 let block_height: i64 = row.get(1)?;
207 let commitment_hash: Vec<u8> = row.get(2)?;
208 let confirmations: i64 = row.get(3)?;
209 let recorded_at: i64 = row.get(4)?;
210 let mut hash_bytes = [0u8; 32];
211 hash_bytes.copy_from_slice(&commitment_hash);
212 Ok(AnchorRecord {
213 chain: chain.to_string(),
214 anchor_id,
215 block_height: block_height as u64,
216 commitment_hash: Hash::new(hash_bytes),
217 is_finalized: false,
218 confirmations: confirmations as u64,
219 recorded_at: recorded_at as u64,
220 })
221 })
222 .map_err(|e| StoreError::IoError(e.to_string()))?;
223
224 anchors
225 .collect::<Result<Vec<_>, _>>()
226 .map_err(|e| StoreError::IoError(e.to_string()))
227 }
228
229 fn remove_anchors_after(&mut self, chain: &str, height: u64) -> Result<usize, StoreError> {
230 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
231 let removed = conn
232 .execute(
233 "DELETE FROM anchors WHERE chain = ?1 AND block_height > ?2",
234 params![chain, height as i64],
235 )
236 .map_err(|e| StoreError::IoError(e.to_string()))?;
237 Ok(removed)
238 }
239
240 fn highest_block(&self, chain: &str) -> Result<u64, StoreError> {
241 let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
242 let max: Option<i64> = conn
243 .query_row(
244 "SELECT MAX(block_height) FROM anchors WHERE chain = ?1",
245 params![chain],
246 |row| row.get(0),
247 )
248 .map_err(|e| StoreError::IoError(e.to_string()))?;
249 Ok(max.unwrap_or(0) as u64)
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256 use csv_adapter_core::SealRecord;
257
258 fn test_seal_record(chain: &str, height: u64) -> SealRecord {
259 let mut seal_id = vec![0u8; 16];
260 seal_id[0..8].copy_from_slice(&height.to_le_bytes());
261 seal_id[8..].copy_from_slice(chain.as_bytes().get(..8).unwrap_or(&[0u8; 8]));
262 SealRecord {
263 chain: chain.to_string(),
264 seal_id,
265 consumed_at_height: height,
266 commitment_hash: Hash::new([0xAA; 32]),
267 recorded_at: 1700000000,
268 }
269 }
270
271 fn test_anchor_record(chain: &str, height: u64) -> AnchorRecord {
272 let mut anchor_id = vec![0u8; 16];
273 anchor_id[0..8].copy_from_slice(&height.to_le_bytes());
274 AnchorRecord {
275 chain: chain.to_string(),
276 anchor_id,
277 block_height: height,
278 commitment_hash: Hash::new([0xBB; 32]),
279 is_finalized: false,
280 confirmations: 0,
281 recorded_at: 1700000000,
282 }
283 }
284
285 #[test]
286 fn test_sqlite_open_in_memory() {
287 let store = SqliteSealStore::in_memory().unwrap();
288 assert_eq!(store.highest_block("bitcoin").unwrap(), 0);
289 }
290
291 #[test]
292 fn test_sqlite_seal_lifecycle() {
293 let mut store = SqliteSealStore::in_memory().unwrap();
294 let record = test_seal_record("bitcoin", 100);
295 let seal_id = record.seal_id.clone();
296 store.save_seal(&record).unwrap();
297 assert!(store.is_seal_consumed("bitcoin", &seal_id).unwrap());
298 assert!(!store.is_seal_consumed("ethereum", &seal_id).unwrap());
299 }
300
301 #[test]
302 fn test_sqlite_get_seals() {
303 let mut store = SqliteSealStore::in_memory().unwrap();
304 store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
305 store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
306 store.save_seal(&test_seal_record("ethereum", 300)).unwrap();
307
308 let btc_seals = store.get_seals("bitcoin").unwrap();
309 assert_eq!(btc_seals.len(), 2);
310
311 let eth_seals = store.get_seals("ethereum").unwrap();
312 assert_eq!(eth_seals.len(), 1);
313 }
314
315 #[test]
316 fn test_sqlite_remove_seal() {
317 let mut store = SqliteSealStore::in_memory().unwrap();
318 let record = test_seal_record("bitcoin", 100);
319 let seal_id = record.seal_id.clone();
320 store.save_seal(&record).unwrap();
321 store.remove_seal("bitcoin", &seal_id).unwrap();
322 assert!(!store.is_seal_consumed("bitcoin", &seal_id).unwrap());
323 }
324
325 #[test]
326 fn test_sqlite_remove_seals_after_height() {
327 let mut store = SqliteSealStore::in_memory().unwrap();
328 store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
329 store.save_seal(&test_seal_record("bitcoin", 150)).unwrap();
330 store.save_seal(&test_seal_record("bitcoin", 200)).unwrap();
331 let removed = store.remove_seals_after("bitcoin", 150).unwrap();
332 assert_eq!(removed, 1);
333 }
334
335 #[test]
336 fn test_sqlite_anchor_lifecycle() {
337 let mut store = SqliteSealStore::in_memory().unwrap();
338 let anchor = test_anchor_record("bitcoin", 100);
339 let anchor_id = anchor.anchor_id.clone();
340 store.save_anchor(&anchor).unwrap();
341 assert!(store.has_anchor("bitcoin", &anchor_id).unwrap());
342
343 let pending = store.pending_anchors("bitcoin").unwrap();
344 assert_eq!(pending.len(), 1);
345
346 store.finalize_anchor("bitcoin", &anchor_id, 6).unwrap();
347 let pending = store.pending_anchors("bitcoin").unwrap();
348 assert!(pending.is_empty());
349 }
350
351 #[test]
352 fn test_sqlite_remove_anchors_after_height() {
353 let mut store = SqliteSealStore::in_memory().unwrap();
354 store
355 .save_anchor(&test_anchor_record("bitcoin", 100))
356 .unwrap();
357 store
358 .save_anchor(&test_anchor_record("bitcoin", 200))
359 .unwrap();
360 store
361 .save_anchor(&test_anchor_record("bitcoin", 300))
362 .unwrap();
363 let removed = store.remove_anchors_after("bitcoin", 200).unwrap();
364 assert_eq!(removed, 1);
365 }
366
367 #[test]
368 fn test_sqlite_highest_block() {
369 let mut store = SqliteSealStore::in_memory().unwrap();
370 store
371 .save_anchor(&test_anchor_record("bitcoin", 100))
372 .unwrap();
373 store
374 .save_anchor(&test_anchor_record("bitcoin", 300))
375 .unwrap();
376 store
377 .save_anchor(&test_anchor_record("bitcoin", 200))
378 .unwrap();
379 assert_eq!(store.highest_block("bitcoin").unwrap(), 300);
380 assert_eq!(store.highest_block("ethereum").unwrap(), 0);
381 }
382
383 #[test]
384 fn test_sqlite_duplicate_seal_ignored() {
385 let mut store = SqliteSealStore::in_memory().unwrap();
386 let record = test_seal_record("bitcoin", 100);
387 let _seal_id = record.seal_id.clone();
388 store.save_seal(&record).unwrap();
389 let mut dup = record.clone();
391 dup.consumed_at_height = 200;
392 store.save_seal(&dup).unwrap();
393 let seals = store.get_seals("bitcoin").unwrap();
394 assert_eq!(seals.len(), 1);
396 assert_eq!(seals[0].consumed_at_height, 100);
397 }
398
399 #[test]
400 fn test_sqlite_multi_chain_isolation() {
401 let mut store = SqliteSealStore::in_memory().unwrap();
402 store.save_seal(&test_seal_record("bitcoin", 100)).unwrap();
403 store.save_seal(&test_seal_record("ethereum", 200)).unwrap();
404 store
405 .save_anchor(&test_anchor_record("bitcoin", 100))
406 .unwrap();
407 store
408 .save_anchor(&test_anchor_record("ethereum", 200))
409 .unwrap();
410
411 assert_eq!(store.get_seals("bitcoin").unwrap().len(), 1);
412 assert_eq!(store.pending_anchors("bitcoin").unwrap().len(), 1);
413 }
414}