// sync_engine/storage/sql.rs

1//! SQL storage backend for L3 archive.
2//!
3//! Content-type aware storage with proper columns for queryability:
4//! - **JSON content** → Stored in `payload` TEXT column (queryable via JSON_EXTRACT)
5//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
6//!
7//! Schema mirrors Redis structure:
8//! ```sql
9//! CREATE TABLE sync_items (
10//!   id VARCHAR(255) PRIMARY KEY,
11//!   version BIGINT NOT NULL,
12//!   timestamp BIGINT NOT NULL,
13//!   payload_hash VARCHAR(64),
14//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
15//!   payload_blob MEDIUMBLOB, -- For binary content
16//!   audit TEXT               -- Operational metadata: {batch, trace, home}
17//! )
18//! ```
19//!
20//! ## sqlx Any Driver Quirks
21//! 
22//! We use TEXT instead of native JSON type because sqlx's `Any` driver:
23//! 1. Doesn't support MySQL's JSON type mapping
24//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` then converting)
25//!
26//! JSON functions still work on TEXT columns:
27//!
28//! ```sql
29//! -- Find users named Alice
30//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
31//! 
32//! -- Find all items from a batch
33//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
34//! ```
35
36use async_trait::async_trait;
37use sqlx::{AnyPool, Row, any::AnyPoolOptions};
38use crate::sync_item::{SyncItem, ContentType};
39use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
40use crate::resilience::retry::{retry, RetryConfig};
41use std::sync::Once;
42use std::time::Duration;
43
44// SQLx `Any` driver requires runtime installation
45static INSTALL_DRIVERS: Once = Once::new();
46
47fn install_drivers() {
48    INSTALL_DRIVERS.call_once(|| {
49        sqlx::any::install_default_drivers();
50    });
51}
52
/// SQL-backed archive store (L3), usable with SQLite or MySQL through sqlx's
/// `Any` driver. Stores JSON payloads in a TEXT column (queryable via
/// JSON_EXTRACT) and binary payloads in a BLOB column.
pub struct SqlStore {
    /// Shared connection pool; cheaply cloneable and handed out via `pool()`.
    pool: AnyPool,
    /// True when the connection string targets SQLite; selects the SQL
    /// dialect at runtime (upsert syntax, schema DDL, batch verification).
    is_sqlite: bool,
}
57
58impl SqlStore {
59    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
60    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
61        install_drivers();
62        
63        let is_sqlite = connection_string.starts_with("sqlite:");
64        
65        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
66            AnyPoolOptions::new()
67                .max_connections(20)
68                .acquire_timeout(Duration::from_secs(10))
69                .idle_timeout(Duration::from_secs(300))
70                .connect(connection_string)
71                .await
72                .map_err(|e| StorageError::Backend(e.to_string()))
73        })
74        .await?;
75
76        let store = Self { pool, is_sqlite };
77        
78        // Enable WAL mode for SQLite (better concurrency, faster writes)
79        if is_sqlite {
80            store.enable_wal_mode().await?;
81        }
82        
83        store.init_schema().await?;
84        Ok(store)
85    }
86    
87    /// Get a clone of the connection pool for sharing with other stores.
88    pub fn pool(&self) -> AnyPool {
89        self.pool.clone()
90    }
91    
92    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
93    /// 
94    /// Benefits:
95    /// - Concurrent reads during writes (readers don't block writers)
96    /// - Better write performance (single fsync instead of two)
97    /// - More predictable performance under load
98    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
99        sqlx::query("PRAGMA journal_mode = WAL")
100            .execute(&self.pool)
101            .await
102            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
103        
104        // Also set synchronous to NORMAL for better performance while still safe
105        // (FULL is default but WAL mode is safe with NORMAL)
106        sqlx::query("PRAGMA synchronous = NORMAL")
107            .execute(&self.pool)
108            .await
109            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
110        
111        Ok(())
112    }
113
114    async fn init_schema(&self) -> Result<(), StorageError> {
115        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
116        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
117        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
118        //
119        // merkle_dirty: Set to 1 on write, background task sets to 0 after merkle recalc.
120        // This enables efficient multi-instance coordination without locking.
121        //
122        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
123        // Indexed for fast state-based queries.
124        let sql = if self.is_sqlite {
125            r#"
126            CREATE TABLE IF NOT EXISTS sync_items (
127                id TEXT PRIMARY KEY,
128                version INTEGER NOT NULL DEFAULT 1,
129                timestamp INTEGER NOT NULL,
130                payload_hash TEXT,
131                payload TEXT,
132                payload_blob BLOB,
133                audit TEXT,
134                merkle_dirty INTEGER NOT NULL DEFAULT 1,
135                state TEXT NOT NULL DEFAULT 'default'
136            )
137            "#
138        } else {
139            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
140            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
141            r#"
142            CREATE TABLE IF NOT EXISTS sync_items (
143                id VARCHAR(255) PRIMARY KEY,
144                version BIGINT NOT NULL DEFAULT 1,
145                timestamp BIGINT NOT NULL,
146                payload_hash VARCHAR(64),
147                payload LONGTEXT,
148                payload_blob MEDIUMBLOB,
149                audit TEXT,
150                merkle_dirty TINYINT NOT NULL DEFAULT 1,
151                state VARCHAR(32) NOT NULL DEFAULT 'default',
152                INDEX idx_timestamp (timestamp),
153                INDEX idx_merkle_dirty (merkle_dirty),
154                INDEX idx_state (state)
155            )
156            "#
157        };
158
159        retry("sql_init_schema", &RetryConfig::startup(), || async {
160            sqlx::query(sql)
161                .execute(&self.pool)
162                .await
163                .map_err(|e| StorageError::Backend(e.to_string()))
164        })
165        .await?;
166
167        Ok(())
168    }
169    
170    /// Build the audit JSON object for operational metadata.
171    fn build_audit_json(item: &SyncItem) -> Option<String> {
172        let mut audit = serde_json::Map::new();
173        
174        if let Some(ref batch_id) = item.batch_id {
175            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
176        }
177        if let Some(ref trace_parent) = item.trace_parent {
178            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
179        }
180        if let Some(ref home) = item.home_instance_id {
181            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
182        }
183        
184        if audit.is_empty() {
185            None
186        } else {
187            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
188        }
189    }
190    
191    /// Parse audit JSON back into SyncItem fields.
192    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
193        match audit_str {
194            Some(s) => {
195                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
196                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
197                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
198                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
199                    (batch_id, trace_parent, home_instance_id)
200                } else {
201                    (None, None, None)
202                }
203            }
204            None => (None, None, None),
205        }
206    }
207}
208
209#[async_trait]
210impl ArchiveStore for SqlStore {
211    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
212        let id = id.to_string();
213        
214        retry("sql_get", &RetryConfig::query(), || async {
215            let result = sqlx::query(
216                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state FROM sync_items WHERE id = ?"
217            )
218                .bind(&id)
219                .fetch_optional(&self.pool)
220                .await
221                .map_err(|e| StorageError::Backend(e.to_string()))?;
222
223            match result {
224                Some(row) => {
225                    let version: i64 = row.try_get("version").unwrap_or(1);
226                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
227                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();
228                    
229                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
230                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
231                        .or_else(|| {
232                            row.try_get::<Vec<u8>, _>("payload").ok()
233                                .and_then(|bytes| String::from_utf8(bytes).ok())
234                        });
235                    
236                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
237                    
238                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
239                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
240                        .or_else(|| {
241                            row.try_get::<Vec<u8>, _>("audit").ok()
242                                .and_then(|bytes| String::from_utf8(bytes).ok())
243                        });
244                    
245                    // State field - try String first (SQLite), then bytes (MySQL)
246                    let state: String = row.try_get::<String, _>("state").ok()
247                        .or_else(|| {
248                            row.try_get::<Vec<u8>, _>("state").ok()
249                                .and_then(|bytes| String::from_utf8(bytes).ok())
250                        })
251                        .unwrap_or_else(|| "default".to_string());
252                    
253                    // Determine content and content_type
254                    let (content, content_type) = if let Some(ref json_str) = payload_json {
255                        // JSON content - parse and re-serialize to bytes
256                        let content = json_str.as_bytes().to_vec();
257                        (content, ContentType::Json)
258                    } else if let Some(blob) = payload_blob {
259                        // Binary content
260                        (blob, ContentType::Binary)
261                    } else {
262                        return Err(StorageError::Backend("No payload in row".to_string()));
263                    };
264                    
265                    // Parse audit fields
266                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
267                    
268                    let item = SyncItem::reconstruct(
269                        id.clone(),
270                        version as u64,
271                        timestamp,
272                        content_type,
273                        content,
274                        batch_id,
275                        trace_parent,
276                        payload_hash.unwrap_or_default(),
277                        home_instance_id,
278                        state,
279                    );
280                    Ok(Some(item))
281                }
282                None => Ok(None),
283            }
284        })
285        .await
286    }
287
288    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
289        let id = item.object_id.clone();
290        let version = item.version as i64;
291        let timestamp = item.updated_at;
292        let payload_hash = if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) };
293        let audit_json = Self::build_audit_json(item);
294        let state = item.state.clone();
295        
296        // Determine payload storage based on content type
297        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
298            ContentType::Json => {
299                let json_str = String::from_utf8_lossy(&item.content).to_string();
300                (Some(json_str), None)
301            }
302            ContentType::Binary => {
303                (None, Some(item.content.clone()))
304            }
305        };
306
307        let sql = if self.is_sqlite {
308            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state) 
309             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?) 
310             ON CONFLICT(id) DO UPDATE SET 
311                version = excluded.version, 
312                timestamp = excluded.timestamp, 
313                payload_hash = excluded.payload_hash, 
314                payload = excluded.payload, 
315                payload_blob = excluded.payload_blob, 
316                audit = excluded.audit, 
317                merkle_dirty = 1, 
318                state = excluded.state"
319        } else {
320            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state) 
321             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?) 
322             ON DUPLICATE KEY UPDATE 
323                version = VALUES(version), 
324                timestamp = VALUES(timestamp), 
325                payload_hash = VALUES(payload_hash), 
326                payload = VALUES(payload), 
327                payload_blob = VALUES(payload_blob), 
328                audit = VALUES(audit), 
329                merkle_dirty = 1, 
330                state = VALUES(state)"
331        };
332
333        retry("sql_put", &RetryConfig::query(), || async {
334            sqlx::query(sql)
335                .bind(&id)
336                .bind(version)
337                .bind(timestamp)
338                .bind(&payload_hash)
339                .bind(&payload_json)
340                .bind(&payload_blob)
341                .bind(&audit_json)
342                .bind(&state)
343                .execute(&self.pool)
344                .await
345                .map_err(|e| StorageError::Backend(e.to_string()))?;
346            Ok(())
347        })
348        .await
349    }
350
351    async fn delete(&self, id: &str) -> Result<(), StorageError> {
352        let id = id.to_string();
353        retry("sql_delete", &RetryConfig::query(), || async {
354            sqlx::query("DELETE FROM sync_items WHERE id = ?")
355                .bind(&id)
356                .execute(&self.pool)
357                .await
358                .map_err(|e| StorageError::Backend(e.to_string()))?;
359            Ok(())
360        })
361        .await
362    }
363
364    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
365        let id = id.to_string();
366        retry("sql_exists", &RetryConfig::query(), || async {
367            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
368                .bind(&id)
369                .fetch_optional(&self.pool)
370                .await
371                .map_err(|e| StorageError::Backend(e.to_string()))?;
372            Ok(result.is_some())
373        })
374        .await
375    }
376    
377    /// Write a batch of items in a single multi-row INSERT with verification.
378    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
379        if items.is_empty() {
380            return Ok(BatchWriteResult {
381                batch_id: String::new(),
382                written: 0,
383                verified: true,
384            });
385        }
386
387        // Generate a unique batch ID
388        let batch_id = uuid::Uuid::new_v4().to_string();
389        
390        // Stamp all items with the batch_id
391        for item in items.iter_mut() {
392            item.batch_id = Some(batch_id.clone());
393        }
394
395        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
396        const CHUNK_SIZE: usize = 500;
397        let mut total_written = 0usize;
398
399        for chunk in items.chunks(CHUNK_SIZE) {
400            let written = self.put_batch_chunk(chunk, &batch_id).await?;
401            total_written += written;
402        }
403
404        // Verify the batch was written
405        let verified_count = self.verify_batch(&batch_id).await?;
406        let verified = verified_count == items.len();
407
408        if !verified {
409            tracing::warn!(
410                batch_id = %batch_id,
411                expected = items.len(),
412                actual = verified_count,
413                "Batch verification mismatch"
414            );
415        }
416
417        Ok(BatchWriteResult {
418            batch_id,
419            written: total_written,
420            verified,
421        })
422    }
423
424    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
425        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
426            .bind(limit as i64)
427            .bind(offset as i64)
428            .fetch_all(&self.pool)
429            .await
430            .map_err(|e| StorageError::Backend(e.to_string()))?;
431        
432        let mut keys = Vec::with_capacity(rows.len());
433        for row in rows {
434            let id: String = row.try_get("id")
435                .map_err(|e| StorageError::Backend(e.to_string()))?;
436            keys.push(id);
437        }
438        
439        Ok(keys)
440    }
441
442    async fn count_all(&self) -> Result<u64, StorageError> {
443        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
444            .fetch_one(&self.pool)
445            .await
446            .map_err(|e| StorageError::Backend(e.to_string()))?;
447        
448        let count: i64 = result.try_get("cnt")
449            .map_err(|e| StorageError::Backend(e.to_string()))?;
450        
451        Ok(count as u64)
452    }
453}
454
455impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON, so the
    /// `_batch_id` parameter is unused here (kept for call-site symmetry).
    ///
    /// Builds one multi-row upsert with 8 bind parameters per item.
    /// NOTE(review): at the 500-item chunk size this is ~4000 placeholders —
    /// fine for MySQL, but older SQLite builds cap bound variables at 999;
    /// confirm the deployed SQLite's SQLITE_MAX_VARIABLE_NUMBER.
    ///
    /// Returns `chunk.len()` on success — i.e. assumes every row was written
    /// if the statement itself succeeded (upserts make rows_affected ambiguous).
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        // One "(?, ..., 1, ?)" value tuple per item; merkle_dirty hard-coded to 1.
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?)".to_string())
            .collect();
        
        // Dialect-specific upsert: SQLite ON CONFLICT vs MySQL ON DUPLICATE KEY.
        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = 1, \
                    state = excluded.state",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = 1, \
                    state = VALUES(state)",
                placeholders.join(", ")
            )
        };

        // Pre-compute every bind value once so the retry closure below can
        // cheaply clone the prepared rows on each attempt.
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
        }
        
        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                // JSON content goes to the TEXT column, binary to the BLOB column.
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };
                
                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    // Empty merkle root is stored as NULL rather than "".
                    payload_hash: if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                }
            })
            .collect();

        // Each retry attempt needs a fresh query, hence the clones moved into
        // the async block.
        retry("sql_put_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);
                
                // Bind order must match the column list in the INSERT above.
                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state);
                }
                
                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                
                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }
560
561    /// Verify a batch was written by counting items with the given batch_id (in audit JSON).
562    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
563        let batch_id = batch_id.to_string();
564        
565        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
566        let sql = if self.is_sqlite {
567            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
568        } else {
569            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
570        };
571        
572        let bind_value = if self.is_sqlite {
573            format!("%\"batch\":\"{}%", batch_id)
574        } else {
575            batch_id.clone()
576        };
577        
578        let result = sqlx::query(sql)
579            .bind(&bind_value)
580            .fetch_one(&self.pool)
581            .await
582            .map_err(|e| StorageError::Backend(e.to_string()))?;
583        
584        let count: i64 = result.try_get("cnt")
585            .map_err(|e| StorageError::Backend(e.to_string()))?;
586        
587        Ok(count as usize)
588    }
589
590    /// Scan a batch of items (for WAL drain).
591    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
592        let rows = sqlx::query(
593            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state FROM sync_items ORDER BY timestamp ASC LIMIT ?"
594        )
595            .bind(limit as i64)
596            .fetch_all(&self.pool)
597            .await
598            .map_err(|e| StorageError::Backend(e.to_string()))?;
599        
600        let mut items = Vec::with_capacity(rows.len());
601        for row in rows {
602            let id: String = row.try_get("id")
603                .map_err(|e| StorageError::Backend(e.to_string()))?;
604            let version: i64 = row.try_get("version").unwrap_or(1);
605            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
606            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
607            
608            // sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
609            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
610            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
611            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
612            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
613            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());
614            
615            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
616            let state: String = state_bytes
617                .and_then(|bytes| String::from_utf8(bytes).ok())
618                .unwrap_or_else(|| "default".to_string());
619            
620            let (content, content_type) = if let Some(ref json_str) = payload_json {
621                (json_str.as_bytes().to_vec(), ContentType::Json)
622            } else if let Some(blob) = payload_blob {
623                (blob, ContentType::Binary)
624            } else {
625                continue; // Skip rows with no payload
626            };
627            
628            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
629            
630            let item = SyncItem::reconstruct(
631                id,
632                version as u64,
633                timestamp,
634                content_type,
635                content,
636                batch_id,
637                trace_parent,
638                payload_hash.unwrap_or_default(),
639                home_instance_id,
640                state,
641            );
642            items.push(item);
643        }
644        
645        Ok(items)
646    }
647
648    /// Delete multiple items by ID in a single query.
649    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
650        if ids.is_empty() {
651            return Ok(0);
652        }
653
654        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
655        let sql = format!(
656            "DELETE FROM sync_items WHERE id IN ({})",
657            placeholders.join(", ")
658        );
659
660        retry("sql_delete_batch", &RetryConfig::query(), || {
661            let sql = sql.clone();
662            let ids = ids.to_vec();
663            async move {
664                let mut query = sqlx::query(&sql);
665                for id in &ids {
666                    query = query.bind(id);
667                }
668                
669                let result = query.execute(&self.pool)
670                    .await
671                    .map_err(|e| StorageError::Backend(e.to_string()))?;
672                
673                Ok(result.rows_affected() as usize)
674            }
675        })
676        .await
677    }
678    
679    // ═══════════════════════════════════════════════════════════════════════════
680    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
681    // ═══════════════════════════════════════════════════════════════════════════
682    
683    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
684    ///
685    /// Used by background merkle processor to batch recalculate affected trees.
686    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
687        let rows = sqlx::query(
688            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
689        )
690            .bind(limit as i64)
691            .fetch_all(&self.pool)
692            .await
693            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
694        
695        let mut ids = Vec::with_capacity(rows.len());
696        for row in rows {
697            let id: String = row.try_get("id")
698                .map_err(|e| StorageError::Backend(e.to_string()))?;
699            ids.push(id);
700        }
701        
702        Ok(ids)
703    }
704    
705    /// Count items with merkle_dirty = 1.
706    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
707        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
708            .fetch_one(&self.pool)
709            .await
710            .map_err(|e| StorageError::Backend(e.to_string()))?;
711        
712        let count: i64 = result.try_get("cnt")
713            .map_err(|e| StorageError::Backend(e.to_string()))?;
714        
715        Ok(count as u64)
716    }
717    
718    /// Mark items as merkle-clean after recalculation.
719    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
720        if ids.is_empty() {
721            return Ok(0);
722        }
723        
724        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
725        let sql = format!(
726            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
727            placeholders.join(", ")
728        );
729        
730        let mut query = sqlx::query(&sql);
731        for id in ids {
732            query = query.bind(id);
733        }
734        
735        let result = query.execute(&self.pool)
736            .await
737            .map_err(|e| StorageError::Backend(e.to_string()))?;
738        
739        Ok(result.rows_affected() as usize)
740    }
741    
742    /// Check if there are any dirty merkle items.
743    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
744        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
745            .fetch_optional(&self.pool)
746            .await
747            .map_err(|e| StorageError::Backend(e.to_string()))?;
748        
749        Ok(result.is_some())
750    }
751    
752    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
753    ///
754    /// Returns the items themselves so merkle can be calculated.
755    /// Use `mark_merkle_clean()` after processing to clear the flag.
756    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
757        let rows = sqlx::query(
758            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state 
759             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
760        )
761            .bind(limit as i64)
762            .fetch_all(&self.pool)
763            .await
764            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
765        
766        let mut items = Vec::with_capacity(rows.len());
767        for row in rows {
768            let id: String = row.try_get("id")
769                .map_err(|e| StorageError::Backend(e.to_string()))?;
770            let version: i64 = row.try_get("version").unwrap_or(1);
771            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
772            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
773            
774            // Handle JSON payload (MySQL returns as bytes, SQLite as string)
775            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
776            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
777                String::from_utf8(bytes).ok()
778            });
779            
780            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
781            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
782            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
783                String::from_utf8(bytes).ok()
784            });
785            
786            // State field
787            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
788            let state: String = state_bytes
789                .and_then(|bytes| String::from_utf8(bytes).ok())
790                .unwrap_or_else(|| "default".to_string());
791            
792            // Determine content and content_type
793            let (content, content_type) = if let Some(ref json_str) = payload_json {
794                (json_str.as_bytes().to_vec(), ContentType::Json)
795            } else if let Some(blob) = payload_blob {
796                (blob, ContentType::Binary)
797            } else {
798                continue; // Skip items with no payload
799            };
800            
801            // Parse audit fields
802            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
803            
804            let item = SyncItem::reconstruct(
805                id,
806                version as u64,
807                timestamp,
808                content_type,
809                content,
810                batch_id,
811                trace_parent,
812                payload_hash.unwrap_or_default(),
813                home_instance_id,
814                state,
815            );
816            items.push(item);
817        }
818        
819        Ok(items)
820    }
821    
822    // ═══════════════════════════════════════════════════════════════════════════
823    // State-based queries: Fast indexed access by caller-defined state tag
824    // ═══════════════════════════════════════════════════════════════════════════
825    
826    /// Get items by state (e.g., "delta", "base", "pending").
827    ///
828    /// Uses indexed query for fast retrieval.
829    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
830        let rows = sqlx::query(
831            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state 
832             FROM sync_items WHERE state = ? LIMIT ?"
833        )
834            .bind(state)
835            .bind(limit as i64)
836            .fetch_all(&self.pool)
837            .await
838            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
839        
840        let mut items = Vec::with_capacity(rows.len());
841        for row in rows {
842            let id: String = row.try_get("id")
843                .map_err(|e| StorageError::Backend(e.to_string()))?;
844            let version: i64 = row.try_get("version").unwrap_or(1);
845            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
846            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
847            
848            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
849            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
850                .or_else(|| {
851                    row.try_get::<Vec<u8>, _>("payload").ok()
852                        .and_then(|bytes| String::from_utf8(bytes).ok())
853                });
854            
855            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
856            
857            // Try reading audit as String first (SQLite), then bytes (MySQL)
858            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
859                .or_else(|| {
860                    row.try_get::<Vec<u8>, _>("audit").ok()
861                        .and_then(|bytes| String::from_utf8(bytes).ok())
862                });
863            
864            // State field - try String first (SQLite), then bytes (MySQL)
865            let state: String = row.try_get::<String, _>("state").ok()
866                .or_else(|| {
867                    row.try_get::<Vec<u8>, _>("state").ok()
868                        .and_then(|bytes| String::from_utf8(bytes).ok())
869                })
870                .unwrap_or_else(|| "default".to_string());
871            
872            let (content, content_type) = if let Some(ref json_str) = payload_json {
873                (json_str.as_bytes().to_vec(), ContentType::Json)
874            } else if let Some(blob) = payload_blob {
875                (blob, ContentType::Binary)
876            } else {
877                continue;
878            };
879            
880            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
881            
882            let item = SyncItem::reconstruct(
883                id,
884                version as u64,
885                timestamp,
886                content_type,
887                content,
888                batch_id,
889                trace_parent,
890                payload_hash.unwrap_or_default(),
891                home_instance_id,
892                state,
893            );
894            items.push(item);
895        }
896        
897        Ok(items)
898    }
899    
900    /// Count items in a given state.
901    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
902        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
903            .bind(state)
904            .fetch_one(&self.pool)
905            .await
906            .map_err(|e| StorageError::Backend(e.to_string()))?;
907        
908        let count: i64 = result.try_get("cnt")
909            .map_err(|e| StorageError::Backend(e.to_string()))?;
910        
911        Ok(count as u64)
912    }
913    
914    /// Get just the IDs of items in a given state (lightweight query).
915    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
916        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
917            .bind(state)
918            .bind(limit as i64)
919            .fetch_all(&self.pool)
920            .await
921            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
922        
923        let mut ids = Vec::with_capacity(rows.len());
924        for row in rows {
925            let id: String = row.try_get("id")
926                .map_err(|e| StorageError::Backend(e.to_string()))?;
927            ids.push(id);
928        }
929        
930        Ok(ids)
931    }
932    
933    /// Update the state of an item by ID.
934    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
935        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
936            .bind(new_state)
937            .bind(id)
938            .execute(&self.pool)
939            .await
940            .map_err(|e| StorageError::Backend(e.to_string()))?;
941        
942        Ok(result.rows_affected() > 0)
943    }
944    
945    /// Delete all items in a given state.
946    ///
947    /// Returns the number of deleted items.
948    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
949        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
950            .bind(state)
951            .execute(&self.pool)
952            .await
953            .map_err(|e| StorageError::Backend(e.to_string()))?;
954        
955        Ok(result.rows_affected())
956    }
957    
958    /// Scan items by ID prefix.
959    ///
960    /// Efficiently retrieves all items whose ID starts with the given prefix.
961    /// Uses SQL `LIKE 'prefix%'` which leverages the primary key index.
962    ///
963    /// # Example
964    /// ```rust,ignore
965    /// // Get all deltas for object user.123
966    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
967    /// ```
968    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
969        // Build LIKE pattern: "prefix%"
970        let pattern = format!("{}%", prefix);
971        
972        let rows = sqlx::query(
973            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state 
974             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
975        )
976            .bind(&pattern)
977            .bind(limit as i64)
978            .fetch_all(&self.pool)
979            .await
980            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
981        
982        let mut items = Vec::with_capacity(rows.len());
983        for row in rows {
984            let id: String = row.try_get("id")
985                .map_err(|e| StorageError::Backend(e.to_string()))?;
986            let version: i64 = row.try_get("version").unwrap_or(1);
987            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
988            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
989            
990            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
991            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
992                .or_else(|| {
993                    row.try_get::<Vec<u8>, _>("payload").ok()
994                        .and_then(|bytes| String::from_utf8(bytes).ok())
995                });
996            
997            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
998            
999            // Try reading audit as String first (SQLite), then bytes (MySQL)
1000            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1001                .or_else(|| {
1002                    row.try_get::<Vec<u8>, _>("audit").ok()
1003                        .and_then(|bytes| String::from_utf8(bytes).ok())
1004                });
1005            
1006            // State field - try String first (SQLite), then bytes (MySQL)
1007            let state: String = row.try_get::<String, _>("state").ok()
1008                .or_else(|| {
1009                    row.try_get::<Vec<u8>, _>("state").ok()
1010                        .and_then(|bytes| String::from_utf8(bytes).ok())
1011                })
1012                .unwrap_or_else(|| "default".to_string());
1013            
1014            let (content, content_type) = if let Some(ref json_str) = payload_json {
1015                (json_str.as_bytes().to_vec(), ContentType::Json)
1016            } else if let Some(blob) = payload_blob {
1017                (blob, ContentType::Binary)
1018            } else {
1019                continue;
1020            };
1021            
1022            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1023            
1024            let item = SyncItem::reconstruct(
1025                id,
1026                version as u64,
1027                timestamp,
1028                content_type,
1029                content,
1030                batch_id,
1031                trace_parent,
1032                payload_hash.unwrap_or_default(),
1033                home_instance_id,
1034                state,
1035            );
1036            items.push(item);
1037        }
1038        
1039        Ok(items)
1040    }
1041    
1042    /// Count items matching an ID prefix.
1043    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1044        let pattern = format!("{}%", prefix);
1045        
1046        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
1047            .bind(&pattern)
1048            .fetch_one(&self.pool)
1049            .await
1050            .map_err(|e| StorageError::Backend(e.to_string()))?;
1051        
1052        let count: i64 = result.try_get("cnt")
1053            .map_err(|e| StorageError::Backend(e.to_string()))?;
1054        
1055        Ok(count as u64)
1056    }
1057    
1058    /// Delete all items matching an ID prefix.
1059    ///
1060    /// Returns the number of deleted items.
1061    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1062        let pattern = format!("{}%", prefix);
1063        
1064        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
1065            .bind(&pattern)
1066            .execute(&self.pool)
1067            .await
1068            .map_err(|e| StorageError::Backend(e.to_string()))?;
1069        
1070        Ok(result.rows_affected())
1071    }
1072}
1073
#[cfg(test)]
mod tests {
    //! Integration-style tests that exercise `SqlStore` against a real
    //! on-disk SQLite database (via the sqlx `Any` driver).
    //!
    //! Each test uses its own database file under `temp/` (named after the
    //! test) and cleans up before and after, so tests can run in parallel
    //! without interfering with each other.
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;
    
    /// Build a per-test SQLite database path under the local `temp/` folder.
    fn temp_db_path(name: &str) -> PathBuf {
        // Use local temp/ folder (gitignored) instead of system temp
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }
    
    /// Clean up SQLite database and its WAL files
    fn cleanup_db(path: &PathBuf) {
        // Errors ignored on purpose: the files may not exist yet.
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }
    
    /// Minimal JSON-payload SyncItem with the given id and state tag.
    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    // A custom state survives a put/get round trip.
    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);
        
        // mode=rwc: create the database file if it does not exist.
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with custom state
        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();
        
        // Retrieve and verify state
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");
        
        cleanup_db(&db_path);
    }

    // Items stored without an explicit state come back as "default".
    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with default state
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();
        
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");
        
        cleanup_db(&db_path);
    }

    // get_by_state returns exactly the items tagged with the requested state.
    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items in different states
        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();
        
        // Query by state
        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));
        
        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");
        
        // Empty result for non-existent state
        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    // The limit parameter caps the number of returned rows.
    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 10 items
        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        cleanup_db(&db_path);
    }

    // count_by_state counts per state and returns 0 for unknown states.
    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items
        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();
        
        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    // list_state_ids returns only the ids, in no guaranteed order.
    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();
        
        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));
        
        cleanup_db(&db_path);
    }

    // set_state transitions an item and reports whether a row was touched.
    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("item1", "pending")).await.unwrap();
        
        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");
        
        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);
        
        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");
        
        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);
        
        cleanup_db(&db_path);
    }

    // delete_by_state removes only the matching state and reports the count.
    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();
        
        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        
        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());
        
        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }

    // Each put preserves its own item's state independently.
    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();
        
        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
        
        cleanup_db(&db_path);
    }

    // scan_prefix partitions items by id prefix (CRDT-style key layout).
    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();
        
        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
        
        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);
        
        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);
        
        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);
        
        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    // scan_prefix honors the limit and returns everything with a larger one.
    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);
        
        cleanup_db(&db_path);
    }

    // count_prefix counts per prefix without materializing the items.
    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    // delete_prefix removes exactly the matching prefix and leaves the rest.
    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        
        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
        
        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());
        
        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }
}