use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

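// sqlx's Any driver resolves the concrete backend from the connection string
// at runtime, but the backend drivers must be registered first. `Once` makes
// registration idempotent when multiple stores are constructed.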
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

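/// Relational implementation of [`ArchiveStore`] on top of `sqlx::AnyPool`.
///
/// Two dialects are supported: SQLite (detected by a `sqlite:` connection
/// string prefix) and, for any other connection string, a MySQL-compatible
/// server. Dialect differences are confined to DDL, upsert syntax, and the
/// batch-verification query.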
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
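    /// Open a pool against `connection_string`, enable WAL mode when the
    /// target is SQLite, and create the schema if missing. Both the connect
    /// and the DDL run under the startup retry policy, so a database that is
    /// still coming up does not immediately fail construction.
    ///
    /// Illustrative usage (the path and surrounding async context are
    /// placeholders, hence `ignore`):
    ///
    /// ```ignore
    /// let store = SqlStore::new("sqlite://data/archive.db?mode=rwc").await?;
    /// store.put(&item).await?;
    /// assert!(store.exists(&item.object_id).await?);
    /// ```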
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };

        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;
        Ok(store)
    }

    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

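    /// Put SQLite into write-ahead-logging mode so readers do not block on
    /// writers, and relax `synchronous` to NORMAL, which in WAL mode trades a
    /// small durability window on power loss for much cheaper commits without
    /// risking corruption.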
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

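    /// Create the `sync_items` table if it does not exist. Both variants
    /// declare the same row shape; MySQL additionally gets secondary indexes
    /// inline, since SQLite would require separate `CREATE INDEX` statements.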
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

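    /// Pack the optional audit fields into a single JSON column, e.g.
    /// `{"batch":"...","trace":"...","home":"..."}`. Returns `None` when all
    /// three are absent so the column stays NULL.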
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

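    /// Inverse of [`Self::build_audit_json`]: recover `(batch_id,
    /// trace_parent, home_instance_id)`, treating NULL or malformed audit
    /// JSON as all-`None` rather than failing the read.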
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
            )
            .bind(&id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Text columns may decode as either String or raw bytes
                    // depending on the backend; accept both.
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

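    /// Upsert a single item, always setting `merkle_dirty = 1` so the next
    /// merkle maintenance pass re-hashes the row. Only the upsert syntax
    /// differs between dialects.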
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        // JSON payloads go to the text column, binary payloads to the blob
        // column; the unused column is left NULL.
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                version = excluded.version,
                timestamp = excluded.timestamp,
                payload_hash = excluded.payload_hash,
                payload = excluded.payload,
                payload_blob = excluded.payload_blob,
                audit = excluded.audit,
                merkle_dirty = 1,
                state = excluded.state,
                access_count = excluded.access_count,
                last_accessed = excluded.last_accessed"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON DUPLICATE KEY UPDATE
                version = VALUES(version),
                timestamp = VALUES(timestamp),
                payload_hash = VALUES(payload_hash),
                payload = VALUES(payload),
                payload_blob = VALUES(payload_blob),
                audit = VALUES(audit),
                merkle_dirty = 1,
                state = VALUES(state),
                access_count = VALUES(access_count),
                last_accessed = VALUES(last_accessed)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

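    /// Stamp every item with a fresh batch id, upsert in chunks, then verify
    /// by counting rows that carry the batch id. A mismatch is reported via
    /// `BatchWriteResult::verified` and a warning log rather than an error,
    /// leaving the retry decision to the caller.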
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        let verified_count = self.verify_batch(&batch_id).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
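    /// Upsert one chunk of a batch as a single multi-row statement. Chunking
    /// in `put_batch` keeps the bind count bounded: each row contributes 10
    /// placeholders (`merkle_dirty` is a literal), and backends cap host
    /// parameters per statement (SQLite's default limit is 999 in older
    /// builds, 32766 since 3.32). `_batch_id` is already stamped on the items
    /// by `put_batch`, so it is unused here.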
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = 1, \
                 state = excluded.state, \
                 access_count = excluded.access_count, \
                 last_accessed = excluded.last_accessed",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = 1, \
                 state = VALUES(state), \
                 access_count = VALUES(access_count), \
                 last_accessed = VALUES(last_accessed)",
                placeholders.join(", ")
            )
        };

        // Materialize all bind values up front so each retry attempt can
        // rebuild the query from an owned snapshot.
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

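    /// Count rows whose audit column carries `batch_id`. MySQL can compare
    /// exactly with `JSON_EXTRACT`; for SQLite, where the JSON1 functions may
    /// not be available in every build, a LIKE match over the serialized
    /// audit JSON is used instead.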
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        // serde_json serializes the audit map without whitespace, so the
        // substring `"batch":"<id>"` is stable; include the closing quote so
        // the match is exact rather than a prefix match.
        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}\"%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

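    /// Read up to `limit` items in ascending timestamp order. Rows with no
    /// payload in either column are skipped rather than failing the scan.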
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

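    /// Delete the given ids with one `IN (...)` statement and return the
    /// number of rows removed. Very large id lists are subject to the same
    /// per-statement placeholder limits noted on `put_batch_chunk`.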
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

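    /// Ids of rows whose merkle state is stale. Every write sets
    /// `merkle_dirty = 1`; the intended flow is to drain this set with
    /// `get_dirty_merkle_ids`/`get_dirty_merkle_items` and acknowledge with
    /// `mark_merkle_clean`.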
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

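    /// Fetch up to `limit` items whose lifecycle tag equals `state` (the
    /// tests below use tags like "pending", "delta", "base"). On MySQL the
    /// query is served by the `idx_state` index.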
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE state = ? LIMIT ?"
        )
        .bind(state)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

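    /// Scan items whose id starts with `prefix`, in id order, by rewriting
    /// the prefix as `LIKE 'prefix%'`. Note that `%` or `_` inside the prefix
    /// would act as LIKE wildcards, so id schemes should avoid them (the
    /// tests use `:`-delimited ids).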
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
        .bind(&pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use serde_json::json;

    fn temp_db_path(name: &str) -> PathBuf {
        // SQLite's `mode=rwc` creates the database file but not its parent
        // directory, so make sure the directory exists first.
        let dir = PathBuf::from("temp");
        let _ = std::fs::create_dir_all(&dir);
        dir.join(format!("sql_test_{}.db", name))
    }

    fn cleanup_db(path: &Path) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}