use std::sync::{Arc, Once};
use std::time::Duration;

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};

use crate::resilience::retry::{retry, RetryConfig};
use crate::schema::{SchemaRegistry, DEFAULT_TABLE};
use crate::search::SqlParam;
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};

static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

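/// SQL-backed [`ArchiveStore`] over sqlx's `Any` driver, supporting SQLite
/// (a single `sync_items` table) and MySQL (tables routed by the schema
/// registry).
///
/// A minimal usage sketch; the connection string and id are illustrative:
///
/// ```ignore
/// let store = SqlStore::new("sqlite://archive.db?mode=rwc").await?;
/// store.put(&item).await?;
/// let fetched = store.get("some-id").await?;
/// ```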
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
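    /// Connect using a default [`SchemaRegistry`].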
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

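    /// Connect with an explicit schema registry.
    ///
    /// Installs the `Any` drivers once per process, detects SQLite from the
    /// connection string, retries the initial connection, enables WAL mode
    /// on SQLite, creates the base schema, and sets READ COMMITTED isolation
    /// on MySQL.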
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        // SQLite keeps everything in the single default table, so force a
        // bypass registry unless one was already supplied.
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };

        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }

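    /// Switch SQLite to WAL journaling with `synchronous = NORMAL`: readers
    /// no longer block on writers, and a crash can lose only the most recent
    /// commits rather than corrupting the database.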
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

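    /// Create the base `sync_items` table if missing, using dialect-specific
    /// DDL (SQLite types vs. MySQL types with inline secondary indexes).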
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

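    /// Create a registry-routed table on MySQL if it does not already exist.
    ///
    /// A no-op on SQLite, where the bypass registry keeps everything in
    /// `sync_items`. The name is validated against an alphanumeric-plus-
    /// underscore whitelist before interpolation, since identifiers cannot
    /// be bound as SQL parameters.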
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        if self.is_sqlite {
            return Ok(());
        }

        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }

        if table_name == DEFAULT_TABLE {
            return Ok(());
        }

        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }

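    /// Check whether a table exists, via `sqlite_master` on SQLite or
    /// `information_schema.TABLES` on MySQL.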
    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = ?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }

    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }

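    /// Pack the item's optional audit fields (`batch_id`, `trace_parent`,
    /// `home_instance_id`) into one JSON object for the `audit` column;
    /// returns `None` when all three are absent.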
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

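    /// Inverse of [`Self::build_audit_json`]: recover `(batch_id,
    /// trace_parent, home_instance_id)`, treating missing or malformed
    /// audit JSON as all-`None`.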
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
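    /// Fetch one item by id from its registry-routed table.
    ///
    /// Text columns are decoded with a string-then-bytes fallback, since the
    /// `Any` driver may surface them either way depending on the backend.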
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);

        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

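    /// Upsert one item into its registry-routed table. On conflict,
    /// `merkle_dirty` is raised only when the payload hash actually changed,
    /// so rewrites of identical content keep their clean status.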
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON CONFLICT(id) DO UPDATE SET
                     version = excluded.version,
                     timestamp = excluded.timestamp,
                     payload_hash = excluded.payload_hash,
                     payload = excluded.payload,
                     payload_blob = excluded.payload_blob,
                     audit = excluded.audit,
                     merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END,
                     state = excluded.state,
                     access_count = excluded.access_count,
                     last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON DUPLICATE KEY UPDATE
                     version = VALUES(version),
                     timestamp = VALUES(timestamp),
                     payload_hash = VALUES(payload_hash),
                     payload = VALUES(payload),
                     payload_blob = VALUES(payload_blob),
                     audit = VALUES(audit),
                     merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END,
                     state = VALUES(state),
                     access_count = VALUES(access_count),
                     last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_delete", &RetryConfig::query(), || async {
            let sql = format!("DELETE FROM {} WHERE id = ?", table);
            sqlx::query(&sql)
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_exists", &RetryConfig::query(), || async {
            let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

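    /// Write a batch: tag every item with a fresh UUID batch id, group by
    /// registry table, upsert in chunks of 500 rows, then verify by
    /// re-counting the written ids.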
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

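    /// Page over ids in the default `sync_items` table, ordered by id.
    /// Like `count_all` below, this does not consult the schema registry.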
    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

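    /// Run a caller-supplied `WHERE` filter against the default table, then
    /// re-read each matching id through [`Self::get`] so every row is
    /// decoded by the one canonical path (one extra query per hit).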
    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        self.count_where_in_table(DEFAULT_TABLE, where_clause, params).await
    }
}

impl SqlStore {
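    /// Like [`ArchiveStore::search`], but against an explicit,
    /// whitelist-validated table.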
    pub async fn search_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
        limit: usize,
    ) -> Result<Vec<SyncItem>, StorageError> {
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM {} WHERE {} LIMIT {}",
            table, where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    pub async fn count_where_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
    ) -> Result<u64, StorageError> {
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!("SELECT COUNT(*) as cnt FROM {} WHERE {}", table, where_clause);

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

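    /// Upsert one chunk of a batch into a single table using a multi-row
    /// `INSERT ... VALUES (...), (...)`. Rows are pre-serialized into owned
    /// values so the retry closure can rebind them on every attempt.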
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                 state = excluded.state, \
                 access_count = excluded.access_count, \
                 last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                 state = VALUES(state), \
                 access_count = VALUES(access_count), \
                 last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

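    /// Count how many of the given ids are present, grouped by table and
    /// chunked to keep `IN (...)` lists bounded; used to verify batches.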
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

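    /// Count rows tagged with a batch id via the audit column: a `LIKE`
    /// match on SQLite, `JSON_EXTRACT` on MySQL. Currently unused in favor
    /// of [`Self::verify_batch_ids`].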
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

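    /// Read up to `limit` items from the default table, oldest first; rows
    /// with no payload are skipped rather than treated as errors.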
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

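    /// List ids in the default table whose `merkle_dirty` flag is set.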
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

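    /// Clear the dirty flag for the given ids, grouped by registry table.
    /// Per-table failures are logged and skipped so a single bad table does
    /// not abort the whole sweep.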
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let mut ids_by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            ids_by_table.entry(table).or_default().push(id);
        }

        let mut total_updated = 0usize;

        for (table, table_ids) in ids_by_table {
            let placeholders: Vec<&str> = table_ids.iter().map(|_| "?").collect();
            let sql = format!(
                "UPDATE {} SET merkle_dirty = 0 WHERE id IN ({})",
                table,
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in &table_ids {
                query = query.bind(*id);
            }

            match query.execute(&self.pool).await {
                Ok(result) => {
                    total_updated += result.rows_affected() as usize;
                }
                Err(e) => {
                    tracing::warn!(table = %table, error = %e, "Failed to mark merkle clean in table");
                }
            }
        }

        Ok(total_updated)
    }

    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
        .bind(&pattern)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

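    /// Distinct id prefixes (the id up to the first '.') that currently
    /// have dirty rows, letting Merkle rebuilds target only affected
    /// branches.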
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        let sql = if self.is_sqlite {
            "SELECT DISTINCT CASE
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id
             END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

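    /// Collect dirty items across every registered table plus the default,
    /// splitting `limit` roughly evenly per table (at least 100 each) and
    /// skipping tables that fail to query.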
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let mut tables_to_query: Vec<String> = self.registry.tables();
        if !tables_to_query.contains(&DEFAULT_TABLE.to_string()) {
            tables_to_query.push(DEFAULT_TABLE.to_string());
        }

        let mut all_items = Vec::new();
        let per_table_limit = (limit / tables_to_query.len().max(1)).max(100);

        for table in &tables_to_query {
            let sql = format!(
                "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
                 FROM {} WHERE merkle_dirty = 1 LIMIT ?",
                table
            );

            let rows = match sqlx::query(&sql)
                .bind(per_table_limit as i64)
                .fetch_all(&self.pool)
                .await
            {
                Ok(r) => r,
                Err(e) => {
                    tracing::debug!(table = %table, error = %e, "Skipping table for dirty merkle query");
                    continue;
                }
            };

            for row in rows {
                if let Some(item) = self.row_to_sync_item(&row) {
                    all_items.push(item);
                }
            }

            if all_items.len() >= limit {
                break;
            }
        }

        all_items.truncate(limit);
        Ok(all_items)
    }

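    /// Decode a row into a [`SyncItem`], returning `None` when the id is
    /// unreadable or the row has no payload at all.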
    fn row_to_sync_item(&self, row: &sqlx::any::AnyRow) -> Option<SyncItem> {
        let id: String = row.try_get("id").ok()?;
        let version: i64 = row.try_get("version").unwrap_or(1);
        let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
        let payload_hash: Option<String> = row.try_get("payload_hash").ok();

        let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
        let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
            String::from_utf8(bytes).ok()
        });

        let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
        let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
        let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
            String::from_utf8(bytes).ok()
        });

        let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
        let state: String = state_bytes
            .and_then(|bytes| String::from_utf8(bytes).ok())
            .unwrap_or_else(|| "default".to_string());

        let access_count: i64 = row.try_get("access_count").unwrap_or(0);
        let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

        let (content, content_type) = if let Some(ref json_str) = payload_json {
            (json_str.as_bytes().to_vec(), ContentType::Json)
        } else if let Some(blob) = payload_blob {
            (blob, ContentType::Binary)
        } else {
            return None;
        };

        let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

        Some(SyncItem::reconstruct(
            id,
            version as u64,
            timestamp,
            content_type,
            content,
            batch_id,
            trace_parent,
            payload_hash.unwrap_or_default(),
            home_instance_id,
            state,
            access_count as u64,
            last_accessed as u64,
        ))
    }

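    /// Fetch up to `limit` items whose `state` column matches exactly;
    /// rows without a payload are skipped.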
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE state = ? LIMIT ?"
        )
        .bind(state)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

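    /// Fetch up to `limit` items whose id starts with `prefix`, ordered by
    /// id. Implemented as `LIKE 'prefix%'`, so a prefix containing `%` or
    /// `_` would be interpreted as wildcards.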
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
        .bind(&pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;

    fn temp_db_path(name: &str) -> PathBuf {
        // The relative `temp` directory must exist before SQLite can create
        // the database file; `mode=rwc` only creates the file itself.
        let dir = PathBuf::from("temp");
        let _ = std::fs::create_dir_all(&dir);
        dir.join(format!("sql_test_{}.db", name))
    }

    fn cleanup_db(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}