1use async_trait::async_trait;
42use sqlx::{AnyPool, Row, any::AnyPoolOptions};
43use crate::sync_item::{SyncItem, ContentType};
44use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
45use crate::resilience::retry::{retry, RetryConfig};
46use std::sync::Once;
47use std::time::Duration;
48
49static INSTALL_DRIVERS: Once = Once::new();
51
52fn install_drivers() {
53 INSTALL_DRIVERS.call_once(|| {
54 sqlx::any::install_default_drivers();
55 });
56}
57
58pub struct SqlStore {
59 pool: AnyPool,
60 is_sqlite: bool,
61}
62
63impl SqlStore {
64 pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
66 install_drivers();
67
68 let is_sqlite = connection_string.starts_with("sqlite:");
69
70 let pool = retry("sql_connect", &RetryConfig::startup(), || async {
71 AnyPoolOptions::new()
72 .max_connections(20)
73 .acquire_timeout(Duration::from_secs(10))
74 .idle_timeout(Duration::from_secs(300))
75 .connect(connection_string)
76 .await
77 .map_err(|e| StorageError::Backend(e.to_string()))
78 })
79 .await?;
80
81 let store = Self { pool, is_sqlite };
82
83 if is_sqlite {
85 store.enable_wal_mode().await?;
86 }
87
88 store.init_schema().await?;
89 Ok(store)
90 }
91
92 pub fn pool(&self) -> AnyPool {
94 self.pool.clone()
95 }
96
97 async fn enable_wal_mode(&self) -> Result<(), StorageError> {
104 sqlx::query("PRAGMA journal_mode = WAL")
105 .execute(&self.pool)
106 .await
107 .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
108
109 sqlx::query("PRAGMA synchronous = NORMAL")
112 .execute(&self.pool)
113 .await
114 .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
115
116 Ok(())
117 }
118
119 async fn init_schema(&self) -> Result<(), StorageError> {
120 let sql = if self.is_sqlite {
130 r#"
131 CREATE TABLE IF NOT EXISTS sync_items (
132 id TEXT PRIMARY KEY,
133 version INTEGER NOT NULL DEFAULT 1,
134 timestamp INTEGER NOT NULL,
135 payload_hash TEXT,
136 payload TEXT,
137 payload_blob BLOB,
138 audit TEXT,
139 merkle_dirty INTEGER NOT NULL DEFAULT 1,
140 state TEXT NOT NULL DEFAULT 'default',
141 access_count INTEGER NOT NULL DEFAULT 0,
142 last_accessed INTEGER NOT NULL DEFAULT 0
143 )
144 "#
145 } else {
146 r#"
149 CREATE TABLE IF NOT EXISTS sync_items (
150 id VARCHAR(255) PRIMARY KEY,
151 version BIGINT NOT NULL DEFAULT 1,
152 timestamp BIGINT NOT NULL,
153 payload_hash VARCHAR(64),
154 payload LONGTEXT,
155 payload_blob MEDIUMBLOB,
156 audit TEXT,
157 merkle_dirty TINYINT NOT NULL DEFAULT 1,
158 state VARCHAR(32) NOT NULL DEFAULT 'default',
159 access_count BIGINT NOT NULL DEFAULT 0,
160 last_accessed BIGINT NOT NULL DEFAULT 0,
161 INDEX idx_timestamp (timestamp),
162 INDEX idx_merkle_dirty (merkle_dirty),
163 INDEX idx_state (state)
164 )
165 "#
166 };
167
168 retry("sql_init_schema", &RetryConfig::startup(), || async {
169 sqlx::query(sql)
170 .execute(&self.pool)
171 .await
172 .map_err(|e| StorageError::Backend(e.to_string()))
173 })
174 .await?;
175
176 Ok(())
177 }
178
179 fn build_audit_json(item: &SyncItem) -> Option<String> {
181 let mut audit = serde_json::Map::new();
182
183 if let Some(ref batch_id) = item.batch_id {
184 audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
185 }
186 if let Some(ref trace_parent) = item.trace_parent {
187 audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
188 }
189 if let Some(ref home) = item.home_instance_id {
190 audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
191 }
192
193 if audit.is_empty() {
194 None
195 } else {
196 serde_json::to_string(&serde_json::Value::Object(audit)).ok()
197 }
198 }
199
200 fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
202 match audit_str {
203 Some(s) => {
204 if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
205 let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
206 let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
207 let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
208 (batch_id, trace_parent, home_instance_id)
209 } else {
210 (None, None, None)
211 }
212 }
213 None => (None, None, None),
214 }
215 }
216}
217
218#[async_trait]
219impl ArchiveStore for SqlStore {
220 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
221 let id = id.to_string();
222
223 retry("sql_get", &RetryConfig::query(), || async {
224 let result = sqlx::query(
225 "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
226 )
227 .bind(&id)
228 .fetch_optional(&self.pool)
229 .await
230 .map_err(|e| StorageError::Backend(e.to_string()))?;
231
232 match result {
233 Some(row) => {
234 let version: i64 = row.try_get("version").unwrap_or(1);
235 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
236 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
237
238 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
240 .or_else(|| {
241 row.try_get::<Vec<u8>, _>("payload").ok()
242 .and_then(|bytes| String::from_utf8(bytes).ok())
243 });
244
245 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
246
247 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
249 .or_else(|| {
250 row.try_get::<Vec<u8>, _>("audit").ok()
251 .and_then(|bytes| String::from_utf8(bytes).ok())
252 });
253
254 let state: String = row.try_get::<String, _>("state").ok()
256 .or_else(|| {
257 row.try_get::<Vec<u8>, _>("state").ok()
258 .and_then(|bytes| String::from_utf8(bytes).ok())
259 })
260 .unwrap_or_else(|| "default".to_string());
261
262 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
264 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
265
266 let (content, content_type) = if let Some(ref json_str) = payload_json {
268 let content = json_str.as_bytes().to_vec();
270 (content, ContentType::Json)
271 } else if let Some(blob) = payload_blob {
272 (blob, ContentType::Binary)
274 } else {
275 return Err(StorageError::Backend("No payload in row".to_string()));
276 };
277
278 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
280
281 let item = SyncItem::reconstruct(
282 id.clone(),
283 version as u64,
284 timestamp,
285 content_type,
286 content,
287 batch_id,
288 trace_parent,
289 payload_hash.unwrap_or_default(),
290 home_instance_id,
291 state,
292 access_count as u64,
293 last_accessed as u64,
294 );
295 Ok(Some(item))
296 }
297 None => Ok(None),
298 }
299 })
300 .await
301 }
302
303 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
304 let id = item.object_id.clone();
305 let version = item.version as i64;
306 let timestamp = item.updated_at;
307 let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
308 let audit_json = Self::build_audit_json(item);
309 let state = item.state.clone();
310
311 let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
313 ContentType::Json => {
314 let json_str = String::from_utf8_lossy(&item.content).to_string();
315 (Some(json_str), None)
316 }
317 ContentType::Binary => {
318 (None, Some(item.content.clone()))
319 }
320 };
321
322 let sql = if self.is_sqlite {
323 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
324 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
325 ON CONFLICT(id) DO UPDATE SET
326 version = excluded.version,
327 timestamp = excluded.timestamp,
328 payload_hash = excluded.payload_hash,
329 payload = excluded.payload,
330 payload_blob = excluded.payload_blob,
331 audit = excluded.audit,
332 merkle_dirty = 1,
333 state = excluded.state,
334 access_count = excluded.access_count,
335 last_accessed = excluded.last_accessed"
336 } else {
337 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
338 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
339 ON DUPLICATE KEY UPDATE
340 version = VALUES(version),
341 timestamp = VALUES(timestamp),
342 payload_hash = VALUES(payload_hash),
343 payload = VALUES(payload),
344 payload_blob = VALUES(payload_blob),
345 audit = VALUES(audit),
346 merkle_dirty = 1,
347 state = VALUES(state),
348 access_count = VALUES(access_count),
349 last_accessed = VALUES(last_accessed)"
350 };
351
352 retry("sql_put", &RetryConfig::query(), || async {
353 sqlx::query(sql)
354 .bind(&id)
355 .bind(version)
356 .bind(timestamp)
357 .bind(&payload_hash)
358 .bind(&payload_json)
359 .bind(&payload_blob)
360 .bind(&audit_json)
361 .bind(&state)
362 .bind(item.access_count as i64)
363 .bind(item.last_accessed as i64)
364 .execute(&self.pool)
365 .await
366 .map_err(|e| StorageError::Backend(e.to_string()))?;
367 Ok(())
368 })
369 .await
370 }
371
372 async fn delete(&self, id: &str) -> Result<(), StorageError> {
373 let id = id.to_string();
374 retry("sql_delete", &RetryConfig::query(), || async {
375 sqlx::query("DELETE FROM sync_items WHERE id = ?")
376 .bind(&id)
377 .execute(&self.pool)
378 .await
379 .map_err(|e| StorageError::Backend(e.to_string()))?;
380 Ok(())
381 })
382 .await
383 }
384
385 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
386 let id = id.to_string();
387 retry("sql_exists", &RetryConfig::query(), || async {
388 let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
389 .bind(&id)
390 .fetch_optional(&self.pool)
391 .await
392 .map_err(|e| StorageError::Backend(e.to_string()))?;
393 Ok(result.is_some())
394 })
395 .await
396 }
397
398 async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
400 if items.is_empty() {
401 return Ok(BatchWriteResult {
402 batch_id: String::new(),
403 written: 0,
404 verified: true,
405 });
406 }
407
408 let batch_id = uuid::Uuid::new_v4().to_string();
410
411 for item in items.iter_mut() {
413 item.batch_id = Some(batch_id.clone());
414 }
415
416 let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
418
419 const CHUNK_SIZE: usize = 500;
421 let mut total_written = 0usize;
422
423 for chunk in items.chunks(CHUNK_SIZE) {
424 let written = self.put_batch_chunk(chunk, &batch_id).await?;
425 total_written += written;
426 }
427
428 let verified_count = self.verify_batch_ids(&item_ids).await?;
430 let verified = verified_count == items.len();
431
432 if !verified {
433 tracing::warn!(
434 batch_id = %batch_id,
435 expected = items.len(),
436 actual = verified_count,
437 "Batch verification mismatch"
438 );
439 }
440
441 Ok(BatchWriteResult {
442 batch_id,
443 written: total_written,
444 verified,
445 })
446 }
447
448 async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
449 let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
450 .bind(limit as i64)
451 .bind(offset as i64)
452 .fetch_all(&self.pool)
453 .await
454 .map_err(|e| StorageError::Backend(e.to_string()))?;
455
456 let mut keys = Vec::with_capacity(rows.len());
457 for row in rows {
458 let id: String = row.try_get("id")
459 .map_err(|e| StorageError::Backend(e.to_string()))?;
460 keys.push(id);
461 }
462
463 Ok(keys)
464 }
465
466 async fn count_all(&self) -> Result<u64, StorageError> {
467 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
468 .fetch_one(&self.pool)
469 .await
470 .map_err(|e| StorageError::Backend(e.to_string()))?;
471
472 let count: i64 = result.try_get("cnt")
473 .map_err(|e| StorageError::Backend(e.to_string()))?;
474
475 Ok(count as u64)
476 }
477}
478
479impl SqlStore {
480 async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
483 let placeholders: Vec<String> = (0..chunk.len())
484 .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
485 .collect();
486
487 let sql = if self.is_sqlite {
488 format!(
489 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
490 ON CONFLICT(id) DO UPDATE SET \
491 version = excluded.version, \
492 timestamp = excluded.timestamp, \
493 payload_hash = excluded.payload_hash, \
494 payload = excluded.payload, \
495 payload_blob = excluded.payload_blob, \
496 audit = excluded.audit, \
497 merkle_dirty = 1, \
498 state = excluded.state, \
499 access_count = excluded.access_count, \
500 last_accessed = excluded.last_accessed",
501 placeholders.join(", ")
502 )
503 } else {
504 format!(
505 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
506 ON DUPLICATE KEY UPDATE \
507 version = VALUES(version), \
508 timestamp = VALUES(timestamp), \
509 payload_hash = VALUES(payload_hash), \
510 payload = VALUES(payload), \
511 payload_blob = VALUES(payload_blob), \
512 audit = VALUES(audit), \
513 merkle_dirty = 1, \
514 state = VALUES(state), \
515 access_count = VALUES(access_count), \
516 last_accessed = VALUES(last_accessed)",
517 placeholders.join(", ")
518 )
519 };
520
521 #[derive(Clone)]
523 struct PreparedRow {
524 id: String,
525 version: i64,
526 timestamp: i64,
527 payload_hash: Option<String>,
528 payload_json: Option<String>,
529 payload_blob: Option<Vec<u8>>,
530 audit_json: Option<String>,
531 state: String,
532 access_count: i64,
533 last_accessed: i64,
534 }
535
536 let prepared: Vec<PreparedRow> = chunk.iter()
537 .map(|item| {
538 let (payload_json, payload_blob) = match item.content_type {
539 ContentType::Json => {
540 let json_str = String::from_utf8_lossy(&item.content).to_string();
541 (Some(json_str), None)
542 }
543 ContentType::Binary => {
544 (None, Some(item.content.clone()))
545 }
546 };
547
548 PreparedRow {
549 id: item.object_id.clone(),
550 version: item.version as i64,
551 timestamp: item.updated_at,
552 payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
553 payload_json,
554 payload_blob,
555 audit_json: Self::build_audit_json(item),
556 state: item.state.clone(),
557 access_count: item.access_count as i64,
558 last_accessed: item.last_accessed as i64,
559 }
560 })
561 .collect();
562
563 retry("sql_put_batch", &RetryConfig::batch_write(), || {
564 let sql = sql.clone();
565 let prepared = prepared.clone();
566 async move {
567 let mut query = sqlx::query(&sql);
568
569 for row in &prepared {
570 query = query
571 .bind(&row.id)
572 .bind(row.version)
573 .bind(row.timestamp)
574 .bind(&row.payload_hash)
575 .bind(&row.payload_json)
576 .bind(&row.payload_blob)
577 .bind(&row.audit_json)
578 .bind(&row.state)
579 .bind(row.access_count)
580 .bind(row.last_accessed);
581 }
582
583 query.execute(&self.pool)
584 .await
585 .map_err(|e| StorageError::Backend(e.to_string()))?;
586
587 Ok(())
588 }
589 })
590 .await?;
591
592 Ok(chunk.len())
593 }
594
595 async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
598 if ids.is_empty() {
599 return Ok(0);
600 }
601
602 const CHUNK_SIZE: usize = 500;
604 let mut total_found = 0usize;
605
606 for chunk in ids.chunks(CHUNK_SIZE) {
607 let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
608 let sql = format!(
609 "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
610 placeholders.join(", ")
611 );
612
613 let mut query = sqlx::query(&sql);
614 for id in chunk {
615 query = query.bind(id);
616 }
617
618 let result = query
619 .fetch_one(&self.pool)
620 .await
621 .map_err(|e| StorageError::Backend(e.to_string()))?;
622
623 let count: i64 = result
624 .try_get("cnt")
625 .map_err(|e| StorageError::Backend(e.to_string()))?;
626 total_found += count as usize;
627 }
628
629 Ok(total_found)
630 }
631
632 #[allow(dead_code)]
634 async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
635 let batch_id = batch_id.to_string();
636
637 let sql = if self.is_sqlite {
639 "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
640 } else {
641 "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
642 };
643
644 let bind_value = if self.is_sqlite {
645 format!("%\"batch\":\"{}%", batch_id)
646 } else {
647 batch_id.clone()
648 };
649
650 let result = sqlx::query(sql)
651 .bind(&bind_value)
652 .fetch_one(&self.pool)
653 .await
654 .map_err(|e| StorageError::Backend(e.to_string()))?;
655
656 let count: i64 = result.try_get("cnt")
657 .map_err(|e| StorageError::Backend(e.to_string()))?;
658
659 Ok(count as usize)
660 }
661
662 pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
664 let rows = sqlx::query(
665 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
666 )
667 .bind(limit as i64)
668 .fetch_all(&self.pool)
669 .await
670 .map_err(|e| StorageError::Backend(e.to_string()))?;
671
672 let mut items = Vec::with_capacity(rows.len());
673 for row in rows {
674 let id: String = row.try_get("id")
675 .map_err(|e| StorageError::Backend(e.to_string()))?;
676 let version: i64 = row.try_get("version").unwrap_or(1);
677 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
678 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
679
680 let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
682 let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
683 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
684 let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
685 let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());
686
687 let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
688 let state: String = state_bytes
689 .and_then(|bytes| String::from_utf8(bytes).ok())
690 .unwrap_or_else(|| "default".to_string());
691
692 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
694 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
695
696 let (content, content_type) = if let Some(ref json_str) = payload_json {
697 (json_str.as_bytes().to_vec(), ContentType::Json)
698 } else if let Some(blob) = payload_blob {
699 (blob, ContentType::Binary)
700 } else {
701 continue; };
703
704 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
705
706 let item = SyncItem::reconstruct(
707 id,
708 version as u64,
709 timestamp,
710 content_type,
711 content,
712 batch_id,
713 trace_parent,
714 payload_hash.unwrap_or_default(),
715 home_instance_id,
716 state,
717 access_count as u64,
718 last_accessed as u64,
719 );
720 items.push(item);
721 }
722
723 Ok(items)
724 }
725
726 pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
728 if ids.is_empty() {
729 return Ok(0);
730 }
731
732 let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
733 let sql = format!(
734 "DELETE FROM sync_items WHERE id IN ({})",
735 placeholders.join(", ")
736 );
737
738 retry("sql_delete_batch", &RetryConfig::query(), || {
739 let sql = sql.clone();
740 let ids = ids.to_vec();
741 async move {
742 let mut query = sqlx::query(&sql);
743 for id in &ids {
744 query = query.bind(id);
745 }
746
747 let result = query.execute(&self.pool)
748 .await
749 .map_err(|e| StorageError::Backend(e.to_string()))?;
750
751 Ok(result.rows_affected() as usize)
752 }
753 })
754 .await
755 }
756
757 pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
765 let rows = sqlx::query(
766 "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
767 )
768 .bind(limit as i64)
769 .fetch_all(&self.pool)
770 .await
771 .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
772
773 let mut ids = Vec::with_capacity(rows.len());
774 for row in rows {
775 let id: String = row.try_get("id")
776 .map_err(|e| StorageError::Backend(e.to_string()))?;
777 ids.push(id);
778 }
779
780 Ok(ids)
781 }
782
783 pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
785 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
786 .fetch_one(&self.pool)
787 .await
788 .map_err(|e| StorageError::Backend(e.to_string()))?;
789
790 let count: i64 = result.try_get("cnt")
791 .map_err(|e| StorageError::Backend(e.to_string()))?;
792
793 Ok(count as u64)
794 }
795
796 pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
798 if ids.is_empty() {
799 return Ok(0);
800 }
801
802 let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
803 let sql = format!(
804 "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
805 placeholders.join(", ")
806 );
807
808 let mut query = sqlx::query(&sql);
809 for id in ids {
810 query = query.bind(id);
811 }
812
813 let result = query.execute(&self.pool)
814 .await
815 .map_err(|e| StorageError::Backend(e.to_string()))?;
816
817 Ok(result.rows_affected() as usize)
818 }
819
820 pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
822 let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
823 .fetch_optional(&self.pool)
824 .await
825 .map_err(|e| StorageError::Backend(e.to_string()))?;
826
827 Ok(result.is_some())
828 }
829
830 pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
835 let rows = sqlx::query(
836 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
837 FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
838 )
839 .bind(limit as i64)
840 .fetch_all(&self.pool)
841 .await
842 .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
843
844 let mut items = Vec::with_capacity(rows.len());
845 for row in rows {
846 let id: String = row.try_get("id")
847 .map_err(|e| StorageError::Backend(e.to_string()))?;
848 let version: i64 = row.try_get("version").unwrap_or(1);
849 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
850 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
851
852 let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
854 let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
855 String::from_utf8(bytes).ok()
856 });
857
858 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
859 let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
860 let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
861 String::from_utf8(bytes).ok()
862 });
863
864 let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
866 let state: String = state_bytes
867 .and_then(|bytes| String::from_utf8(bytes).ok())
868 .unwrap_or_else(|| "default".to_string());
869
870 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
872 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
873
874 let (content, content_type) = if let Some(ref json_str) = payload_json {
876 (json_str.as_bytes().to_vec(), ContentType::Json)
877 } else if let Some(blob) = payload_blob {
878 (blob, ContentType::Binary)
879 } else {
880 continue; };
882
883 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
885
886 let item = SyncItem::reconstruct(
887 id,
888 version as u64,
889 timestamp,
890 content_type,
891 content,
892 batch_id,
893 trace_parent,
894 payload_hash.unwrap_or_default(),
895 home_instance_id,
896 state,
897 access_count as u64,
898 last_accessed as u64,
899 );
900 items.push(item);
901 }
902
903 Ok(items)
904 }
905
906 pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
914 let rows = sqlx::query(
915 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
916 FROM sync_items WHERE state = ? LIMIT ?"
917 )
918 .bind(state)
919 .bind(limit as i64)
920 .fetch_all(&self.pool)
921 .await
922 .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
923
924 let mut items = Vec::with_capacity(rows.len());
925 for row in rows {
926 let id: String = row.try_get("id")
927 .map_err(|e| StorageError::Backend(e.to_string()))?;
928 let version: i64 = row.try_get("version").unwrap_or(1);
929 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
930 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
931
932 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
934 .or_else(|| {
935 row.try_get::<Vec<u8>, _>("payload").ok()
936 .and_then(|bytes| String::from_utf8(bytes).ok())
937 });
938
939 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
940
941 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
943 .or_else(|| {
944 row.try_get::<Vec<u8>, _>("audit").ok()
945 .and_then(|bytes| String::from_utf8(bytes).ok())
946 });
947
948 let state: String = row.try_get::<String, _>("state").ok()
950 .or_else(|| {
951 row.try_get::<Vec<u8>, _>("state").ok()
952 .and_then(|bytes| String::from_utf8(bytes).ok())
953 })
954 .unwrap_or_else(|| "default".to_string());
955
956 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
958 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
959
960 let (content, content_type) = if let Some(ref json_str) = payload_json {
961 (json_str.as_bytes().to_vec(), ContentType::Json)
962 } else if let Some(blob) = payload_blob {
963 (blob, ContentType::Binary)
964 } else {
965 continue;
966 };
967
968 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
969
970 let item = SyncItem::reconstruct(
971 id,
972 version as u64,
973 timestamp,
974 content_type,
975 content,
976 batch_id,
977 trace_parent,
978 payload_hash.unwrap_or_default(),
979 home_instance_id,
980 state,
981 access_count as u64,
982 last_accessed as u64,
983 );
984 items.push(item);
985 }
986
987 Ok(items)
988 }
989
990 pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
992 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
993 .bind(state)
994 .fetch_one(&self.pool)
995 .await
996 .map_err(|e| StorageError::Backend(e.to_string()))?;
997
998 let count: i64 = result.try_get("cnt")
999 .map_err(|e| StorageError::Backend(e.to_string()))?;
1000
1001 Ok(count as u64)
1002 }
1003
1004 pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
1006 let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
1007 .bind(state)
1008 .bind(limit as i64)
1009 .fetch_all(&self.pool)
1010 .await
1011 .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
1012
1013 let mut ids = Vec::with_capacity(rows.len());
1014 for row in rows {
1015 let id: String = row.try_get("id")
1016 .map_err(|e| StorageError::Backend(e.to_string()))?;
1017 ids.push(id);
1018 }
1019
1020 Ok(ids)
1021 }
1022
1023 pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
1025 let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
1026 .bind(new_state)
1027 .bind(id)
1028 .execute(&self.pool)
1029 .await
1030 .map_err(|e| StorageError::Backend(e.to_string()))?;
1031
1032 Ok(result.rows_affected() > 0)
1033 }
1034
1035 pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
1039 let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
1040 .bind(state)
1041 .execute(&self.pool)
1042 .await
1043 .map_err(|e| StorageError::Backend(e.to_string()))?;
1044
1045 Ok(result.rows_affected())
1046 }
1047
1048 pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
1059 let pattern = format!("{}%", prefix);
1061
1062 let rows = sqlx::query(
1063 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
1064 FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
1065 )
1066 .bind(&pattern)
1067 .bind(limit as i64)
1068 .fetch_all(&self.pool)
1069 .await
1070 .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
1071
1072 let mut items = Vec::with_capacity(rows.len());
1073 for row in rows {
1074 let id: String = row.try_get("id")
1075 .map_err(|e| StorageError::Backend(e.to_string()))?;
1076 let version: i64 = row.try_get("version").unwrap_or(1);
1077 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
1078 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
1079
1080 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
1082 .or_else(|| {
1083 row.try_get::<Vec<u8>, _>("payload").ok()
1084 .and_then(|bytes| String::from_utf8(bytes).ok())
1085 });
1086
1087 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
1088
1089 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1091 .or_else(|| {
1092 row.try_get::<Vec<u8>, _>("audit").ok()
1093 .and_then(|bytes| String::from_utf8(bytes).ok())
1094 });
1095
1096 let state: String = row.try_get::<String, _>("state").ok()
1098 .or_else(|| {
1099 row.try_get::<Vec<u8>, _>("state").ok()
1100 .and_then(|bytes| String::from_utf8(bytes).ok())
1101 })
1102 .unwrap_or_else(|| "default".to_string());
1103
1104 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
1106 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
1107
1108 let (content, content_type) = if let Some(ref json_str) = payload_json {
1109 (json_str.as_bytes().to_vec(), ContentType::Json)
1110 } else if let Some(blob) = payload_blob {
1111 (blob, ContentType::Binary)
1112 } else {
1113 continue;
1114 };
1115
1116 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1117
1118 let item = SyncItem::reconstruct(
1119 id,
1120 version as u64,
1121 timestamp,
1122 content_type,
1123 content,
1124 batch_id,
1125 trace_parent,
1126 payload_hash.unwrap_or_default(),
1127 home_instance_id,
1128 state,
1129 access_count as u64,
1130 last_accessed as u64,
1131 );
1132 items.push(item);
1133 }
1134
1135 Ok(items)
1136 }
1137
1138 pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1140 let pattern = format!("{}%", prefix);
1141
1142 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
1143 .bind(&pattern)
1144 .fetch_one(&self.pool)
1145 .await
1146 .map_err(|e| StorageError::Backend(e.to_string()))?;
1147
1148 let count: i64 = result.try_get("cnt")
1149 .map_err(|e| StorageError::Backend(e.to_string()))?;
1150
1151 Ok(count as u64)
1152 }
1153
1154 pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1158 let pattern = format!("{}%", prefix);
1159
1160 let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
1161 .bind(&pattern)
1162 .execute(&self.pool)
1163 .await
1164 .map_err(|e| StorageError::Backend(e.to_string()))?;
1165
1166 Ok(result.rows_affected())
1167 }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172 use super::*;
1173 use std::path::PathBuf;
1174 use serde_json::json;
1175
1176 fn temp_db_path(name: &str) -> PathBuf {
1177 PathBuf::from("temp").join(format!("sql_test_{}.db", name))
1179 }
1180
1181 fn cleanup_db(path: &PathBuf) {
1183 let _ = std::fs::remove_file(path);
1184 let _ = std::fs::remove_file(format!("{}-wal", path.display()));
1185 let _ = std::fs::remove_file(format!("{}-shm", path.display()));
1186 }
1187
1188 fn test_item(id: &str, state: &str) -> SyncItem {
1189 SyncItem::from_json(id.to_string(), json!({"id": id}))
1190 .with_state(state)
1191 }
1192
1193 #[tokio::test]
1194 async fn test_state_stored_and_retrieved() {
1195 let db_path = temp_db_path("stored");
1196 cleanup_db(&db_path);
1197
1198 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1199 let store = SqlStore::new(&url).await.unwrap();
1200
1201 let item = test_item("item1", "delta");
1203 store.put(&item).await.unwrap();
1204
1205 let retrieved = store.get("item1").await.unwrap().unwrap();
1207 assert_eq!(retrieved.state, "delta");
1208
1209 cleanup_db(&db_path);
1210 }
1211
1212 #[tokio::test]
1213 async fn test_state_default_value() {
1214 let db_path = temp_db_path("default");
1215 cleanup_db(&db_path);
1216
1217 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1218 let store = SqlStore::new(&url).await.unwrap();
1219
1220 let item = SyncItem::from_json("item1".into(), json!({"test": true}));
1222 store.put(&item).await.unwrap();
1223
1224 let retrieved = store.get("item1").await.unwrap().unwrap();
1225 assert_eq!(retrieved.state, "default");
1226
1227 cleanup_db(&db_path);
1228 }
1229
1230 #[tokio::test]
1231 async fn test_get_by_state() {
1232 let db_path = temp_db_path("get_by_state");
1233 cleanup_db(&db_path);
1234
1235 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1236 let store = SqlStore::new(&url).await.unwrap();
1237
1238 store.put(&test_item("delta1", "delta")).await.unwrap();
1240 store.put(&test_item("delta2", "delta")).await.unwrap();
1241 store.put(&test_item("base1", "base")).await.unwrap();
1242 store.put(&test_item("pending1", "pending")).await.unwrap();
1243
1244 let deltas = store.get_by_state("delta", 100).await.unwrap();
1246 assert_eq!(deltas.len(), 2);
1247 assert!(deltas.iter().all(|i| i.state == "delta"));
1248
1249 let bases = store.get_by_state("base", 100).await.unwrap();
1250 assert_eq!(bases.len(), 1);
1251 assert_eq!(bases[0].object_id, "base1");
1252
1253 let none = store.get_by_state("nonexistent", 100).await.unwrap();
1255 assert!(none.is_empty());
1256
1257 cleanup_db(&db_path);
1258 }
1259
1260 #[tokio::test]
1261 async fn test_get_by_state_with_limit() {
1262 let db_path = temp_db_path("get_by_state_limit");
1263 cleanup_db(&db_path);
1264
1265 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1266 let store = SqlStore::new(&url).await.unwrap();
1267
1268 for i in 0..10 {
1270 store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
1271 }
1272
1273 let limited = store.get_by_state("batch", 5).await.unwrap();
1275 assert_eq!(limited.len(), 5);
1276
1277 cleanup_db(&db_path);
1278 }
1279
1280 #[tokio::test]
1281 async fn test_count_by_state() {
1282 let db_path = temp_db_path("count_by_state");
1283 cleanup_db(&db_path);
1284
1285 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1286 let store = SqlStore::new(&url).await.unwrap();
1287
1288 store.put(&test_item("a1", "alpha")).await.unwrap();
1290 store.put(&test_item("a2", "alpha")).await.unwrap();
1291 store.put(&test_item("a3", "alpha")).await.unwrap();
1292 store.put(&test_item("b1", "beta")).await.unwrap();
1293
1294 assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
1295 assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
1296 assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
1297
1298 cleanup_db(&db_path);
1299 }
1300
1301 #[tokio::test]
1302 async fn test_list_state_ids() {
1303 let db_path = temp_db_path("list_state_ids");
1304 cleanup_db(&db_path);
1305
1306 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1307 let store = SqlStore::new(&url).await.unwrap();
1308
1309 store.put(&test_item("id1", "pending")).await.unwrap();
1310 store.put(&test_item("id2", "pending")).await.unwrap();
1311 store.put(&test_item("id3", "done")).await.unwrap();
1312
1313 let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
1314 assert_eq!(pending_ids.len(), 2);
1315 assert!(pending_ids.contains(&"id1".to_string()));
1316 assert!(pending_ids.contains(&"id2".to_string()));
1317
1318 cleanup_db(&db_path);
1319 }
1320
1321 #[tokio::test]
1322 async fn test_set_state() {
1323 let db_path = temp_db_path("set_state");
1324 cleanup_db(&db_path);
1325
1326 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1327 let store = SqlStore::new(&url).await.unwrap();
1328
1329 store.put(&test_item("item1", "pending")).await.unwrap();
1330
1331 let before = store.get("item1").await.unwrap().unwrap();
1333 assert_eq!(before.state, "pending");
1334
1335 let updated = store.set_state("item1", "approved").await.unwrap();
1337 assert!(updated);
1338
1339 let after = store.get("item1").await.unwrap().unwrap();
1341 assert_eq!(after.state, "approved");
1342
1343 let not_found = store.set_state("nonexistent", "x").await.unwrap();
1345 assert!(!not_found);
1346
1347 cleanup_db(&db_path);
1348 }
1349
1350 #[tokio::test]
1351 async fn test_delete_by_state() {
1352 let db_path = temp_db_path("delete_by_state");
1353 cleanup_db(&db_path);
1354
1355 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1356 let store = SqlStore::new(&url).await.unwrap();
1357
1358 store.put(&test_item("keep1", "keep")).await.unwrap();
1359 store.put(&test_item("keep2", "keep")).await.unwrap();
1360 store.put(&test_item("del1", "delete_me")).await.unwrap();
1361 store.put(&test_item("del2", "delete_me")).await.unwrap();
1362 store.put(&test_item("del3", "delete_me")).await.unwrap();
1363
1364 let deleted = store.delete_by_state("delete_me").await.unwrap();
1366 assert_eq!(deleted, 3);
1367
1368 assert!(store.get("del1").await.unwrap().is_none());
1370 assert!(store.get("del2").await.unwrap().is_none());
1371
1372 assert!(store.get("keep1").await.unwrap().is_some());
1374 assert!(store.get("keep2").await.unwrap().is_some());
1375
1376 let zero = store.delete_by_state("nonexistent").await.unwrap();
1378 assert_eq!(zero, 0);
1379
1380 cleanup_db(&db_path);
1381 }
1382
1383 #[tokio::test]
1384 async fn test_multiple_puts_preserve_state() {
1385 let db_path = temp_db_path("multi_put_state");
1386 cleanup_db(&db_path);
1387
1388 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1389 let store = SqlStore::new(&url).await.unwrap();
1390
1391 store.put(&test_item("a", "state_a")).await.unwrap();
1393 store.put(&test_item("b", "state_b")).await.unwrap();
1394 store.put(&test_item("c", "state_c")).await.unwrap();
1395
1396 assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
1397 assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
1398 assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
1399
1400 cleanup_db(&db_path);
1401 }
1402
1403 #[tokio::test]
1404 async fn test_scan_prefix() {
1405 let db_path = temp_db_path("scan_prefix");
1406 cleanup_db(&db_path);
1407
1408 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1409 let store = SqlStore::new(&url).await.unwrap();
1410
1411 store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1413 store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1414 store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1415 store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1416 store.put(&test_item("base:user.123", "base")).await.unwrap();
1417 store.put(&test_item("base:user.456", "base")).await.unwrap();
1418
1419 let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
1421 assert_eq!(user123_deltas.len(), 3);
1422 assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
1423
1424 let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
1426 assert_eq!(user456_deltas.len(), 1);
1427
1428 let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
1430 assert_eq!(all_deltas.len(), 4);
1431
1432 let bases = store.scan_prefix("base:", 100).await.unwrap();
1434 assert_eq!(bases.len(), 2);
1435
1436 let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
1438 assert!(none.is_empty());
1439
1440 cleanup_db(&db_path);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_scan_prefix_with_limit() {
1445 let db_path = temp_db_path("scan_prefix_limit");
1446 cleanup_db(&db_path);
1447
1448 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1449 let store = SqlStore::new(&url).await.unwrap();
1450
1451 for i in 0..20 {
1453 store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
1454 }
1455
1456 let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
1458 assert_eq!(limited.len(), 5);
1459
1460 let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
1462 assert_eq!(all.len(), 20);
1463
1464 cleanup_db(&db_path);
1465 }
1466
1467 #[tokio::test]
1468 async fn test_count_prefix() {
1469 let db_path = temp_db_path("count_prefix");
1470 cleanup_db(&db_path);
1471
1472 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1473 let store = SqlStore::new(&url).await.unwrap();
1474
1475 store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1477 store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1478 store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1479 store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1480 store.put(&test_item("base:user.123", "base")).await.unwrap();
1481
1482 assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
1484 assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
1485 assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
1486 assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
1487 assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
1488
1489 cleanup_db(&db_path);
1490 }
1491
1492 #[tokio::test]
1493 async fn test_delete_prefix() {
1494 let db_path = temp_db_path("delete_prefix");
1495 cleanup_db(&db_path);
1496
1497 let url = format!("sqlite://{}?mode=rwc", db_path.display());
1498 let store = SqlStore::new(&url).await.unwrap();
1499
1500 store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1502 store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1503 store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1504 store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1505 store.put(&test_item("base:user.123", "base")).await.unwrap();
1506
1507 let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
1509 assert_eq!(deleted, 3);
1510
1511 assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
1513 assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
1514
1515 assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
1517
1518 assert!(store.get("base:user.123").await.unwrap().is_some());
1520
1521 let zero = store.delete_prefix("nonexistent:").await.unwrap();
1523 assert_eq!(zero, 0);
1524
1525 cleanup_db(&db_path);
1526 }
1527}