sync_engine/storage/
sql.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! SQL storage backend for L3 archive.
5//!
6//! Content-type aware storage with proper columns for queryability:
7//! - **JSON content** → Stored in `payload` TEXT column (queryable via JSON_EXTRACT)
8//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
9//!
10//! Schema mirrors Redis structure:
11//! ```sql
12//! CREATE TABLE sync_items (
13//!   id VARCHAR(255) PRIMARY KEY,
14//!   version BIGINT NOT NULL,
15//!   timestamp BIGINT NOT NULL,
16//!   payload_hash VARCHAR(64),
17//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
18//!   payload_blob MEDIUMBLOB, -- For binary content
19//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
20//!   access_count BIGINT,     -- Local access frequency (for eviction)
21//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
22//! )
23//! ```
24//!
25//! ## sqlx Any Driver Quirks
26//! 
27//! We use TEXT instead of native JSON type because sqlx's `Any` driver:
28//! 1. Doesn't support MySQL's JSON type mapping
29//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` then converting)
30//!
31//! JSON functions still work on TEXT columns:
32//!
33//! ```sql
34//! -- Find users named Alice
35//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
36//! 
37//! -- Find all items from a batch
38//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
39//! ```
40
41use async_trait::async_trait;
42use sqlx::{AnyPool, Row, any::AnyPoolOptions};
43use crate::sync_item::{SyncItem, ContentType};
44use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
45use crate::resilience::retry::{retry, RetryConfig};
46use std::sync::Once;
47use std::time::Duration;
48
49// SQLx `Any` driver requires runtime installation
50static INSTALL_DRIVERS: Once = Once::new();
51
52fn install_drivers() {
53    INSTALL_DRIVERS.call_once(|| {
54        sqlx::any::install_default_drivers();
55    });
56}
57
58pub struct SqlStore {
59    pool: AnyPool,
60    is_sqlite: bool,
61}
62
63impl SqlStore {
64    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
65    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
66        install_drivers();
67        
68        let is_sqlite = connection_string.starts_with("sqlite:");
69        
70        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
71            AnyPoolOptions::new()
72                .max_connections(20)
73                .acquire_timeout(Duration::from_secs(10))
74                .idle_timeout(Duration::from_secs(300))
75                .connect(connection_string)
76                .await
77                .map_err(|e| StorageError::Backend(e.to_string()))
78        })
79        .await?;
80
81        let store = Self { pool, is_sqlite };
82        
83        // Enable WAL mode for SQLite (better concurrency, faster writes)
84        if is_sqlite {
85            store.enable_wal_mode().await?;
86        }
87        
88        store.init_schema().await?;
89        Ok(store)
90    }
91    
92    /// Get a clone of the connection pool for sharing with other stores.
93    pub fn pool(&self) -> AnyPool {
94        self.pool.clone()
95    }
96    
97    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
98    /// 
99    /// Benefits:
100    /// - Concurrent reads during writes (readers don't block writers)
101    /// - Better write performance (single fsync instead of two)
102    /// - More predictable performance under load
103    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
104        sqlx::query("PRAGMA journal_mode = WAL")
105            .execute(&self.pool)
106            .await
107            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
108        
109        // Also set synchronous to NORMAL for better performance while still safe
110        // (FULL is default but WAL mode is safe with NORMAL)
111        sqlx::query("PRAGMA synchronous = NORMAL")
112            .execute(&self.pool)
113            .await
114            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
115        
116        Ok(())
117    }
118
119    async fn init_schema(&self) -> Result<(), StorageError> {
120        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
121        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
122        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
123        //
124        // merkle_dirty: Set to 1 on write, background task sets to 0 after merkle recalc.
125        // This enables efficient multi-instance coordination without locking.
126        //
127        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
128        // Indexed for fast state-based queries.
129        let sql = if self.is_sqlite {
130            r#"
131            CREATE TABLE IF NOT EXISTS sync_items (
132                id TEXT PRIMARY KEY,
133                version INTEGER NOT NULL DEFAULT 1,
134                timestamp INTEGER NOT NULL,
135                payload_hash TEXT,
136                payload TEXT,
137                payload_blob BLOB,
138                audit TEXT,
139                merkle_dirty INTEGER NOT NULL DEFAULT 1,
140                state TEXT NOT NULL DEFAULT 'default',
141                access_count INTEGER NOT NULL DEFAULT 0,
142                last_accessed INTEGER NOT NULL DEFAULT 0
143            )
144            "#
145        } else {
146            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
147            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
148            r#"
149            CREATE TABLE IF NOT EXISTS sync_items (
150                id VARCHAR(255) PRIMARY KEY,
151                version BIGINT NOT NULL DEFAULT 1,
152                timestamp BIGINT NOT NULL,
153                payload_hash VARCHAR(64),
154                payload LONGTEXT,
155                payload_blob MEDIUMBLOB,
156                audit TEXT,
157                merkle_dirty TINYINT NOT NULL DEFAULT 1,
158                state VARCHAR(32) NOT NULL DEFAULT 'default',
159                access_count BIGINT NOT NULL DEFAULT 0,
160                last_accessed BIGINT NOT NULL DEFAULT 0,
161                INDEX idx_timestamp (timestamp),
162                INDEX idx_merkle_dirty (merkle_dirty),
163                INDEX idx_state (state)
164            )
165            "#
166        };
167
168        retry("sql_init_schema", &RetryConfig::startup(), || async {
169            sqlx::query(sql)
170                .execute(&self.pool)
171                .await
172                .map_err(|e| StorageError::Backend(e.to_string()))
173        })
174        .await?;
175
176        Ok(())
177    }
178    
179    /// Build the audit JSON object for operational metadata.
180    fn build_audit_json(item: &SyncItem) -> Option<String> {
181        let mut audit = serde_json::Map::new();
182        
183        if let Some(ref batch_id) = item.batch_id {
184            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
185        }
186        if let Some(ref trace_parent) = item.trace_parent {
187            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
188        }
189        if let Some(ref home) = item.home_instance_id {
190            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
191        }
192        
193        if audit.is_empty() {
194            None
195        } else {
196            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
197        }
198    }
199    
200    /// Parse audit JSON back into SyncItem fields.
201    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
202        match audit_str {
203            Some(s) => {
204                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
205                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
206                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
207                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
208                    (batch_id, trace_parent, home_instance_id)
209                } else {
210                    (None, None, None)
211                }
212            }
213            None => (None, None, None),
214        }
215    }
216}
217
218#[async_trait]
219impl ArchiveStore for SqlStore {
220    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
221        let id = id.to_string();
222        
223        retry("sql_get", &RetryConfig::query(), || async {
224            let result = sqlx::query(
225                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
226            )
227                .bind(&id)
228                .fetch_optional(&self.pool)
229                .await
230                .map_err(|e| StorageError::Backend(e.to_string()))?;
231
232            match result {
233                Some(row) => {
234                    let version: i64 = row.try_get("version").unwrap_or(1);
235                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
236                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();
237                    
238                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
239                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
240                        .or_else(|| {
241                            row.try_get::<Vec<u8>, _>("payload").ok()
242                                .and_then(|bytes| String::from_utf8(bytes).ok())
243                        });
244                    
245                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
246                    
247                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
248                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
249                        .or_else(|| {
250                            row.try_get::<Vec<u8>, _>("audit").ok()
251                                .and_then(|bytes| String::from_utf8(bytes).ok())
252                        });
253                    
254                    // State field - try String first (SQLite), then bytes (MySQL)
255                    let state: String = row.try_get::<String, _>("state").ok()
256                        .or_else(|| {
257                            row.try_get::<Vec<u8>, _>("state").ok()
258                                .and_then(|bytes| String::from_utf8(bytes).ok())
259                        })
260                        .unwrap_or_else(|| "default".to_string());
261                    
262                    // Access metadata (local eviction stats, not replicated)
263                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
264                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
265                    
266                    // Determine content and content_type
267                    let (content, content_type) = if let Some(ref json_str) = payload_json {
268                        // JSON content - parse and re-serialize to bytes
269                        let content = json_str.as_bytes().to_vec();
270                        (content, ContentType::Json)
271                    } else if let Some(blob) = payload_blob {
272                        // Binary content
273                        (blob, ContentType::Binary)
274                    } else {
275                        return Err(StorageError::Backend("No payload in row".to_string()));
276                    };
277                    
278                    // Parse audit fields
279                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
280                    
281                    let item = SyncItem::reconstruct(
282                        id.clone(),
283                        version as u64,
284                        timestamp,
285                        content_type,
286                        content,
287                        batch_id,
288                        trace_parent,
289                        payload_hash.unwrap_or_default(),
290                        home_instance_id,
291                        state,
292                        access_count as u64,
293                        last_accessed as u64,
294                    );
295                    Ok(Some(item))
296                }
297                None => Ok(None),
298            }
299        })
300        .await
301    }
302
303    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
304        let id = item.object_id.clone();
305        let version = item.version as i64;
306        let timestamp = item.updated_at;
307        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
308        let audit_json = Self::build_audit_json(item);
309        let state = item.state.clone();
310        
311        // Determine payload storage based on content type
312        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
313            ContentType::Json => {
314                let json_str = String::from_utf8_lossy(&item.content).to_string();
315                (Some(json_str), None)
316            }
317            ContentType::Binary => {
318                (None, Some(item.content.clone()))
319            }
320        };
321
322        let sql = if self.is_sqlite {
323            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
324             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
325             ON CONFLICT(id) DO UPDATE SET 
326                version = excluded.version, 
327                timestamp = excluded.timestamp, 
328                payload_hash = excluded.payload_hash, 
329                payload = excluded.payload, 
330                payload_blob = excluded.payload_blob, 
331                audit = excluded.audit, 
332                merkle_dirty = 1, 
333                state = excluded.state,
334                access_count = excluded.access_count,
335                last_accessed = excluded.last_accessed"
336        } else {
337            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
338             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
339             ON DUPLICATE KEY UPDATE 
340                version = VALUES(version), 
341                timestamp = VALUES(timestamp), 
342                payload_hash = VALUES(payload_hash), 
343                payload = VALUES(payload), 
344                payload_blob = VALUES(payload_blob), 
345                audit = VALUES(audit), 
346                merkle_dirty = 1, 
347                state = VALUES(state),
348                access_count = VALUES(access_count),
349                last_accessed = VALUES(last_accessed)"
350        };
351
352        retry("sql_put", &RetryConfig::query(), || async {
353            sqlx::query(sql)
354                .bind(&id)
355                .bind(version)
356                .bind(timestamp)
357                .bind(&payload_hash)
358                .bind(&payload_json)
359                .bind(&payload_blob)
360                .bind(&audit_json)
361                .bind(&state)
362                .bind(item.access_count as i64)
363                .bind(item.last_accessed as i64)
364                .execute(&self.pool)
365                .await
366                .map_err(|e| StorageError::Backend(e.to_string()))?;
367            Ok(())
368        })
369        .await
370    }
371
372    async fn delete(&self, id: &str) -> Result<(), StorageError> {
373        let id = id.to_string();
374        retry("sql_delete", &RetryConfig::query(), || async {
375            sqlx::query("DELETE FROM sync_items WHERE id = ?")
376                .bind(&id)
377                .execute(&self.pool)
378                .await
379                .map_err(|e| StorageError::Backend(e.to_string()))?;
380            Ok(())
381        })
382        .await
383    }
384
385    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
386        let id = id.to_string();
387        retry("sql_exists", &RetryConfig::query(), || async {
388            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
389                .bind(&id)
390                .fetch_optional(&self.pool)
391                .await
392                .map_err(|e| StorageError::Backend(e.to_string()))?;
393            Ok(result.is_some())
394        })
395        .await
396    }
397    
398    /// Write a batch of items in a single multi-row INSERT with verification.
399    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
400        if items.is_empty() {
401            return Ok(BatchWriteResult {
402                batch_id: String::new(),
403                written: 0,
404                verified: true,
405            });
406        }
407
408        // Generate a unique batch ID
409        let batch_id = uuid::Uuid::new_v4().to_string();
410        
411        // Stamp all items with the batch_id
412        for item in items.iter_mut() {
413            item.batch_id = Some(batch_id.clone());
414        }
415
416        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
417        const CHUNK_SIZE: usize = 500;
418        let mut total_written = 0usize;
419
420        for chunk in items.chunks(CHUNK_SIZE) {
421            let written = self.put_batch_chunk(chunk, &batch_id).await?;
422            total_written += written;
423        }
424
425        // Verify the batch was written
426        let verified_count = self.verify_batch(&batch_id).await?;
427        let verified = verified_count == items.len();
428
429        if !verified {
430            tracing::warn!(
431                batch_id = %batch_id,
432                expected = items.len(),
433                actual = verified_count,
434                "Batch verification mismatch"
435            );
436        }
437
438        Ok(BatchWriteResult {
439            batch_id,
440            written: total_written,
441            verified,
442        })
443    }
444
445    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
446        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
447            .bind(limit as i64)
448            .bind(offset as i64)
449            .fetch_all(&self.pool)
450            .await
451            .map_err(|e| StorageError::Backend(e.to_string()))?;
452        
453        let mut keys = Vec::with_capacity(rows.len());
454        for row in rows {
455            let id: String = row.try_get("id")
456                .map_err(|e| StorageError::Backend(e.to_string()))?;
457            keys.push(id);
458        }
459        
460        Ok(keys)
461    }
462
463    async fn count_all(&self) -> Result<u64, StorageError> {
464        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
465            .fetch_one(&self.pool)
466            .await
467            .map_err(|e| StorageError::Backend(e.to_string()))?;
468        
469        let count: i64 = result.try_get("cnt")
470            .map_err(|e| StorageError::Backend(e.to_string()))?;
471        
472        Ok(count as u64)
473    }
474}
475
476impl SqlStore {
477    /// Write a single chunk of items with content-type aware storage.
478    /// The batch_id is already embedded in each item's audit JSON.
479    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
480        let placeholders: Vec<String> = (0..chunk.len())
481            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
482            .collect();
483        
484        let sql = if self.is_sqlite {
485            format!(
486                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
487                 ON CONFLICT(id) DO UPDATE SET \
488                    version = excluded.version, \
489                    timestamp = excluded.timestamp, \
490                    payload_hash = excluded.payload_hash, \
491                    payload = excluded.payload, \
492                    payload_blob = excluded.payload_blob, \
493                    audit = excluded.audit, \
494                    merkle_dirty = 1, \
495                    state = excluded.state, \
496                    access_count = excluded.access_count, \
497                    last_accessed = excluded.last_accessed",
498                placeholders.join(", ")
499            )
500        } else {
501            format!(
502                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
503                 ON DUPLICATE KEY UPDATE \
504                    version = VALUES(version), \
505                    timestamp = VALUES(timestamp), \
506                    payload_hash = VALUES(payload_hash), \
507                    payload = VALUES(payload), \
508                    payload_blob = VALUES(payload_blob), \
509                    audit = VALUES(audit), \
510                    merkle_dirty = 1, \
511                    state = VALUES(state), \
512                    access_count = VALUES(access_count), \
513                    last_accessed = VALUES(last_accessed)",
514                placeholders.join(", ")
515            )
516        };
517
518        // Prepare all items with their fields
519        #[derive(Clone)]
520        struct PreparedRow {
521            id: String,
522            version: i64,
523            timestamp: i64,
524            payload_hash: Option<String>,
525            payload_json: Option<String>,
526            payload_blob: Option<Vec<u8>>,
527            audit_json: Option<String>,
528            state: String,
529            access_count: i64,
530            last_accessed: i64,
531        }
532        
533        let prepared: Vec<PreparedRow> = chunk.iter()
534            .map(|item| {
535                let (payload_json, payload_blob) = match item.content_type {
536                    ContentType::Json => {
537                        let json_str = String::from_utf8_lossy(&item.content).to_string();
538                        (Some(json_str), None)
539                    }
540                    ContentType::Binary => {
541                        (None, Some(item.content.clone()))
542                    }
543                };
544                
545                PreparedRow {
546                    id: item.object_id.clone(),
547                    version: item.version as i64,
548                    timestamp: item.updated_at,
549                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
550                    payload_json,
551                    payload_blob,
552                    audit_json: Self::build_audit_json(item),
553                    state: item.state.clone(),
554                    access_count: item.access_count as i64,
555                    last_accessed: item.last_accessed as i64,
556                }
557            })
558            .collect();
559
560        retry("sql_put_batch", &RetryConfig::query(), || {
561            let sql = sql.clone();
562            let prepared = prepared.clone();
563            async move {
564                let mut query = sqlx::query(&sql);
565                
566                for row in &prepared {
567                    query = query
568                        .bind(&row.id)
569                        .bind(row.version)
570                        .bind(row.timestamp)
571                        .bind(&row.payload_hash)
572                        .bind(&row.payload_json)
573                        .bind(&row.payload_blob)
574                        .bind(&row.audit_json)
575                        .bind(&row.state)
576                        .bind(row.access_count)
577                        .bind(row.last_accessed);
578                }
579                
580                query.execute(&self.pool)
581                    .await
582                    .map_err(|e| StorageError::Backend(e.to_string()))?;
583                
584                Ok(())
585            }
586        })
587        .await?;
588
589        Ok(chunk.len())
590    }
591
592    /// Verify a batch was written by counting items with the given batch_id (in audit JSON).
593    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
594        let batch_id = batch_id.to_string();
595        
596        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
597        let sql = if self.is_sqlite {
598            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
599        } else {
600            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
601        };
602        
603        let bind_value = if self.is_sqlite {
604            format!("%\"batch\":\"{}%", batch_id)
605        } else {
606            batch_id.clone()
607        };
608        
609        let result = sqlx::query(sql)
610            .bind(&bind_value)
611            .fetch_one(&self.pool)
612            .await
613            .map_err(|e| StorageError::Backend(e.to_string()))?;
614        
615        let count: i64 = result.try_get("cnt")
616            .map_err(|e| StorageError::Backend(e.to_string()))?;
617        
618        Ok(count as usize)
619    }
620
621    /// Scan a batch of items (for WAL drain).
622    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
623        let rows = sqlx::query(
624            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
625        )
626            .bind(limit as i64)
627            .fetch_all(&self.pool)
628            .await
629            .map_err(|e| StorageError::Backend(e.to_string()))?;
630        
631        let mut items = Vec::with_capacity(rows.len());
632        for row in rows {
633            let id: String = row.try_get("id")
634                .map_err(|e| StorageError::Backend(e.to_string()))?;
635            let version: i64 = row.try_get("version").unwrap_or(1);
636            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
637            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
638            
639            // sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
640            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
641            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
642            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
643            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
644            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());
645            
646            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
647            let state: String = state_bytes
648                .and_then(|bytes| String::from_utf8(bytes).ok())
649                .unwrap_or_else(|| "default".to_string());
650            
651            // Access metadata
652            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
653            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
654            
655            let (content, content_type) = if let Some(ref json_str) = payload_json {
656                (json_str.as_bytes().to_vec(), ContentType::Json)
657            } else if let Some(blob) = payload_blob {
658                (blob, ContentType::Binary)
659            } else {
660                continue; // Skip rows with no payload
661            };
662            
663            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
664            
665            let item = SyncItem::reconstruct(
666                id,
667                version as u64,
668                timestamp,
669                content_type,
670                content,
671                batch_id,
672                trace_parent,
673                payload_hash.unwrap_or_default(),
674                home_instance_id,
675                state,
676                access_count as u64,
677                last_accessed as u64,
678            );
679            items.push(item);
680        }
681        
682        Ok(items)
683    }
684
685    /// Delete multiple items by ID in a single query.
686    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
687        if ids.is_empty() {
688            return Ok(0);
689        }
690
691        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
692        let sql = format!(
693            "DELETE FROM sync_items WHERE id IN ({})",
694            placeholders.join(", ")
695        );
696
697        retry("sql_delete_batch", &RetryConfig::query(), || {
698            let sql = sql.clone();
699            let ids = ids.to_vec();
700            async move {
701                let mut query = sqlx::query(&sql);
702                for id in &ids {
703                    query = query.bind(id);
704                }
705                
706                let result = query.execute(&self.pool)
707                    .await
708                    .map_err(|e| StorageError::Backend(e.to_string()))?;
709                
710                Ok(result.rows_affected() as usize)
711            }
712        })
713        .await
714    }
715    
716    // ═══════════════════════════════════════════════════════════════════════════
717    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
718    // ═══════════════════════════════════════════════════════════════════════════
719    
720    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
721    ///
722    /// Used by background merkle processor to batch recalculate affected trees.
723    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
724        let rows = sqlx::query(
725            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
726        )
727            .bind(limit as i64)
728            .fetch_all(&self.pool)
729            .await
730            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
731        
732        let mut ids = Vec::with_capacity(rows.len());
733        for row in rows {
734            let id: String = row.try_get("id")
735                .map_err(|e| StorageError::Backend(e.to_string()))?;
736            ids.push(id);
737        }
738        
739        Ok(ids)
740    }
741    
742    /// Count items with merkle_dirty = 1.
743    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
744        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
745            .fetch_one(&self.pool)
746            .await
747            .map_err(|e| StorageError::Backend(e.to_string()))?;
748        
749        let count: i64 = result.try_get("cnt")
750            .map_err(|e| StorageError::Backend(e.to_string()))?;
751        
752        Ok(count as u64)
753    }
754    
755    /// Mark items as merkle-clean after recalculation.
756    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
757        if ids.is_empty() {
758            return Ok(0);
759        }
760        
761        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
762        let sql = format!(
763            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
764            placeholders.join(", ")
765        );
766        
767        let mut query = sqlx::query(&sql);
768        for id in ids {
769            query = query.bind(id);
770        }
771        
772        let result = query.execute(&self.pool)
773            .await
774            .map_err(|e| StorageError::Backend(e.to_string()))?;
775        
776        Ok(result.rows_affected() as usize)
777    }
778    
779    /// Check if there are any dirty merkle items.
780    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
781        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
782            .fetch_optional(&self.pool)
783            .await
784            .map_err(|e| StorageError::Backend(e.to_string()))?;
785        
786        Ok(result.is_some())
787    }
788    
789    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
790    ///
791    /// Returns the items themselves so merkle can be calculated.
792    /// Use `mark_merkle_clean()` after processing to clear the flag.
793    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
794        let rows = sqlx::query(
795            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
796             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
797        )
798            .bind(limit as i64)
799            .fetch_all(&self.pool)
800            .await
801            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
802        
803        let mut items = Vec::with_capacity(rows.len());
804        for row in rows {
805            let id: String = row.try_get("id")
806                .map_err(|e| StorageError::Backend(e.to_string()))?;
807            let version: i64 = row.try_get("version").unwrap_or(1);
808            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
809            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
810            
811            // Handle JSON payload (MySQL returns as bytes, SQLite as string)
812            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
813            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
814                String::from_utf8(bytes).ok()
815            });
816            
817            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
818            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
819            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
820                String::from_utf8(bytes).ok()
821            });
822            
823            // State field
824            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
825            let state: String = state_bytes
826                .and_then(|bytes| String::from_utf8(bytes).ok())
827                .unwrap_or_else(|| "default".to_string());
828            
829            // Access metadata
830            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
831            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
832            
833            // Determine content and content_type
834            let (content, content_type) = if let Some(ref json_str) = payload_json {
835                (json_str.as_bytes().to_vec(), ContentType::Json)
836            } else if let Some(blob) = payload_blob {
837                (blob, ContentType::Binary)
838            } else {
839                continue; // Skip items with no payload
840            };
841            
842            // Parse audit fields
843            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
844            
845            let item = SyncItem::reconstruct(
846                id,
847                version as u64,
848                timestamp,
849                content_type,
850                content,
851                batch_id,
852                trace_parent,
853                payload_hash.unwrap_or_default(),
854                home_instance_id,
855                state,
856                access_count as u64,
857                last_accessed as u64,
858            );
859            items.push(item);
860        }
861        
862        Ok(items)
863    }
864    
865    // ═══════════════════════════════════════════════════════════════════════════
866    // State-based queries: Fast indexed access by caller-defined state tag
867    // ═══════════════════════════════════════════════════════════════════════════
868    
869    /// Get items by state (e.g., "delta", "base", "pending").
870    ///
871    /// Uses indexed query for fast retrieval.
872    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
873        let rows = sqlx::query(
874            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
875             FROM sync_items WHERE state = ? LIMIT ?"
876        )
877            .bind(state)
878            .bind(limit as i64)
879            .fetch_all(&self.pool)
880            .await
881            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
882        
883        let mut items = Vec::with_capacity(rows.len());
884        for row in rows {
885            let id: String = row.try_get("id")
886                .map_err(|e| StorageError::Backend(e.to_string()))?;
887            let version: i64 = row.try_get("version").unwrap_or(1);
888            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
889            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
890            
891            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
892            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
893                .or_else(|| {
894                    row.try_get::<Vec<u8>, _>("payload").ok()
895                        .and_then(|bytes| String::from_utf8(bytes).ok())
896                });
897            
898            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
899            
900            // Try reading audit as String first (SQLite), then bytes (MySQL)
901            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
902                .or_else(|| {
903                    row.try_get::<Vec<u8>, _>("audit").ok()
904                        .and_then(|bytes| String::from_utf8(bytes).ok())
905                });
906            
907            // State field - try String first (SQLite), then bytes (MySQL)
908            let state: String = row.try_get::<String, _>("state").ok()
909                .or_else(|| {
910                    row.try_get::<Vec<u8>, _>("state").ok()
911                        .and_then(|bytes| String::from_utf8(bytes).ok())
912                })
913                .unwrap_or_else(|| "default".to_string());
914            
915            // Access metadata (local-only, not replicated)
916            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
917            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
918            
919            let (content, content_type) = if let Some(ref json_str) = payload_json {
920                (json_str.as_bytes().to_vec(), ContentType::Json)
921            } else if let Some(blob) = payload_blob {
922                (blob, ContentType::Binary)
923            } else {
924                continue;
925            };
926            
927            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
928            
929            let item = SyncItem::reconstruct(
930                id,
931                version as u64,
932                timestamp,
933                content_type,
934                content,
935                batch_id,
936                trace_parent,
937                payload_hash.unwrap_or_default(),
938                home_instance_id,
939                state,
940                access_count as u64,
941                last_accessed as u64,
942            );
943            items.push(item);
944        }
945        
946        Ok(items)
947    }
948    
949    /// Count items in a given state.
950    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
951        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
952            .bind(state)
953            .fetch_one(&self.pool)
954            .await
955            .map_err(|e| StorageError::Backend(e.to_string()))?;
956        
957        let count: i64 = result.try_get("cnt")
958            .map_err(|e| StorageError::Backend(e.to_string()))?;
959        
960        Ok(count as u64)
961    }
962    
963    /// Get just the IDs of items in a given state (lightweight query).
964    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
965        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
966            .bind(state)
967            .bind(limit as i64)
968            .fetch_all(&self.pool)
969            .await
970            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
971        
972        let mut ids = Vec::with_capacity(rows.len());
973        for row in rows {
974            let id: String = row.try_get("id")
975                .map_err(|e| StorageError::Backend(e.to_string()))?;
976            ids.push(id);
977        }
978        
979        Ok(ids)
980    }
981    
982    /// Update the state of an item by ID.
983    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
984        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
985            .bind(new_state)
986            .bind(id)
987            .execute(&self.pool)
988            .await
989            .map_err(|e| StorageError::Backend(e.to_string()))?;
990        
991        Ok(result.rows_affected() > 0)
992    }
993    
994    /// Delete all items in a given state.
995    ///
996    /// Returns the number of deleted items.
997    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
998        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
999            .bind(state)
1000            .execute(&self.pool)
1001            .await
1002            .map_err(|e| StorageError::Backend(e.to_string()))?;
1003        
1004        Ok(result.rows_affected())
1005    }
1006    
1007    /// Scan items by ID prefix.
1008    ///
1009    /// Efficiently retrieves all items whose ID starts with the given prefix.
1010    /// Uses SQL `LIKE 'prefix%'` which leverages the primary key index.
1011    ///
1012    /// # Example
1013    /// ```rust,ignore
1014    /// // Get all deltas for object user.123
1015    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
1016    /// ```
1017    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
1018        // Build LIKE pattern: "prefix%"
1019        let pattern = format!("{}%", prefix);
1020        
1021        let rows = sqlx::query(
1022            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
1023             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
1024        )
1025            .bind(&pattern)
1026            .bind(limit as i64)
1027            .fetch_all(&self.pool)
1028            .await
1029            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
1030        
1031        let mut items = Vec::with_capacity(rows.len());
1032        for row in rows {
1033            let id: String = row.try_get("id")
1034                .map_err(|e| StorageError::Backend(e.to_string()))?;
1035            let version: i64 = row.try_get("version").unwrap_or(1);
1036            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
1037            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
1038            
1039            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
1040            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
1041                .or_else(|| {
1042                    row.try_get::<Vec<u8>, _>("payload").ok()
1043                        .and_then(|bytes| String::from_utf8(bytes).ok())
1044                });
1045            
1046            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
1047            
1048            // Try reading audit as String first (SQLite), then bytes (MySQL)
1049            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1050                .or_else(|| {
1051                    row.try_get::<Vec<u8>, _>("audit").ok()
1052                        .and_then(|bytes| String::from_utf8(bytes).ok())
1053                });
1054            
1055            // State field - try String first (SQLite), then bytes (MySQL)
1056            let state: String = row.try_get::<String, _>("state").ok()
1057                .or_else(|| {
1058                    row.try_get::<Vec<u8>, _>("state").ok()
1059                        .and_then(|bytes| String::from_utf8(bytes).ok())
1060                })
1061                .unwrap_or_else(|| "default".to_string());
1062            
1063            // Access metadata (local-only, not replicated)
1064            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
1065            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
1066            
1067            let (content, content_type) = if let Some(ref json_str) = payload_json {
1068                (json_str.as_bytes().to_vec(), ContentType::Json)
1069            } else if let Some(blob) = payload_blob {
1070                (blob, ContentType::Binary)
1071            } else {
1072                continue;
1073            };
1074            
1075            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1076            
1077            let item = SyncItem::reconstruct(
1078                id,
1079                version as u64,
1080                timestamp,
1081                content_type,
1082                content,
1083                batch_id,
1084                trace_parent,
1085                payload_hash.unwrap_or_default(),
1086                home_instance_id,
1087                state,
1088                access_count as u64,
1089                last_accessed as u64,
1090            );
1091            items.push(item);
1092        }
1093        
1094        Ok(items)
1095    }
1096    
1097    /// Count items matching an ID prefix.
1098    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1099        let pattern = format!("{}%", prefix);
1100        
1101        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
1102            .bind(&pattern)
1103            .fetch_one(&self.pool)
1104            .await
1105            .map_err(|e| StorageError::Backend(e.to_string()))?;
1106        
1107        let count: i64 = result.try_get("cnt")
1108            .map_err(|e| StorageError::Backend(e.to_string()))?;
1109        
1110        Ok(count as u64)
1111    }
1112    
1113    /// Delete all items matching an ID prefix.
1114    ///
1115    /// Returns the number of deleted items.
1116    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
1117        let pattern = format!("{}%", prefix);
1118        
1119        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
1120            .bind(&pattern)
1121            .execute(&self.pool)
1122            .await
1123            .map_err(|e| StorageError::Backend(e.to_string()))?;
1124        
1125        Ok(result.rows_affected())
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use std::path::PathBuf;
1133    use serde_json::json;
1134    
1135    fn temp_db_path(name: &str) -> PathBuf {
1136        // Use local temp/ folder (gitignored) instead of system temp
1137        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
1138    }
1139    
1140    /// Clean up SQLite database and its WAL files
1141    fn cleanup_db(path: &PathBuf) {
1142        let _ = std::fs::remove_file(path);
1143        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
1144        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
1145    }
1146    
1147    fn test_item(id: &str, state: &str) -> SyncItem {
1148        SyncItem::from_json(id.to_string(), json!({"id": id}))
1149            .with_state(state)
1150    }
1151
1152    #[tokio::test]
1153    async fn test_state_stored_and_retrieved() {
1154        let db_path = temp_db_path("stored");
1155        cleanup_db(&db_path);
1156        
1157        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1158        let store = SqlStore::new(&url).await.unwrap();
1159        
1160        // Store item with custom state
1161        let item = test_item("item1", "delta");
1162        store.put(&item).await.unwrap();
1163        
1164        // Retrieve and verify state
1165        let retrieved = store.get("item1").await.unwrap().unwrap();
1166        assert_eq!(retrieved.state, "delta");
1167        
1168        cleanup_db(&db_path);
1169    }
1170
1171    #[tokio::test]
1172    async fn test_state_default_value() {
1173        let db_path = temp_db_path("default");
1174        cleanup_db(&db_path);
1175        
1176        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1177        let store = SqlStore::new(&url).await.unwrap();
1178        
1179        // Store item with default state
1180        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
1181        store.put(&item).await.unwrap();
1182        
1183        let retrieved = store.get("item1").await.unwrap().unwrap();
1184        assert_eq!(retrieved.state, "default");
1185        
1186        cleanup_db(&db_path);
1187    }
1188
1189    #[tokio::test]
1190    async fn test_get_by_state() {
1191        let db_path = temp_db_path("get_by_state");
1192        cleanup_db(&db_path);
1193        
1194        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1195        let store = SqlStore::new(&url).await.unwrap();
1196        
1197        // Insert items in different states
1198        store.put(&test_item("delta1", "delta")).await.unwrap();
1199        store.put(&test_item("delta2", "delta")).await.unwrap();
1200        store.put(&test_item("base1", "base")).await.unwrap();
1201        store.put(&test_item("pending1", "pending")).await.unwrap();
1202        
1203        // Query by state
1204        let deltas = store.get_by_state("delta", 100).await.unwrap();
1205        assert_eq!(deltas.len(), 2);
1206        assert!(deltas.iter().all(|i| i.state == "delta"));
1207        
1208        let bases = store.get_by_state("base", 100).await.unwrap();
1209        assert_eq!(bases.len(), 1);
1210        assert_eq!(bases[0].object_id, "base1");
1211        
1212        // Empty result for non-existent state
1213        let none = store.get_by_state("nonexistent", 100).await.unwrap();
1214        assert!(none.is_empty());
1215        
1216        cleanup_db(&db_path);
1217    }
1218
1219    #[tokio::test]
1220    async fn test_get_by_state_with_limit() {
1221        let db_path = temp_db_path("get_by_state_limit");
1222        cleanup_db(&db_path);
1223        
1224        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1225        let store = SqlStore::new(&url).await.unwrap();
1226        
1227        // Insert 10 items
1228        for i in 0..10 {
1229            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
1230        }
1231        
1232        // Query with limit
1233        let limited = store.get_by_state("batch", 5).await.unwrap();
1234        assert_eq!(limited.len(), 5);
1235        
1236        cleanup_db(&db_path);
1237    }
1238
1239    #[tokio::test]
1240    async fn test_count_by_state() {
1241        let db_path = temp_db_path("count_by_state");
1242        cleanup_db(&db_path);
1243        
1244        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1245        let store = SqlStore::new(&url).await.unwrap();
1246        
1247        // Insert items
1248        store.put(&test_item("a1", "alpha")).await.unwrap();
1249        store.put(&test_item("a2", "alpha")).await.unwrap();
1250        store.put(&test_item("a3", "alpha")).await.unwrap();
1251        store.put(&test_item("b1", "beta")).await.unwrap();
1252        
1253        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
1254        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
1255        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
1256        
1257        cleanup_db(&db_path);
1258    }
1259
1260    #[tokio::test]
1261    async fn test_list_state_ids() {
1262        let db_path = temp_db_path("list_state_ids");
1263        cleanup_db(&db_path);
1264        
1265        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1266        let store = SqlStore::new(&url).await.unwrap();
1267        
1268        store.put(&test_item("id1", "pending")).await.unwrap();
1269        store.put(&test_item("id2", "pending")).await.unwrap();
1270        store.put(&test_item("id3", "done")).await.unwrap();
1271        
1272        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
1273        assert_eq!(pending_ids.len(), 2);
1274        assert!(pending_ids.contains(&"id1".to_string()));
1275        assert!(pending_ids.contains(&"id2".to_string()));
1276        
1277        cleanup_db(&db_path);
1278    }
1279
1280    #[tokio::test]
1281    async fn test_set_state() {
1282        let db_path = temp_db_path("set_state");
1283        cleanup_db(&db_path);
1284        
1285        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1286        let store = SqlStore::new(&url).await.unwrap();
1287        
1288        store.put(&test_item("item1", "pending")).await.unwrap();
1289        
1290        // Verify initial state
1291        let before = store.get("item1").await.unwrap().unwrap();
1292        assert_eq!(before.state, "pending");
1293        
1294        // Update state
1295        let updated = store.set_state("item1", "approved").await.unwrap();
1296        assert!(updated);
1297        
1298        // Verify new state
1299        let after = store.get("item1").await.unwrap().unwrap();
1300        assert_eq!(after.state, "approved");
1301        
1302        // Non-existent item returns false
1303        let not_found = store.set_state("nonexistent", "x").await.unwrap();
1304        assert!(!not_found);
1305        
1306        cleanup_db(&db_path);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_delete_by_state() {
1311        let db_path = temp_db_path("delete_by_state");
1312        cleanup_db(&db_path);
1313        
1314        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1315        let store = SqlStore::new(&url).await.unwrap();
1316        
1317        store.put(&test_item("keep1", "keep")).await.unwrap();
1318        store.put(&test_item("keep2", "keep")).await.unwrap();
1319        store.put(&test_item("del1", "delete_me")).await.unwrap();
1320        store.put(&test_item("del2", "delete_me")).await.unwrap();
1321        store.put(&test_item("del3", "delete_me")).await.unwrap();
1322        
1323        // Delete by state
1324        let deleted = store.delete_by_state("delete_me").await.unwrap();
1325        assert_eq!(deleted, 3);
1326        
1327        // Verify deleted
1328        assert!(store.get("del1").await.unwrap().is_none());
1329        assert!(store.get("del2").await.unwrap().is_none());
1330        
1331        // Verify others remain
1332        assert!(store.get("keep1").await.unwrap().is_some());
1333        assert!(store.get("keep2").await.unwrap().is_some());
1334        
1335        // Delete non-existent state returns 0
1336        let zero = store.delete_by_state("nonexistent").await.unwrap();
1337        assert_eq!(zero, 0);
1338        
1339        cleanup_db(&db_path);
1340    }
1341
1342    #[tokio::test]
1343    async fn test_multiple_puts_preserve_state() {
1344        let db_path = temp_db_path("multi_put_state");
1345        cleanup_db(&db_path);
1346        
1347        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1348        let store = SqlStore::new(&url).await.unwrap();
1349        
1350        // Put multiple items with different states
1351        store.put(&test_item("a", "state_a")).await.unwrap();
1352        store.put(&test_item("b", "state_b")).await.unwrap();
1353        store.put(&test_item("c", "state_c")).await.unwrap();
1354        
1355        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
1356        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
1357        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
1358        
1359        cleanup_db(&db_path);
1360    }
1361
1362    #[tokio::test]
1363    async fn test_scan_prefix() {
1364        let db_path = temp_db_path("scan_prefix");
1365        cleanup_db(&db_path);
1366        
1367        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1368        let store = SqlStore::new(&url).await.unwrap();
1369        
1370        // Insert items with different prefixes (CRDT pattern)
1371        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1372        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1373        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1374        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1375        store.put(&test_item("base:user.123", "base")).await.unwrap();
1376        store.put(&test_item("base:user.456", "base")).await.unwrap();
1377        
1378        // Scan specific object's deltas
1379        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
1380        assert_eq!(user123_deltas.len(), 3);
1381        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
1382        
1383        // Scan different object
1384        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
1385        assert_eq!(user456_deltas.len(), 1);
1386        
1387        // Scan all deltas
1388        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
1389        assert_eq!(all_deltas.len(), 4);
1390        
1391        // Scan all bases
1392        let bases = store.scan_prefix("base:", 100).await.unwrap();
1393        assert_eq!(bases.len(), 2);
1394        
1395        // Empty result for non-matching prefix
1396        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
1397        assert!(none.is_empty());
1398        
1399        cleanup_db(&db_path);
1400    }
1401
1402    #[tokio::test]
1403    async fn test_scan_prefix_with_limit() {
1404        let db_path = temp_db_path("scan_prefix_limit");
1405        cleanup_db(&db_path);
1406        
1407        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1408        let store = SqlStore::new(&url).await.unwrap();
1409        
1410        // Insert 20 items
1411        for i in 0..20 {
1412            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
1413        }
1414        
1415        // Query with limit
1416        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
1417        assert_eq!(limited.len(), 5);
1418        
1419        // Verify we can get all with larger limit
1420        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
1421        assert_eq!(all.len(), 20);
1422        
1423        cleanup_db(&db_path);
1424    }
1425
1426    #[tokio::test]
1427    async fn test_count_prefix() {
1428        let db_path = temp_db_path("count_prefix");
1429        cleanup_db(&db_path);
1430        
1431        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1432        let store = SqlStore::new(&url).await.unwrap();
1433        
1434        // Insert items with different prefixes
1435        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1436        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1437        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1438        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1439        store.put(&test_item("base:user.123", "base")).await.unwrap();
1440        
1441        // Count by prefix
1442        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
1443        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
1444        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
1445        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
1446        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
1447        
1448        cleanup_db(&db_path);
1449    }
1450
1451    #[tokio::test]
1452    async fn test_delete_prefix() {
1453        let db_path = temp_db_path("delete_prefix");
1454        cleanup_db(&db_path);
1455        
1456        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1457        let store = SqlStore::new(&url).await.unwrap();
1458        
1459        // Insert items with different prefixes
1460        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
1461        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
1462        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
1463        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
1464        store.put(&test_item("base:user.123", "base")).await.unwrap();
1465        
1466        // Delete one object's deltas
1467        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
1468        assert_eq!(deleted, 3);
1469        
1470        // Verify deleted
1471        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
1472        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
1473        
1474        // Verify other deltas remain
1475        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
1476        
1477        // Verify bases remain
1478        assert!(store.get("base:user.123").await.unwrap().is_some());
1479        
1480        // Delete non-existent prefix returns 0
1481        let zero = store.delete_prefix("nonexistent:").await.unwrap();
1482        assert_eq!(zero, 0);
1483        
1484        cleanup_db(&db_path);
1485    }
1486}