use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use std::sync::{Arc, Once};
use std::time::Duration;

use crate::resilience::retry::{retry, RetryConfig};
use crate::schema::SchemaRegistry;
use crate::search::SqlParam;
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};

static INSTALL_DRIVERS: Once = Once::new();

/// Installs the sqlx `Any` drivers exactly once per process.
fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

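/// SQL-backed archive store built on sqlx's `Any` driver, so a single code
/// path serves both SQLite (embedded, single file) and MySQL-compatible
/// servers depending on the connection string.
///
/// A minimal usage sketch (paths and module layout are assumptions, hence
/// `ignore`):
///
/// ```ignore
/// let store = SqlStore::new("sqlite://archive.db?mode=rwc").await?;
/// store.put(&item).await?;
/// let found = store.get(&item.object_id).await?;
/// ```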
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
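    /// Connects using a fresh default [`SchemaRegistry`].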
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

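    /// Connects to the backing database, applies connection-pool limits,
    /// ensures the schema exists, and (for MySQL) pins the session isolation
    /// level to READ COMMITTED. Connection attempts use the startup retry
    /// policy.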
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        // SQLite stores everything in one file, so multi-table routing is
        // forced into bypass mode.
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };

        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }

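    /// Switches SQLite into write-ahead-log mode. WAL lets readers proceed
    /// concurrently with a single writer; `synchronous = NORMAL` is the usual
    /// companion setting, trading a small durability window on power loss for
    /// substantially fewer fsyncs.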
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

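    /// Creates the default `sync_items` table if it does not already exist,
    /// using SQLite column types or MySQL column types (with secondary
    /// indexes) as appropriate.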
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

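    /// Creates a registry-routed table on MySQL. The table name is
    /// interpolated into the DDL (placeholders cannot bind identifiers), so
    /// it is validated to alphanumerics and underscores first. On SQLite, and
    /// for the default table, this is a no-op.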
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        if self.is_sqlite {
            return Ok(());
        }

        // The table name is interpolated into the DDL below, so reject
        // anything that could smuggle in SQL.
        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }

        if table_name == crate::schema::DEFAULT_TABLE {
            return Ok(());
        }

        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }

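    /// Reports whether a table exists, via `sqlite_master` on SQLite or
    /// `information_schema` on MySQL.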
    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = ?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }

    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }

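    /// Folds the optional audit fields (batch id, trace parent, home
    /// instance) into one compact JSON object so they occupy a single column;
    /// returns `None` when every field is absent.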
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

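    /// Inverse of [`Self::build_audit_json`]: recovers `(batch_id,
    /// trace_parent, home_instance_id)`, treating missing or malformed JSON
    /// as all-`None`.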
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
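    // Note on row decoding: under the `Any` driver, TEXT columns decode as
    // String on SQLite but may surface as raw bytes on other backends, so
    // the readers below try `String` first and fall back to `Vec<u8>` plus a
    // UTF-8 conversion.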
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);

        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

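    /// Upserts a single item. The payload lands in `payload` (JSON text) or
    /// `payload_blob` (binary) depending on its content type, and
    /// `merkle_dirty` is re-set to 1 only when the stored payload hash
    /// actually changes.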
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON CONFLICT(id) DO UPDATE SET
                     version = excluded.version,
                     timestamp = excluded.timestamp,
                     payload_hash = excluded.payload_hash,
                     payload = excluded.payload,
                     payload_blob = excluded.payload_blob,
                     audit = excluded.audit,
                     merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END,
                     state = excluded.state,
                     access_count = excluded.access_count,
                     last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
                 ON DUPLICATE KEY UPDATE
                     version = VALUES(version),
                     timestamp = VALUES(timestamp),
                     payload_hash = VALUES(payload_hash),
                     payload = VALUES(payload),
                     payload_blob = VALUES(payload_blob),
                     audit = VALUES(audit),
                     merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END,
                     state = VALUES(state),
                     access_count = VALUES(access_count),
                     last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_delete", &RetryConfig::query(), || async {
            let sql = format!("DELETE FROM {} WHERE id = ?", table);
            sqlx::query(&sql)
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_exists", &RetryConfig::query(), || async {
            let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

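    /// Writes a batch under a fresh batch id: items are grouped by their
    /// registry table, written in multi-row upserts of at most 500 rows, and
    /// then read back by id to verify the write landed.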
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

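    /// Runs a caller-supplied WHERE clause against the default `sync_items`
    /// table, then re-reads each matching id through [`Self::get`] so payload
    /// decoding stays in one place: one query for the id pass, one per hit
    /// for hydration.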
    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        let sql = format!("SELECT COUNT(*) as cnt FROM sync_items WHERE {}", where_clause);

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
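    /// Writes one chunk of a batch to a single table as a multi-row upsert.
    /// Bind values are copied into owned rows up front so the statement can
    /// be rebuilt on each retry attempt.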
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                 state = excluded.state, \
                 access_count = excluded.access_count, \
                 last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                 state = VALUES(state), \
                 access_count = VALUES(access_count), \
                 last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

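    /// Counts how many of the given ids are present, routing each id through
    /// the registry and chunking the `IN (...)` lists to 500 ids per query.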
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // JSON functions are not guaranteed to be compiled into SQLite, so
        // fall back to a substring match on the serialized audit column.
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

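    /// Streams the oldest items (by timestamp) from the default table,
    /// skipping rows whose payload columns are both empty.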
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

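    /// Merkle bookkeeping helpers. Upserts flip `merkle_dirty` to 1 whenever
    /// the stored payload hash changes; the methods below let callers page
    /// through dirty rows and mark them clean after processing.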
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

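    /// Counts dirty rows under an id prefix, so a single branch of the tree
    /// can be checked without scanning the whole table.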
    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
        .bind(&pattern)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

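    /// Returns the distinct id prefixes (the segment before the first `.`,
    /// or the whole id when there is no dot) that currently have dirty rows.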
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        let sql = if self.is_sqlite {
            "SELECT DISTINCT CASE
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id
             END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

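    /// Fetches up to `limit` items in the given lifecycle state from the
    /// default table.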
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE state = ? LIMIT ?"
        )
        .bind(state)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

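    /// Scans items whose id starts with `prefix`, ordered by id.
    /// Hierarchical keys such as `delta:user.123:op001` (the shape used in
    /// this module's tests) make this the cheap way to pull one object's
    /// records.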
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
        .bind(&pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;

    // Tests write SQLite files under a repo-local `temp/` directory; SQLite
    // creates the database file but not missing parent directories.
    fn temp_db_path(name: &str) -> PathBuf {
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }

    fn cleanup_db(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}