use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

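/// Guards one-time registration of the sqlx `Any` drivers for this process.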
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

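/// A SQL-backed [`ArchiveStore`] that targets SQLite or MySQL through sqlx's
/// `Any` driver; the backend is inferred from the connection string scheme.
///
/// Illustrative usage (a sketch; the connection string is an example):
///
/// ```ignore
/// let store = SqlStore::new("sqlite://archive.db?mode=rwc").await?;
/// store.put(&item).await?;
/// let fetched = store.get(&item.object_id).await?;
/// ```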
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
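    /// Connects to the database with startup retries, then prepares the
    /// schema. SQLite connections get WAL mode; MySQL sessions get READ
    /// COMMITTED isolation.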
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };

        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

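    /// Returns a cheap clone of the underlying connection pool.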
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

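    /// Enables write-ahead logging with `synchronous = NORMAL`, trading a
    /// small durability window for better concurrent write throughput.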
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

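    /// Creates the `sync_items` table if it does not exist, using
    /// backend-appropriate column types and (for MySQL) secondary indexes.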
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

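    /// Packs the item's optional audit fields (batch id, trace parent, home
    /// instance) into a compact JSON object, or `None` if all are absent.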
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

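    /// Inverse of [`Self::build_audit_json`]: recovers `(batch_id,
    /// trace_parent, home_instance_id)`, each `None` if missing or unparsable.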
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
            )
            .bind(&id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

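                    // Depending on the backend, the Any driver may surface
                    // TEXT columns as strings or as raw bytes; try both.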
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

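        // JSON payloads go to the text column and binary payloads to the blob
        // column; exactly one of the two is populated per row.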
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                 version = excluded.version,
                 timestamp = excluded.timestamp,
                 payload_hash = excluded.payload_hash,
                 payload = excluded.payload,
                 payload_blob = excluded.payload_blob,
                 audit = excluded.audit,
                 merkle_dirty = 1,
                 state = excluded.state,
                 access_count = excluded.access_count,
                 last_accessed = excluded.last_accessed"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON DUPLICATE KEY UPDATE
                 version = VALUES(version),
                 timestamp = VALUES(timestamp),
                 payload_hash = VALUES(payload_hash),
                 payload = VALUES(payload),
                 payload_blob = VALUES(payload_blob),
                 audit = VALUES(audit),
                 merkle_dirty = 1,
                 state = VALUES(state),
                 access_count = VALUES(access_count),
                 last_accessed = VALUES(last_accessed)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

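    // Tags every item with a fresh batch id, writes in fixed-size chunks, then
    // verifies the written ids and reports whether all of them landed.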
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
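    /// Upserts one chunk of items with a single multi-row statement; the
    /// batch id is already stamped on the items by `put_batch`, so it is
    /// unused here.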
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = 1, \
                 state = excluded.state, \
                 access_count = excluded.access_count, \
                 last_accessed = excluded.last_accessed",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = 1, \
                 state = VALUES(state), \
                 access_count = VALUES(access_count), \
                 last_accessed = VALUES(last_accessed)",
                placeholders.join(", ")
            )
        };

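        // Pre-compute every bind value so the retry closure can clone the
        // full set on each attempt.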
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

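    /// Counts how many of the given ids exist, chunking the `IN` list to keep
    /// statements bounded.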
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for chunk in ids.chunks(CHUNK_SIZE) {
            let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
            let sql = format!(
                "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in chunk {
                query = query.bind(id);
            }

            let result = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            let count: i64 = result
                .try_get("cnt")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            total_found += count as usize;
        }

        Ok(total_found)
    }

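    /// Counts rows whose audit JSON references the given batch id, using a
    /// LIKE match on the raw JSON for SQLite and JSON_EXTRACT for MySQL.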
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

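    /// Returns up to `limit` items, oldest first by timestamp.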
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

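    /// Deletes the given ids in one statement; returns the number of rows
    /// removed.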
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

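    /// Returns the ids of up to `limit` items whose Merkle entry is stale.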
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

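    /// Counts items whose Merkle entry is stale.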
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

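    /// Clears the dirty flag for the given ids; returns rows updated.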
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

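    /// Returns true if at least one item still has a stale Merkle entry.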
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

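    /// Fetches up to `limit` full items whose Merkle entry is stale. Unlike
    /// [`Self::get`], this path reads the text columns as raw bytes only.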
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

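    /// Fetches up to `limit` items currently in the given state.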
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE state = ? LIMIT ?"
        )
        .bind(state)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

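    /// Counts items currently in the given state.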
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

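    /// Lists up to `limit` ids of items currently in the given state.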
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

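    /// Moves one item to `new_state`; returns false if the id does not exist.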
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

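    /// Deletes every item in the given state; returns the number removed.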
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

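    /// Fetches up to `limit` items whose id starts with `prefix`, ordered by
    /// id. Note that `%` or `_` in the prefix act as LIKE wildcards.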
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
        .bind(&pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

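    /// Counts items whose id starts with `prefix`.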
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

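    /// Deletes every item whose id starts with `prefix`; returns rows removed.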
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;

    fn temp_db_path(name: &str) -> PathBuf {
        // Use the OS temp dir so the tests don't depend on a relative `temp/`
        // directory existing in the working directory (assumed intent).
        std::env::temp_dir().join(format!("sql_test_{}.db", name))
    }

    fn cleanup_db(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}