use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use crate::search::SqlParam;
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use crate::schema::SchemaRegistry;
use std::sync::{Arc, Once};
use std::time::Duration;

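// sqlx's `Any` driver resolves the concrete database driver at runtime from
// the connection string, but the available drivers must be registered once
// per process before the first connection is attempted.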
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
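    /// Opens a store with a default (single-table) schema registry.
    ///
    /// A minimal usage sketch; the crate paths are hypothetical (hence
    /// `ignore`) and the connection-string formats follow sqlx's `Any`
    /// driver conventions:
    ///
    /// ```ignore
    /// use your_crate::storage::sql::SqlStore; // hypothetical path
    ///
    /// // SQLite file (`mode=rwc` creates it on first open):
    /// let store = SqlStore::new("sqlite://data/archive.db?mode=rwc").await?;
    /// // MySQL-compatible server:
    /// let store = SqlStore::new("mysql://user:pass@localhost/archive").await?;
    /// ```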
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

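    /// Opens a store with an explicit schema registry. For SQLite the
    /// registry is forced into bypass mode, because the SQLite schema only
    /// provisions the single default `sync_items` table.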
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        // SQLite deployments only provision the default table, so force the
        // registry into bypass mode to keep all keys routed there.
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };

        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }

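    /// Switches SQLite to write-ahead logging with `synchronous = NORMAL`,
    /// trading a small durability window on power loss for much better
    /// concurrent read/write throughput.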
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

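    /// Creates the default `sync_items` table if missing. The SQLite and
    /// MySQL variants differ in column types, and only the MySQL DDL
    /// declares secondary indexes inline.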
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

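    /// Creates an additional registry-routed table. SQLite is a no-op (all
    /// keys share the default table); on MySQL the table name is validated
    /// against an alphanumeric/underscore whitelist before being
    /// interpolated into the DDL, since identifiers cannot be bound as
    /// parameters.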
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        if self.is_sqlite {
            return Ok(());
        }

        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }

        if table_name == crate::schema::DEFAULT_TABLE {
            return Ok(());
        }

        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }

    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = ?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }

    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }

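    /// Packs the optional audit fields (batch id, trace parent, home
    /// instance) into one compact JSON object occupying a single TEXT
    /// column, e.g. `{"batch":"...","trace":"...","home":"..."}`; returns
    /// `None` when every field is absent.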
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

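    /// Inverse of [`Self::build_audit_json`]: best-effort extraction of
    /// `(batch_id, trace_parent, home_instance_id)`. Malformed JSON yields
    /// all `None` rather than an error.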
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);

        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Text columns may come back as TEXT or BLOB depending on
                    // the backing driver, so fall back to UTF-8 decoding.
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON CONFLICT(id) DO UPDATE SET
                     version = excluded.version,
                     timestamp = excluded.timestamp,
                     payload_hash = excluded.payload_hash,
                     payload = excluded.payload,
                     payload_blob = excluded.payload_blob,
                     audit = excluded.audit,
                     merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END,
                     state = excluded.state,
                     access_count = excluded.access_count,
                     last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON DUPLICATE KEY UPDATE
                     version = VALUES(version),
                     timestamp = VALUES(timestamp),
                     payload_hash = VALUES(payload_hash),
                     payload = VALUES(payload),
                     payload_blob = VALUES(payload_blob),
                     audit = VALUES(audit),
                     merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END,
                     state = VALUES(state),
                     access_count = VALUES(access_count),
                     last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

525
526 async fn delete(&self, id: &str) -> Result<(), StorageError> {
527 let id = id.to_string();
528 let table = self.registry.table_for_key(&id);
529 retry("sql_delete", &RetryConfig::query(), || async {
530 let sql = format!("DELETE FROM {} WHERE id = ?", table);
531 sqlx::query(&sql)
532 .bind(&id)
533 .execute(&self.pool)
534 .await
535 .map_err(|e| StorageError::Backend(e.to_string()))?;
536 Ok(())
537 })
538 .await
539 }
540
541 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
542 let id = id.to_string();
543 let table = self.registry.table_for_key(&id);
544 retry("sql_exists", &RetryConfig::query(), || async {
545 let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
546 let result = sqlx::query(&sql)
547 .bind(&id)
548 .fetch_optional(&self.pool)
549 .await
550 .map_err(|e| StorageError::Backend(e.to_string()))?;
551 Ok(result.is_some())
552 })
553 .await
554 }
555
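    /// Batch upsert: stamps every item with a fresh batch id, groups items
    /// by their registry-routed table, writes each group in multi-row
    /// chunks, then verifies by counting the written ids back out of the
    /// database.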
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            // Re-read each match through `get` so payload/audit decoding
            // stays in one place.
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        self.count_where_in_table(crate::schema::DEFAULT_TABLE, where_clause, params).await
    }
}


impl SqlStore {
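    /// Escape hatch for querying a specific registry table directly. The
    /// table name is whitelist-validated; the caller-supplied WHERE clause
    /// is trusted and must contain only `?` placeholders matching `params`.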
    pub async fn search_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
        limit: usize,
    ) -> Result<Vec<SyncItem>, StorageError> {
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM {} WHERE {} LIMIT {}",
            table, where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    pub async fn count_where_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
    ) -> Result<u64, StorageError> {
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!("SELECT COUNT(*) as cnt FROM {} WHERE {}", table, where_clause);

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

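    /// Writes one chunk as a single multi-row upsert; each row contributes
    /// one `(?, ?, ...)` placeholder group. `merkle_dirty` is re-raised only
    /// when the stored payload hash differs from the incoming one. MySQL's
    /// `VALUES()`-in-UPDATE form is deprecated since 8.0.20 but still widely
    /// supported.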
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                 state = excluded.state, \
                 access_count = excluded.access_count, \
                 last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                 state = VALUES(state), \
                 access_count = VALUES(access_count), \
                 last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        // Owned copies of every bound value, so the retry closure can rebuild
        // the query on each attempt.
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

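    /// Counts how many of the given ids are present, grouping by routed
    /// table and chunking the `IN (...)` lists to stay under placeholder
    /// limits; used to verify batch writes.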
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // SQLite: LIKE match against the serialized audit JSON;
        // MySQL: JSON_EXTRACT on the audit column.
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                // Skip rows without a payload rather than failing the scan.
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

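    /// Returns up to `limit` ids whose merkle flag is dirty. Rows are
    /// flagged dirty on write and cleared via [`Self::mark_merkle_clean`]
    /// once they have been re-hashed into the merkle tree.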
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
        .bind(&pattern)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

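    /// Returns the distinct dirty id prefixes, where a prefix is everything
    /// before the first `.` (or the whole id when no dot is present). SQLite
    /// lacks `SUBSTRING_INDEX`, hence the `instr`/`substr` emulation.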
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        let sql = if self.is_sqlite {
            "SELECT DISTINCT CASE \
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1) \
                ELSE id \
             END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE state = ? LIMIT ?"
        )
        .bind(state)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

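    /// Scans items whose id starts with `prefix`, ordered by id. The match
    /// uses `LIKE 'prefix%'` and does not escape `%` or `_`, so callers
    /// should pass literal prefixes only.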
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
        .bind(&pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;

    fn temp_db_path(name: &str) -> PathBuf {
        // Make sure the scratch directory exists so SQLite can create the file.
        let _ = std::fs::create_dir_all("temp");
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }

    fn cleanup_db(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}