1use async_trait::async_trait;
42use sqlx::{AnyPool, Row, any::AnyPoolOptions};
43use crate::sync_item::{SyncItem, ContentType};
44use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
45use crate::resilience::retry::{retry, RetryConfig};
46use std::sync::Once;
47use std::time::Duration;
48
49static INSTALL_DRIVERS: Once = Once::new();
51
52fn install_drivers() {
53 INSTALL_DRIVERS.call_once(|| {
54 sqlx::any::install_default_drivers();
55 });
56}
57
/// SQL-backed store built on an sqlx `AnyPool`.
///
/// The store keeps a backend flag so each query can choose between the
/// SQLite flavour of a statement and the MySQL flavour.
pub struct SqlStore {
    /// Connection pool every query in this type runs against.
    pool: AnyPool,
    /// `true` when the connection string given to `new` started with `sqlite:`.
    is_sqlite: bool,
}
62
63impl SqlStore {
64 pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
66 install_drivers();
67
68 let is_sqlite = connection_string.starts_with("sqlite:");
69
70 let pool = retry("sql_connect", &RetryConfig::startup(), || async {
71 AnyPoolOptions::new()
72 .max_connections(20)
73 .acquire_timeout(Duration::from_secs(10))
74 .idle_timeout(Duration::from_secs(300))
75 .connect(connection_string)
76 .await
77 .map_err(|e| StorageError::Backend(e.to_string()))
78 })
79 .await?;
80
81 let store = Self { pool, is_sqlite };
82
83 if is_sqlite {
85 store.enable_wal_mode().await?;
86 }
87
88 store.init_schema().await?;
89
90 if !is_sqlite {
93 sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
94 .execute(&store.pool)
95 .await
96 .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
97 }
98
99 Ok(store)
100 }
101
102 pub fn pool(&self) -> AnyPool {
104 self.pool.clone()
105 }
106
107 async fn enable_wal_mode(&self) -> Result<(), StorageError> {
114 sqlx::query("PRAGMA journal_mode = WAL")
115 .execute(&self.pool)
116 .await
117 .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
118
119 sqlx::query("PRAGMA synchronous = NORMAL")
122 .execute(&self.pool)
123 .await
124 .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
125
126 Ok(())
127 }
128
129 async fn init_schema(&self) -> Result<(), StorageError> {
130 let sql = if self.is_sqlite {
140 r#"
141 CREATE TABLE IF NOT EXISTS sync_items (
142 id TEXT PRIMARY KEY,
143 version INTEGER NOT NULL DEFAULT 1,
144 timestamp INTEGER NOT NULL,
145 payload_hash TEXT,
146 payload TEXT,
147 payload_blob BLOB,
148 audit TEXT,
149 merkle_dirty INTEGER NOT NULL DEFAULT 1,
150 state TEXT NOT NULL DEFAULT 'default',
151 access_count INTEGER NOT NULL DEFAULT 0,
152 last_accessed INTEGER NOT NULL DEFAULT 0
153 )
154 "#
155 } else {
156 r#"
159 CREATE TABLE IF NOT EXISTS sync_items (
160 id VARCHAR(255) PRIMARY KEY,
161 version BIGINT NOT NULL DEFAULT 1,
162 timestamp BIGINT NOT NULL,
163 payload_hash VARCHAR(64),
164 payload LONGTEXT,
165 payload_blob MEDIUMBLOB,
166 audit TEXT,
167 merkle_dirty TINYINT NOT NULL DEFAULT 1,
168 state VARCHAR(32) NOT NULL DEFAULT 'default',
169 access_count BIGINT NOT NULL DEFAULT 0,
170 last_accessed BIGINT NOT NULL DEFAULT 0,
171 INDEX idx_timestamp (timestamp),
172 INDEX idx_merkle_dirty (merkle_dirty),
173 INDEX idx_state (state)
174 )
175 "#
176 };
177
178 retry("sql_init_schema", &RetryConfig::startup(), || async {
179 sqlx::query(sql)
180 .execute(&self.pool)
181 .await
182 .map_err(|e| StorageError::Backend(e.to_string()))
183 })
184 .await?;
185
186 Ok(())
187 }
188
189 fn build_audit_json(item: &SyncItem) -> Option<String> {
191 let mut audit = serde_json::Map::new();
192
193 if let Some(ref batch_id) = item.batch_id {
194 audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
195 }
196 if let Some(ref trace_parent) = item.trace_parent {
197 audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
198 }
199 if let Some(ref home) = item.home_instance_id {
200 audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
201 }
202
203 if audit.is_empty() {
204 None
205 } else {
206 serde_json::to_string(&serde_json::Value::Object(audit)).ok()
207 }
208 }
209
210 fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
212 match audit_str {
213 Some(s) => {
214 if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
215 let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
216 let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
217 let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
218 (batch_id, trace_parent, home_instance_id)
219 } else {
220 (None, None, None)
221 }
222 }
223 None => (None, None, None),
224 }
225 }
226}
227
228#[async_trait]
229impl ArchiveStore for SqlStore {
230 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
231 let id = id.to_string();
232
233 retry("sql_get", &RetryConfig::query(), || async {
234 let result = sqlx::query(
235 "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
236 )
237 .bind(&id)
238 .fetch_optional(&self.pool)
239 .await
240 .map_err(|e| StorageError::Backend(e.to_string()))?;
241
242 match result {
243 Some(row) => {
244 let version: i64 = row.try_get("version").unwrap_or(1);
245 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
246 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
247
248 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
250 .or_else(|| {
251 row.try_get::<Vec<u8>, _>("payload").ok()
252 .and_then(|bytes| String::from_utf8(bytes).ok())
253 });
254
255 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
256
257 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
259 .or_else(|| {
260 row.try_get::<Vec<u8>, _>("audit").ok()
261 .and_then(|bytes| String::from_utf8(bytes).ok())
262 });
263
264 let state: String = row.try_get::<String, _>("state").ok()
266 .or_else(|| {
267 row.try_get::<Vec<u8>, _>("state").ok()
268 .and_then(|bytes| String::from_utf8(bytes).ok())
269 })
270 .unwrap_or_else(|| "default".to_string());
271
272 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
274 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
275
276 let (content, content_type) = if let Some(ref json_str) = payload_json {
278 let content = json_str.as_bytes().to_vec();
280 (content, ContentType::Json)
281 } else if let Some(blob) = payload_blob {
282 (blob, ContentType::Binary)
284 } else {
285 return Err(StorageError::Backend("No payload in row".to_string()));
286 };
287
288 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
290
291 let item = SyncItem::reconstruct(
292 id.clone(),
293 version as u64,
294 timestamp,
295 content_type,
296 content,
297 batch_id,
298 trace_parent,
299 payload_hash.unwrap_or_default(),
300 home_instance_id,
301 state,
302 access_count as u64,
303 last_accessed as u64,
304 );
305 Ok(Some(item))
306 }
307 None => Ok(None),
308 }
309 })
310 .await
311 }
312
313 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
314 let id = item.object_id.clone();
315 let version = item.version as i64;
316 let timestamp = item.updated_at;
317 let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
318 let audit_json = Self::build_audit_json(item);
319 let state = item.state.clone();
320
321 let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
323 ContentType::Json => {
324 let json_str = String::from_utf8_lossy(&item.content).to_string();
325 (Some(json_str), None)
326 }
327 ContentType::Binary => {
328 (None, Some(item.content.clone()))
329 }
330 };
331
332 let sql = if self.is_sqlite {
333 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
335 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
336 ON CONFLICT(id) DO UPDATE SET
337 version = excluded.version,
338 timestamp = excluded.timestamp,
339 payload_hash = excluded.payload_hash,
340 payload = excluded.payload,
341 payload_blob = excluded.payload_blob,
342 audit = excluded.audit,
343 merkle_dirty = CASE WHEN sync_items.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE sync_items.merkle_dirty END,
344 state = excluded.state,
345 access_count = excluded.access_count,
346 last_accessed = excluded.last_accessed"
347 } else {
348 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
350 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
351 ON DUPLICATE KEY UPDATE
352 version = VALUES(version),
353 timestamp = VALUES(timestamp),
354 payload_hash = VALUES(payload_hash),
355 payload = VALUES(payload),
356 payload_blob = VALUES(payload_blob),
357 audit = VALUES(audit),
358 merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END,
359 state = VALUES(state),
360 access_count = VALUES(access_count),
361 last_accessed = VALUES(last_accessed)"
362 };
363
364 retry("sql_put", &RetryConfig::query(), || async {
365 sqlx::query(sql)
366 .bind(&id)
367 .bind(version)
368 .bind(timestamp)
369 .bind(&payload_hash)
370 .bind(&payload_json)
371 .bind(&payload_blob)
372 .bind(&audit_json)
373 .bind(&state)
374 .bind(item.access_count as i64)
375 .bind(item.last_accessed as i64)
376 .execute(&self.pool)
377 .await
378 .map_err(|e| StorageError::Backend(e.to_string()))?;
379 Ok(())
380 })
381 .await
382 }
383
384 async fn delete(&self, id: &str) -> Result<(), StorageError> {
385 let id = id.to_string();
386 retry("sql_delete", &RetryConfig::query(), || async {
387 sqlx::query("DELETE FROM sync_items WHERE id = ?")
388 .bind(&id)
389 .execute(&self.pool)
390 .await
391 .map_err(|e| StorageError::Backend(e.to_string()))?;
392 Ok(())
393 })
394 .await
395 }
396
397 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
398 let id = id.to_string();
399 retry("sql_exists", &RetryConfig::query(), || async {
400 let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
401 .bind(&id)
402 .fetch_optional(&self.pool)
403 .await
404 .map_err(|e| StorageError::Backend(e.to_string()))?;
405 Ok(result.is_some())
406 })
407 .await
408 }
409
410 async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
412 if items.is_empty() {
413 return Ok(BatchWriteResult {
414 batch_id: String::new(),
415 written: 0,
416 verified: true,
417 });
418 }
419
420 let batch_id = uuid::Uuid::new_v4().to_string();
422
423 for item in items.iter_mut() {
425 item.batch_id = Some(batch_id.clone());
426 }
427
428 let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
430
431 const CHUNK_SIZE: usize = 500;
433 let mut total_written = 0usize;
434
435 for chunk in items.chunks(CHUNK_SIZE) {
436 let written = self.put_batch_chunk(chunk, &batch_id).await?;
437 total_written += written;
438 }
439
440 let verified_count = self.verify_batch_ids(&item_ids).await?;
442 let verified = verified_count == items.len();
443
444 if !verified {
445 tracing::warn!(
446 batch_id = %batch_id,
447 expected = items.len(),
448 actual = verified_count,
449 "Batch verification mismatch"
450 );
451 }
452
453 Ok(BatchWriteResult {
454 batch_id,
455 written: total_written,
456 verified,
457 })
458 }
459
460 async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
461 let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
462 .bind(limit as i64)
463 .bind(offset as i64)
464 .fetch_all(&self.pool)
465 .await
466 .map_err(|e| StorageError::Backend(e.to_string()))?;
467
468 let mut keys = Vec::with_capacity(rows.len());
469 for row in rows {
470 let id: String = row.try_get("id")
471 .map_err(|e| StorageError::Backend(e.to_string()))?;
472 keys.push(id);
473 }
474
475 Ok(keys)
476 }
477
478 async fn count_all(&self) -> Result<u64, StorageError> {
479 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
480 .fetch_one(&self.pool)
481 .await
482 .map_err(|e| StorageError::Backend(e.to_string()))?;
483
484 let count: i64 = result.try_get("cnt")
485 .map_err(|e| StorageError::Backend(e.to_string()))?;
486
487 Ok(count as u64)
488 }
489}
490
491impl SqlStore {
492 async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
495 let placeholders: Vec<String> = (0..chunk.len())
496 .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
497 .collect();
498
499 let sql = if self.is_sqlite {
500 format!(
501 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
502 ON CONFLICT(id) DO UPDATE SET \
503 version = excluded.version, \
504 timestamp = excluded.timestamp, \
505 payload_hash = excluded.payload_hash, \
506 payload = excluded.payload, \
507 payload_blob = excluded.payload_blob, \
508 audit = excluded.audit, \
509 merkle_dirty = CASE WHEN sync_items.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE sync_items.merkle_dirty END, \
510 state = excluded.state, \
511 access_count = excluded.access_count, \
512 last_accessed = excluded.last_accessed",
513 placeholders.join(", ")
514 )
515 } else {
516 format!(
517 "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
518 ON DUPLICATE KEY UPDATE \
519 version = VALUES(version), \
520 timestamp = VALUES(timestamp), \
521 payload_hash = VALUES(payload_hash), \
522 payload = VALUES(payload), \
523 payload_blob = VALUES(payload_blob), \
524 audit = VALUES(audit), \
525 merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
526 state = VALUES(state), \
527 access_count = VALUES(access_count), \
528 last_accessed = VALUES(last_accessed)",
529 placeholders.join(", ")
530 )
531 };
532
533 #[derive(Clone)]
535 struct PreparedRow {
536 id: String,
537 version: i64,
538 timestamp: i64,
539 payload_hash: Option<String>,
540 payload_json: Option<String>,
541 payload_blob: Option<Vec<u8>>,
542 audit_json: Option<String>,
543 state: String,
544 access_count: i64,
545 last_accessed: i64,
546 }
547
548 let prepared: Vec<PreparedRow> = chunk.iter()
549 .map(|item| {
550 let (payload_json, payload_blob) = match item.content_type {
551 ContentType::Json => {
552 let json_str = String::from_utf8_lossy(&item.content).to_string();
553 (Some(json_str), None)
554 }
555 ContentType::Binary => {
556 (None, Some(item.content.clone()))
557 }
558 };
559
560 PreparedRow {
561 id: item.object_id.clone(),
562 version: item.version as i64,
563 timestamp: item.updated_at,
564 payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
565 payload_json,
566 payload_blob,
567 audit_json: Self::build_audit_json(item),
568 state: item.state.clone(),
569 access_count: item.access_count as i64,
570 last_accessed: item.last_accessed as i64,
571 }
572 })
573 .collect();
574
575 retry("sql_put_batch", &RetryConfig::batch_write(), || {
576 let sql = sql.clone();
577 let prepared = prepared.clone();
578 async move {
579 let mut query = sqlx::query(&sql);
580
581 for row in &prepared {
582 query = query
583 .bind(&row.id)
584 .bind(row.version)
585 .bind(row.timestamp)
586 .bind(&row.payload_hash)
587 .bind(&row.payload_json)
588 .bind(&row.payload_blob)
589 .bind(&row.audit_json)
590 .bind(&row.state)
591 .bind(row.access_count)
592 .bind(row.last_accessed);
593 }
594
595 query.execute(&self.pool)
596 .await
597 .map_err(|e| StorageError::Backend(e.to_string()))?;
598
599 Ok(())
600 }
601 })
602 .await?;
603
604 Ok(chunk.len())
605 }
606
607 async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
610 if ids.is_empty() {
611 return Ok(0);
612 }
613
614 const CHUNK_SIZE: usize = 500;
616 let mut total_found = 0usize;
617
618 for chunk in ids.chunks(CHUNK_SIZE) {
619 let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
620 let sql = format!(
621 "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
622 placeholders.join(", ")
623 );
624
625 let mut query = sqlx::query(&sql);
626 for id in chunk {
627 query = query.bind(id);
628 }
629
630 let result = query
631 .fetch_one(&self.pool)
632 .await
633 .map_err(|e| StorageError::Backend(e.to_string()))?;
634
635 let count: i64 = result
636 .try_get("cnt")
637 .map_err(|e| StorageError::Backend(e.to_string()))?;
638 total_found += count as usize;
639 }
640
641 Ok(total_found)
642 }
643
644 #[allow(dead_code)]
646 async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
647 let batch_id = batch_id.to_string();
648
649 let sql = if self.is_sqlite {
651 "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
652 } else {
653 "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
654 };
655
656 let bind_value = if self.is_sqlite {
657 format!("%\"batch\":\"{}%", batch_id)
658 } else {
659 batch_id.clone()
660 };
661
662 let result = sqlx::query(sql)
663 .bind(&bind_value)
664 .fetch_one(&self.pool)
665 .await
666 .map_err(|e| StorageError::Backend(e.to_string()))?;
667
668 let count: i64 = result.try_get("cnt")
669 .map_err(|e| StorageError::Backend(e.to_string()))?;
670
671 Ok(count as usize)
672 }
673
674 pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
676 let rows = sqlx::query(
677 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
678 )
679 .bind(limit as i64)
680 .fetch_all(&self.pool)
681 .await
682 .map_err(|e| StorageError::Backend(e.to_string()))?;
683
684 let mut items = Vec::with_capacity(rows.len());
685 for row in rows {
686 let id: String = row.try_get("id")
687 .map_err(|e| StorageError::Backend(e.to_string()))?;
688 let version: i64 = row.try_get("version").unwrap_or(1);
689 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
690 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
691
692 let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
694 let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
695 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
696 let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
697 let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());
698
699 let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
700 let state: String = state_bytes
701 .and_then(|bytes| String::from_utf8(bytes).ok())
702 .unwrap_or_else(|| "default".to_string());
703
704 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
706 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
707
708 let (content, content_type) = if let Some(ref json_str) = payload_json {
709 (json_str.as_bytes().to_vec(), ContentType::Json)
710 } else if let Some(blob) = payload_blob {
711 (blob, ContentType::Binary)
712 } else {
713 continue; };
715
716 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
717
718 let item = SyncItem::reconstruct(
719 id,
720 version as u64,
721 timestamp,
722 content_type,
723 content,
724 batch_id,
725 trace_parent,
726 payload_hash.unwrap_or_default(),
727 home_instance_id,
728 state,
729 access_count as u64,
730 last_accessed as u64,
731 );
732 items.push(item);
733 }
734
735 Ok(items)
736 }
737
738 pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
740 if ids.is_empty() {
741 return Ok(0);
742 }
743
744 let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
745 let sql = format!(
746 "DELETE FROM sync_items WHERE id IN ({})",
747 placeholders.join(", ")
748 );
749
750 retry("sql_delete_batch", &RetryConfig::query(), || {
751 let sql = sql.clone();
752 let ids = ids.to_vec();
753 async move {
754 let mut query = sqlx::query(&sql);
755 for id in &ids {
756 query = query.bind(id);
757 }
758
759 let result = query.execute(&self.pool)
760 .await
761 .map_err(|e| StorageError::Backend(e.to_string()))?;
762
763 Ok(result.rows_affected() as usize)
764 }
765 })
766 .await
767 }
768
769 pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
777 let rows = sqlx::query(
778 "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
779 )
780 .bind(limit as i64)
781 .fetch_all(&self.pool)
782 .await
783 .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
784
785 let mut ids = Vec::with_capacity(rows.len());
786 for row in rows {
787 let id: String = row.try_get("id")
788 .map_err(|e| StorageError::Backend(e.to_string()))?;
789 ids.push(id);
790 }
791
792 Ok(ids)
793 }
794
795 pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
797 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
798 .fetch_one(&self.pool)
799 .await
800 .map_err(|e| StorageError::Backend(e.to_string()))?;
801
802 let count: i64 = result.try_get("cnt")
803 .map_err(|e| StorageError::Backend(e.to_string()))?;
804
805 Ok(count as u64)
806 }
807
808 pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
810 if ids.is_empty() {
811 return Ok(0);
812 }
813
814 let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
815 let sql = format!(
816 "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
817 placeholders.join(", ")
818 );
819
820 let mut query = sqlx::query(&sql);
821 for id in ids {
822 query = query.bind(id);
823 }
824
825 let result = query.execute(&self.pool)
826 .await
827 .map_err(|e| StorageError::Backend(e.to_string()))?;
828
829 Ok(result.rows_affected() as usize)
830 }
831
832 pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
834 let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
835 .fetch_optional(&self.pool)
836 .await
837 .map_err(|e| StorageError::Backend(e.to_string()))?;
838
839 Ok(result.is_some())
840 }
841
842 pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
850 let pattern = format!("{}%", prefix);
851 let result = sqlx::query(
852 "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
853 )
854 .bind(&pattern)
855 .fetch_one(&self.pool)
856 .await
857 .map_err(|e| StorageError::Backend(e.to_string()))?;
858
859 let count: i64 = result.try_get("cnt")
860 .map_err(|e| StorageError::Backend(e.to_string()))?;
861
862 Ok(count as u64)
863 }
864
865 pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
870 let sql = if self.is_sqlite {
872 "SELECT DISTINCT CASE
874 WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
875 ELSE id
876 END as prefix FROM sync_items WHERE merkle_dirty = 1"
877 } else {
878 "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
880 };
881
882 let rows = sqlx::query(sql)
883 .fetch_all(&self.pool)
884 .await
885 .map_err(|e| StorageError::Backend(e.to_string()))?;
886
887 let mut prefixes = Vec::with_capacity(rows.len());
888 for row in rows {
889 let prefix: String = row.try_get("prefix")
890 .map_err(|e| StorageError::Backend(e.to_string()))?;
891 prefixes.push(prefix);
892 }
893
894 Ok(prefixes)
895 }
896
897 pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
902 let rows = sqlx::query(
903 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
904 FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
905 )
906 .bind(limit as i64)
907 .fetch_all(&self.pool)
908 .await
909 .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
910
911 let mut items = Vec::with_capacity(rows.len());
912 for row in rows {
913 let id: String = row.try_get("id")
914 .map_err(|e| StorageError::Backend(e.to_string()))?;
915 let version: i64 = row.try_get("version").unwrap_or(1);
916 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
917 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
918
919 let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
921 let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
922 String::from_utf8(bytes).ok()
923 });
924
925 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
926 let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
927 let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
928 String::from_utf8(bytes).ok()
929 });
930
931 let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
933 let state: String = state_bytes
934 .and_then(|bytes| String::from_utf8(bytes).ok())
935 .unwrap_or_else(|| "default".to_string());
936
937 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
939 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
940
941 let (content, content_type) = if let Some(ref json_str) = payload_json {
943 (json_str.as_bytes().to_vec(), ContentType::Json)
944 } else if let Some(blob) = payload_blob {
945 (blob, ContentType::Binary)
946 } else {
947 continue; };
949
950 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
952
953 let item = SyncItem::reconstruct(
954 id,
955 version as u64,
956 timestamp,
957 content_type,
958 content,
959 batch_id,
960 trace_parent,
961 payload_hash.unwrap_or_default(),
962 home_instance_id,
963 state,
964 access_count as u64,
965 last_accessed as u64,
966 );
967 items.push(item);
968 }
969
970 Ok(items)
971 }
972
973 pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
981 let rows = sqlx::query(
982 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
983 FROM sync_items WHERE state = ? LIMIT ?"
984 )
985 .bind(state)
986 .bind(limit as i64)
987 .fetch_all(&self.pool)
988 .await
989 .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
990
991 let mut items = Vec::with_capacity(rows.len());
992 for row in rows {
993 let id: String = row.try_get("id")
994 .map_err(|e| StorageError::Backend(e.to_string()))?;
995 let version: i64 = row.try_get("version").unwrap_or(1);
996 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
997 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
998
999 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
1001 .or_else(|| {
1002 row.try_get::<Vec<u8>, _>("payload").ok()
1003 .and_then(|bytes| String::from_utf8(bytes).ok())
1004 });
1005
1006 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
1007
1008 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1010 .or_else(|| {
1011 row.try_get::<Vec<u8>, _>("audit").ok()
1012 .and_then(|bytes| String::from_utf8(bytes).ok())
1013 });
1014
1015 let state: String = row.try_get::<String, _>("state").ok()
1017 .or_else(|| {
1018 row.try_get::<Vec<u8>, _>("state").ok()
1019 .and_then(|bytes| String::from_utf8(bytes).ok())
1020 })
1021 .unwrap_or_else(|| "default".to_string());
1022
1023 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
1025 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
1026
1027 let (content, content_type) = if let Some(ref json_str) = payload_json {
1028 (json_str.as_bytes().to_vec(), ContentType::Json)
1029 } else if let Some(blob) = payload_blob {
1030 (blob, ContentType::Binary)
1031 } else {
1032 continue;
1033 };
1034
1035 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1036
1037 let item = SyncItem::reconstruct(
1038 id,
1039 version as u64,
1040 timestamp,
1041 content_type,
1042 content,
1043 batch_id,
1044 trace_parent,
1045 payload_hash.unwrap_or_default(),
1046 home_instance_id,
1047 state,
1048 access_count as u64,
1049 last_accessed as u64,
1050 );
1051 items.push(item);
1052 }
1053
1054 Ok(items)
1055 }
1056
1057 pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
1059 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
1060 .bind(state)
1061 .fetch_one(&self.pool)
1062 .await
1063 .map_err(|e| StorageError::Backend(e.to_string()))?;
1064
1065 let count: i64 = result.try_get("cnt")
1066 .map_err(|e| StorageError::Backend(e.to_string()))?;
1067
1068 Ok(count as u64)
1069 }
1070
1071 pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
1073 let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
1074 .bind(state)
1075 .bind(limit as i64)
1076 .fetch_all(&self.pool)
1077 .await
1078 .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
1079
1080 let mut ids = Vec::with_capacity(rows.len());
1081 for row in rows {
1082 let id: String = row.try_get("id")
1083 .map_err(|e| StorageError::Backend(e.to_string()))?;
1084 ids.push(id);
1085 }
1086
1087 Ok(ids)
1088 }
1089
1090 pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
1092 let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
1093 .bind(new_state)
1094 .bind(id)
1095 .execute(&self.pool)
1096 .await
1097 .map_err(|e| StorageError::Backend(e.to_string()))?;
1098
1099 Ok(result.rows_affected() > 0)
1100 }
1101
1102 pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
1106 let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
1107 .bind(state)
1108 .execute(&self.pool)
1109 .await
1110 .map_err(|e| StorageError::Backend(e.to_string()))?;
1111
1112 Ok(result.rows_affected())
1113 }
1114
1115 pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
1126 let pattern = format!("{}%", prefix);
1128
1129 let rows = sqlx::query(
1130 "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
1131 FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
1132 )
1133 .bind(&pattern)
1134 .bind(limit as i64)
1135 .fetch_all(&self.pool)
1136 .await
1137 .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
1138
1139 let mut items = Vec::with_capacity(rows.len());
1140 for row in rows {
1141 let id: String = row.try_get("id")
1142 .map_err(|e| StorageError::Backend(e.to_string()))?;
1143 let version: i64 = row.try_get("version").unwrap_or(1);
1144 let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
1145 let payload_hash: Option<String> = row.try_get("payload_hash").ok();
1146
1147 let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
1149 .or_else(|| {
1150 row.try_get::<Vec<u8>, _>("payload").ok()
1151 .and_then(|bytes| String::from_utf8(bytes).ok())
1152 });
1153
1154 let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
1155
1156 let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1158 .or_else(|| {
1159 row.try_get::<Vec<u8>, _>("audit").ok()
1160 .and_then(|bytes| String::from_utf8(bytes).ok())
1161 });
1162
1163 let state: String = row.try_get::<String, _>("state").ok()
1165 .or_else(|| {
1166 row.try_get::<Vec<u8>, _>("state").ok()
1167 .and_then(|bytes| String::from_utf8(bytes).ok())
1168 })
1169 .unwrap_or_else(|| "default".to_string());
1170
1171 let access_count: i64 = row.try_get("access_count").unwrap_or(0);
1173 let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
1174
1175 let (content, content_type) = if let Some(ref json_str) = payload_json {
1176 (json_str.as_bytes().to_vec(), ContentType::Json)
1177 } else if let Some(blob) = payload_blob {
1178 (blob, ContentType::Binary)
1179 } else {
1180 continue;
1181 };
1182
1183 let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1184
1185 let item = SyncItem::reconstruct(
1186 id,
1187 version as u64,
1188 timestamp,
1189 content_type,
1190 content,
1191 batch_id,
1192 trace_parent,
1193 payload_hash.unwrap_or_default(),
1194 home_instance_id,
1195 state,
1196 access_count as u64,
1197 last_accessed as u64,
1198 );
1199 items.push(item);
1200 }
1201
1202 Ok(items)
1203 }
1204
1205 pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1207 let pattern = format!("{}%", prefix);
1208
1209 let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
1210 .bind(&pattern)
1211 .fetch_one(&self.pool)
1212 .await
1213 .map_err(|e| StorageError::Backend(e.to_string()))?;
1214
1215 let count: i64 = result.try_get("cnt")
1216 .map_err(|e| StorageError::Backend(e.to_string()))?;
1217
1218 Ok(count as u64)
1219 }
1220
1221 pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1225 let pattern = format!("{}%", prefix);
1226
1227 let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
1228 .bind(&pattern)
1229 .execute(&self.pool)
1230 .await
1231 .map_err(|e| StorageError::Backend(e.to_string()))?;
1232
1233 Ok(result.rows_affected())
1234 }
1235}
1236
#[cfg(test)]
mod tests {
    //! SQLite-backed tests for state handling and prefix queries on `SqlStore`.
    //!
    //! Every test works on its own database file under `temp/`, so tests can
    //! run in parallel without sharing data.

    use super::*;
    use serde_json::json;
    use std::path::{Path, PathBuf};

    /// Path of the scratch SQLite database for the test called `name`.
    fn temp_db_path(name: &str) -> PathBuf {
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }

    /// Removes the database file and its `-wal` / `-shm` companions.
    ///
    /// Errors are ignored on purpose: the files may simply not exist.
    fn cleanup_db(path: &Path) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }

    /// Builds a JSON item with the given id and state.
    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id})).with_state(state)
    }

    /// Scratch database that cleans up after itself.
    ///
    /// Creating the guard makes sure the parent directory exists and removes
    /// leftovers from earlier runs. Dropping it removes the files again, even
    /// when the test body panics on a failed assertion.
    struct ScratchDb {
        path: PathBuf,
    }

    impl ScratchDb {
        /// Prepares a fresh, empty location for the test called `name`.
        fn new(name: &str) -> Self {
            let path = temp_db_path(name);
            if let Some(dir) = path.parent() {
                // `mode=rwc` creates the database file but not its directory.
                std::fs::create_dir_all(dir)
                    .expect("scratch directory for SQLite test databases must be creatable");
            }
            cleanup_db(&path);
            Self { path }
        }

        /// Opens a `SqlStore` on this database, creating the file if needed.
        async fn open(&self) -> SqlStore {
            let url = format!("sqlite://{}?mode=rwc", self.path.display());
            SqlStore::new(&url).await.unwrap()
        }
    }

    impl Drop for ScratchDb {
        fn drop(&mut self) {
            cleanup_db(&self.path);
        }
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db = ScratchDb::new("stored");
        let store = db.open().await;

        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db = ScratchDb::new("default");
        let store = db.open().await;

        // No explicit state: the item must come back with the default one.
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db = ScratchDb::new("get_by_state");
        let store = db.open().await;

        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db = ScratchDb::new("get_by_state_limit");
        let store = db.open().await;

        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db = ScratchDb::new("count_by_state");
        let store = db.open().await;

        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db = ScratchDb::new("list_state_ids");
        let store = db.open().await;

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));
    }

    #[tokio::test]
    async fn test_set_state() {
        let db = ScratchDb::new("set_state");
        let store = db.open().await;

        store.put(&test_item("item1", "pending")).await.unwrap();

        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        // Updating an unknown id reports that nothing changed.
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db = ScratchDb::new("delete_by_state");
        let store = db.open().await;

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        // Deleted items are gone.
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        // Items in other states are untouched.
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db = ScratchDb::new("multi_put_state");
        let store = db.open().await;

        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db = ScratchDb::new("scan_prefix");
        let store = db.open().await;

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db = ScratchDb::new("scan_prefix_limit");
        let store = db.open().await;

        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db = ScratchDb::new("count_prefix");
        let store = db.open().await;

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db = ScratchDb::new("delete_prefix");
        let store = db.open().await;

        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        // Everything under the deleted prefix is gone.
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        // A sibling prefix is untouched.
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        // An unrelated key space is untouched.
        assert!(store.get("base:user.123").await.unwrap().is_some());

        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);
    }
}