sync_engine/storage/sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in `payload` TEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- Set on write, cleared after merkle recalc
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of a native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>`, then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
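//!
//! ## Example
//!
//! A minimal usage sketch (connection string and item construction are
//! illustrative, not prescriptive):
//!
//! ```rust,ignore
//! let store = SqlStore::new("sqlite://temp/archive.db?mode=rwc").await?;
//! store.put(&item).await?;
//! let fetched = store.get(&item.object_id).await?;
//! ```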

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
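    ///
    /// A connection-string sketch (URLs are illustrative; the test suite uses
    /// the SQLite form):
    ///
    /// ```rust,ignore
    /// // SQLite file database, created if missing:
    /// let local = SqlStore::new("sqlite://temp/archive.db?mode=rwc").await?;
    /// // MySQL via the same `Any` driver:
    /// let shared = SqlStore::new("mysql://user:pass@db-host/sync").await?;
    /// ```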
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };

        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;
        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        // Also set synchronous to NORMAL for better performance while staying safe
        // (FULL is the default, but NORMAL is safe in WAL mode)
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write, background task sets to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Build the audit JSON object for operational metadata.
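    ///
    /// Output shape, with illustrative values:
    /// `{"batch":"abc-123","trace":"...","home":"..."}`; returns `None`
    /// when no audit fields are set.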
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
            )
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content - store the UTF-8 text bytes directly
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                version = excluded.version,
                timestamp = excluded.timestamp,
                payload_hash = excluded.payload_hash,
                payload = excluded.payload,
                payload_blob = excluded.payload_blob,
                audit = excluded.audit,
                merkle_dirty = 1,
                state = excluded.state,
                access_count = excluded.access_count,
                last_accessed = excluded.last_accessed"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)
             ON DUPLICATE KEY UPDATE
                version = VALUES(version),
                timestamp = VALUES(timestamp),
                payload_hash = VALUES(payload_hash),
                payload = VALUES(payload),
                payload_blob = VALUES(payload_blob),
                audit = VALUES(audit),
                merkle_dirty = 1,
                state = VALUES(state),
                access_count = VALUES(access_count),
                last_accessed = VALUES(last_accessed)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items in a single multi-row INSERT with verification.
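    ///
    /// A usage sketch (assumes `items` is a `Vec<SyncItem>`):
    ///
    /// ```rust,ignore
    /// let result = store.put_batch(&mut items).await?;
    /// // `verified` is true only if every written ID reads back.
    /// assert!(result.verified);
    /// ```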
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = 1, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = 1, \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Use chunked COUNT queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for chunk in ids.chunks(CHUNK_SIZE) {
            let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
            let sql = format!(
                "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in chunk {
                query = query.bind(id);
            }

            let result = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            let count: i64 = result
                .try_get("cnt")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            total_found += count as usize;
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference, but not used under concurrency).
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by background merkle processor to batch recalculate affected trees.
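    ///
    /// A drain-loop sketch (`recalculate_merkle` is a hypothetical caller-side step):
    ///
    /// ```rust,ignore
    /// loop {
    ///     let ids = store.get_dirty_merkle_ids(1000).await?;
    ///     if ids.is_empty() { break; }
    ///     recalculate_merkle(&ids).await?; // hypothetical: rebuild affected trees
    ///     store.mark_merkle_clean(&ids).await?;
    /// }
    /// ```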
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so merkle can be calculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Handle JSON payload (MySQL returns as bytes, SQLite as string)
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            // State field
            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };

            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses an indexed query for fast retrieval.
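    ///
    /// A usage sketch (state tags are caller-defined):
    ///
    /// ```rust,ignore
    /// // Fetch up to 100 items tagged "delta".
    /// let deltas = store.get_by_state("delta", 100).await?;
    /// ```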
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE state = ? LIMIT ?"
        )
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Count items in a given state.
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Get just the IDs of items in a given state (lightweight query).
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Update the state of an item by ID.
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

    /// Scan items by ID prefix.
    ///
    /// Efficiently retrieves all items whose ID starts with the given prefix.
    /// Uses SQL `LIKE 'prefix%'` which leverages the primary key index.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Count items matching an ID prefix.
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items.
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}
1169
1170#[cfg(test)]
1171mod tests {
1172    use super::*;
1173    use std::path::PathBuf;
1174    use serde_json::json;
1175    
1176    fn temp_db_path(name: &str) -> PathBuf {
1177        // Use local temp/ folder (gitignored) instead of system temp
1178        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
1179    }
1180    
1181    /// Clean up SQLite database and its WAL files
1182    fn cleanup_db(path: &PathBuf) {
1183        let _ = std::fs::remove_file(path);
1184        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
1185        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
1186    }
1187    
1188    fn test_item(id: &str, state: &str) -> SyncItem {
1189        SyncItem::from_json(id.to_string(), json!({"id": id}))
1190            .with_state(state)
1191    }
1192
1193    #[tokio::test]
1194    async fn test_state_stored_and_retrieved() {
1195        let db_path = temp_db_path("stored");
1196        cleanup_db(&db_path);
1197        
1198        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1199        let store = SqlStore::new(&url).await.unwrap();
1200        
1201        // Store item with custom state
1202        let item = test_item("item1", "delta");
1203        store.put(&item).await.unwrap();
1204        
1205        // Retrieve and verify state
1206        let retrieved = store.get("item1").await.unwrap().unwrap();
1207        assert_eq!(retrieved.state, "delta");
1208        
1209        cleanup_db(&db_path);
1210    }
1211
1212    #[tokio::test]
1213    async fn test_state_default_value() {
1214        let db_path = temp_db_path("default");
1215        cleanup_db(&db_path);
1216        
1217        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1218        let store = SqlStore::new(&url).await.unwrap();
1219        
1220        // Store item with default state
1221        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
1222        store.put(&item).await.unwrap();
1223        
1224        let retrieved = store.get("item1").await.unwrap().unwrap();
1225        assert_eq!(retrieved.state, "default");
1226        
1227        cleanup_db(&db_path);
1228    }
1229
1230    #[tokio::test]
1231    async fn test_get_by_state() {
1232        let db_path = temp_db_path("get_by_state");
1233        cleanup_db(&db_path);
1234        
1235        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1236        let store = SqlStore::new(&url).await.unwrap();
1237        
1238        // Insert items in different states
1239        store.put(&test_item("delta1", "delta")).await.unwrap();
1240        store.put(&test_item("delta2", "delta")).await.unwrap();
1241        store.put(&test_item("base1", "base")).await.unwrap();
1242        store.put(&test_item("pending1", "pending")).await.unwrap();
1243        
1244        // Query by state
1245        let deltas = store.get_by_state("delta", 100).await.unwrap();
1246        assert_eq!(deltas.len(), 2);
1247        assert!(deltas.iter().all(|i| i.state == "delta"));
1248        
1249        let bases = store.get_by_state("base", 100).await.unwrap();
1250        assert_eq!(bases.len(), 1);
1251        assert_eq!(bases[0].object_id, "base1");
1252        
1253        // Empty result for non-existent state
1254        let none = store.get_by_state("nonexistent", 100).await.unwrap();
1255        assert!(none.is_empty());
1256        
1257        cleanup_db(&db_path);
1258    }
1259
1260    #[tokio::test]
1261    async fn test_get_by_state_with_limit() {
1262        let db_path = temp_db_path("get_by_state_limit");
1263        cleanup_db(&db_path);
1264        
1265        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1266        let store = SqlStore::new(&url).await.unwrap();
1267        
1268        // Insert 10 items
1269        for i in 0..10 {
1270            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
1271        }
1272        
1273        // Query with limit
1274        let limited = store.get_by_state("batch", 5).await.unwrap();
1275        assert_eq!(limited.len(), 5);
1276        
1277        cleanup_db(&db_path);
1278    }
1279
1280    #[tokio::test]
1281    async fn test_count_by_state() {
1282        let db_path = temp_db_path("count_by_state");
1283        cleanup_db(&db_path);
1284        
1285        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1286        let store = SqlStore::new(&url).await.unwrap();
1287        
1288        // Insert items
1289        store.put(&test_item("a1", "alpha")).await.unwrap();
1290        store.put(&test_item("a2", "alpha")).await.unwrap();
1291        store.put(&test_item("a3", "alpha")).await.unwrap();
1292        store.put(&test_item("b1", "beta")).await.unwrap();
1293        
1294        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
1295        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
1296        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
1297        
1298        cleanup_db(&db_path);
1299    }
1300
1301    #[tokio::test]
1302    async fn test_list_state_ids() {
1303        let db_path = temp_db_path("list_state_ids");
1304        cleanup_db(&db_path);
1305        
1306        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1307        let store = SqlStore::new(&url).await.unwrap();
1308        
1309        store.put(&test_item("id1", "pending")).await.unwrap();
1310        store.put(&test_item("id2", "pending")).await.unwrap();
1311        store.put(&test_item("id3", "done")).await.unwrap();
1312        
1313        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
1314        assert_eq!(pending_ids.len(), 2);
1315        assert!(pending_ids.contains(&"id1".to_string()));
1316        assert!(pending_ids.contains(&"id2".to_string()));
1317        
1318        cleanup_db(&db_path);
1319    }
1320
    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

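    /// `delete_by_state` should remove every item in the target state, leave
    /// other states untouched, and return the number of rows deleted.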
    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify all three are deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        assert!(store.get("del3").await.unwrap().is_none());

        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

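    /// Successive `put` calls should persist each item's state independently.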
    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

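    /// `scan_prefix` should match on object id prefix, keeping the `delta:` and
    /// `base:` key namespaces for the same object separate.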
    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

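    /// `scan_prefix` should honor the row limit, and a limit larger than the
    /// match count should return everything.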
    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

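    /// `count_prefix` should report exact counts per key prefix, and zero for
    /// an unmatched prefix.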
    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

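    /// `delete_prefix` should remove only keys under the given prefix and
    /// return the number of rows deleted.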
    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify all three are deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op003").await.unwrap().is_none());

        // Verify other objects' deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());

        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}