sync_engine/storage/sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in `payload` TEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- 1 = needs merkle recalculation
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
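//!
//! Because of quirk 2, readers in this module try `String` first (SQLite TEXT)
//! and fall back to `Vec<u8>` (MySQL LONGTEXT). A minimal sketch of the pattern
//! used throughout:
//!
//! ```rust,ignore
//! let payload: Option<String> = row.try_get::<String, _>("payload").ok()
//!     .or_else(|| row.try_get::<Vec<u8>, _>("payload").ok()
//!         .and_then(|bytes| String::from_utf8(bytes).ok()));
//! ```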

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
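    ///
    /// # Example
    /// ```rust,ignore
    /// // Illustrative connection strings (paths and credentials are placeholders):
    /// let store = SqlStore::new("sqlite://data/archive.db?mode=rwc").await?;
    /// let store = SqlStore::new("mysql://user:pass@db-host/sync").await?;
    /// ```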
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();
        
        let is_sqlite = connection_string.starts_with("sqlite:");
        
        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };
        
        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }
        
        store.init_schema().await?;
        
        // MySQL: set session-level transaction isolation to READ COMMITTED to
        // reduce gap locking and deadlocks under high concurrency. Note that
        // SET SESSION only affects the pooled connection that executes it; for
        // guaranteed coverage across the pool, set the level server-wide or in
        // a per-connection hook.
        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }
        
        Ok(store)
    }
    
    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }
    
    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        // journal_mode = WAL is persistent: it is stored in the database file,
        // so setting it on one pooled connection is enough.
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
        
        // Also set synchronous to NORMAL for better performance while still safe
        // (FULL is the default, but WAL mode is durable enough with NORMAL).
        // Unlike journal_mode, this pragma is per-connection.
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
        
        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write, background task sets to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }
    
    /// Build the audit JSON object for operational metadata.
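    ///
    /// Produces a compact object such as `{"batch":"abc-123","trace":"…","home":"…"}`,
    /// or `None` when no audit fields are set.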
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();
        
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }
        
        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }
    
    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        
        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
            )
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();
                    
                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });
                    
                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    
                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });
                    
                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());
                    
                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
                    
                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content - stored as text; take the UTF-8 bytes directly
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };
                    
                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
                    
                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();
        
        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
             ON CONFLICT(id) DO UPDATE SET 
                version = excluded.version, 
                timestamp = excluded.timestamp, 
                payload_hash = excluded.payload_hash, 
                payload = excluded.payload, 
                payload_blob = excluded.payload_blob, 
                audit = excluded.audit, 
                merkle_dirty = 1, 
                state = excluded.state,
                access_count = excluded.access_count,
                last_accessed = excluded.last_accessed"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
             ON DUPLICATE KEY UPDATE 
                version = VALUES(version), 
                timestamp = VALUES(timestamp), 
                payload_hash = VALUES(payload_hash), 
                payload = VALUES(payload), 
                payload_blob = VALUES(payload_blob), 
                audit = VALUES(audit), 
                merkle_dirty = 1, 
                state = VALUES(state),
                access_count = VALUES(access_count),
                last_accessed = VALUES(last_accessed)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }
    
    /// Write a batch of items in a single multi-row INSERT with verification.
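    ///
    /// # Example
    /// ```rust,ignore
    /// // Illustrative: write a batch, then check the verification flag
    /// let result = store.put_batch(&mut items).await?;
    /// if !result.verified {
    ///     tracing::warn!(batch_id = %result.batch_id, "batch not fully verified");
    /// }
    /// ```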
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();
        
        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }
        
        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();
        
        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = 1, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = 1, \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }
        
        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };
                
                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);
                
                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }
                
                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                
                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Use chunked COUNT(*) ... IN queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for chunk in ids.chunks(CHUNK_SIZE) {
            let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
            let sql = format!(
                "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in chunk {
                query = query.bind(id);
            }

            let result = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            let count: i64 = result
                .try_get("cnt")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            total_found += count as usize;
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference, but not used under concurrency)
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();
        
        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };
        
        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };
        
        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
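    ///
    /// # Example
    /// ```rust,ignore
    /// // Illustrative drain pass: read the oldest items, forward them, then delete
    /// let batch = wal.scan_batch(500).await?;
    /// if !batch.is_empty() {
    ///     // ... forward `batch` downstream ...
    ///     let ids: Vec<String> = batch.iter().map(|i| i.object_id.clone()).collect();
    ///     wal.delete_batch(&ids).await?;
    /// }
    /// ```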
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Try reading payload as String first (SQLite TEXT), then as bytes
            // (the sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };
            
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
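    ///
    /// Returns the number of rows actually deleted (`rows_affected`), which may be
    /// less than `ids.len()` if some IDs were already absent.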
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }
                
                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                
                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }
    
    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════
    
    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by background merkle processor to batch recalculate affected trees.
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }
        
        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );
        
        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }
        
        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() as usize)
    }
    
    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.is_some())
    }
    
    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so merkle can be calculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
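    ///
    /// # Example
    /// ```rust,ignore
    /// // Illustrative background pass: recalculate, then clear the flag
    /// let items = store.get_dirty_merkle_items(500).await?;
    /// if !items.is_empty() {
    ///     // ... recompute merkle trees over `items` (caller-defined) ...
    ///     let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
    ///     store.mark_merkle_clean(&ids).await?;
    /// }
    /// ```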
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Try reading payload as String first (SQLite returns TEXT as a string;
            // MySQL LONGTEXT comes back as bytes under the Any driver)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };
            
            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }
    
    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════
    
    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses indexed query for fast retrieval.
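    ///
    /// # Example
    /// ```rust,ignore
    /// // Illustrative: promote pending items once they have been processed
    /// let pending = store.get_by_state("pending", 100).await?;
    /// for item in &pending {
    ///     store.set_state(&item.object_id, "applied").await?;
    /// }
    /// ```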
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE state = ? LIMIT ?"
        )
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            
            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };
            
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }
    
    /// Count items in a given state.
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Get just the IDs of items in a given state (lightweight query).
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Update the state of an item by ID.
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() > 0)
    }
    
    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
    
    /// Scan items by ID prefix.
    ///
    /// Efficiently retrieves all items whose ID starts with the given prefix.
    /// Uses SQL `LIKE 'prefix%'` which leverages the primary key index.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        // (note: `%` and `_` inside the prefix are not escaped, so they act as wildcards)
        let pattern = format!("{}%", prefix);
        
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            
            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };
            
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }
    
    /// Count items matching an ID prefix.
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items.
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use serde_json::json;
    
    fn temp_db_path(name: &str) -> PathBuf {
        // Use local temp/ folder (gitignored) instead of system temp.
        // Ensure it exists so `mode=rwc` can create the database file.
        let _ = std::fs::create_dir_all("temp");
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }
1190    
1191    /// Clean up SQLite database and its WAL files
1192    fn cleanup_db(path: &PathBuf) {
1193        let _ = std::fs::remove_file(path);
1194        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
1195        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
1196    }
1197    
1198    fn test_item(id: &str, state: &str) -> SyncItem {
1199        SyncItem::from_json(id.to_string(), json!({"id": id}))
1200            .with_state(state)
1201    }
1202
1203    #[tokio::test]
1204    async fn test_state_stored_and_retrieved() {
1205        let db_path = temp_db_path("stored");
1206        cleanup_db(&db_path);
1207        
1208        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1209        let store = SqlStore::new(&url).await.unwrap();
1210        
1211        // Store item with custom state
1212        let item = test_item("item1", "delta");
1213        store.put(&item).await.unwrap();
1214        
1215        // Retrieve and verify state
1216        let retrieved = store.get("item1").await.unwrap().unwrap();
1217        assert_eq!(retrieved.state, "delta");
1218        
1219        cleanup_db(&db_path);
1220    }
1221
1222    #[tokio::test]
1223    async fn test_state_default_value() {
1224        let db_path = temp_db_path("default");
1225        cleanup_db(&db_path);
1226        
1227        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1228        let store = SqlStore::new(&url).await.unwrap();
1229        
1230        // Store item with default state
1231        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
1232        store.put(&item).await.unwrap();
1233        
1234        let retrieved = store.get("item1").await.unwrap().unwrap();
1235        assert_eq!(retrieved.state, "default");
1236        
1237        cleanup_db(&db_path);
1238    }
1239
1240    #[tokio::test]
1241    async fn test_get_by_state() {
1242        let db_path = temp_db_path("get_by_state");
1243        cleanup_db(&db_path);
1244        
1245        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1246        let store = SqlStore::new(&url).await.unwrap();
1247        
1248        // Insert items in different states
1249        store.put(&test_item("delta1", "delta")).await.unwrap();
1250        store.put(&test_item("delta2", "delta")).await.unwrap();
1251        store.put(&test_item("base1", "base")).await.unwrap();
1252        store.put(&test_item("pending1", "pending")).await.unwrap();
1253        
1254        // Query by state
1255        let deltas = store.get_by_state("delta", 100).await.unwrap();
1256        assert_eq!(deltas.len(), 2);
1257        assert!(deltas.iter().all(|i| i.state == "delta"));
1258        
1259        let bases = store.get_by_state("base", 100).await.unwrap();
1260        assert_eq!(bases.len(), 1);
1261        assert_eq!(bases[0].object_id, "base1");
1262        
1263        // Empty result for non-existent state
1264        let none = store.get_by_state("nonexistent", 100).await.unwrap();
1265        assert!(none.is_empty());
1266        
1267        cleanup_db(&db_path);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_get_by_state_with_limit() {
1272        let db_path = temp_db_path("get_by_state_limit");
1273        cleanup_db(&db_path);
1274        
1275        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1276        let store = SqlStore::new(&url).await.unwrap();
1277        
1278        // Insert 10 items
1279        for i in 0..10 {
1280            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
1281        }
1282        
1283        // Query with limit
1284        let limited = store.get_by_state("batch", 5).await.unwrap();
1285        assert_eq!(limited.len(), 5);
1286        
1287        cleanup_db(&db_path);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_count_by_state() {
1292        let db_path = temp_db_path("count_by_state");
1293        cleanup_db(&db_path);
1294        
1295        let url = format!("sqlite://{}?mode=rwc", db_path.display());
1296        let store = SqlStore::new(&url).await.unwrap();
1297        
1298        // Insert items
1299        store.put(&test_item("a1", "alpha")).await.unwrap();
1300        store.put(&test_item("a2", "alpha")).await.unwrap();
1301        store.put(&test_item("a3", "alpha")).await.unwrap();
1302        store.put(&test_item("b1", "beta")).await.unwrap();
1303        
1304        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
1305        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
1306        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
1307        
1308        cleanup_db(&db_path);
1309    }
1310
    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

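    // `set_state` presumably compiles to a single UPDATE, with the returned
    // bool derived from rows_affected — a sketch, assuming `object_id` maps
    // to the `id` primary key:
    //
    //   UPDATE sync_items SET state = ? WHERE id = ?;  -- rows_affected == 1 => true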
    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

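    // `delete_by_state` presumably maps to a bulk DELETE whose rows_affected
    // becomes the returned count (same assumptions as the sketches above):
    //
    //   DELETE FROM sync_items WHERE state = ?;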
    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify all three deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        assert!(store.get("del3").await.unwrap().is_none());

        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

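    // `put` is presumably an upsert (INSERT ... ON CONFLICT for SQLite,
    // INSERT ... ON DUPLICATE KEY UPDATE for MySQL), so each row written
    // below should retain exactly the state it was stored with: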
    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

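    // The prefix tests model the CRDT key layout ("delta:<object>:<op>",
    // "base:<object>"). `scan_prefix`, `count_prefix`, and `delete_prefix`
    // presumably compile down to LIKE queries — a sketch, assuming prefixes
    // contain no LIKE metacharacters (`%`, `_`); prefixes that do would need
    // ESCAPE handling:
    //
    //   SELECT * FROM sync_items WHERE id LIKE 'delta:user.123:%' LIMIT 100;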
    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify all three deltas deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op003").await.unwrap().is_none());

        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());

        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}