// sync_engine/storage/sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in the `payload` LONGTEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in the `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors the Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- 1 until a background merkle recalc clears it
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of the native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
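//!
//! Because of quirk 2, the read path tries `String` first and falls back to raw
//! bytes; this is the pattern used throughout this module:
//!
//! ```rust,ignore
//! let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
//!     .or_else(|| row.try_get::<Vec<u8>, _>("payload").ok()
//!         .and_then(|bytes| String::from_utf8(bytes).ok()));
//! ```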

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
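    ///
    /// # Example (connection strings are illustrative)
    /// ```rust,ignore
    /// // SQLite, file-backed:
    /// let store = SqlStore::new("sqlite://data/archive.db?mode=rwc").await?;
    /// // MySQL:
    /// let store = SqlStore::new("mysql://user:pass@localhost/sync").await?;
    /// ```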
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };

        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        // MySQL: Set session-level transaction isolation to READ COMMITTED
        // This reduces gap locking and deadlocks under high concurrency
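        // Note: SET SESSION applies only to the single pooled connection that
        // executes it; other pool connections keep the server default.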
        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        // Also set synchronous to NORMAL for better performance; FULL is the
        // default, but NORMAL is still safe in WAL mode.
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of the native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write; a background task sets it to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Build the audit JSON object for operational metadata.
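    ///
    /// A sketch of the output shape, with illustrative values:
    ///
    /// ```rust,ignore
    /// let mut item = SyncItem::from_json("user.1".into(), serde_json::json!({}));
    /// item.batch_id = Some("abc-123".into());
    /// // Produces {"batch":"abc-123"}; returns None when no audit fields are set.
    /// assert_eq!(SqlStore::build_audit_json(&item).unwrap(), r#"{"batch":"abc-123"}"#);
    /// ```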
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields.
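    ///
    /// Round-trips the shape produced by `build_audit_json` (illustrative value):
    ///
    /// ```rust,ignore
    /// let (batch, trace, home) =
    ///     SqlStore::parse_audit_json(Some(r#"{"batch":"abc-123"}"#.into()));
    /// assert_eq!(batch.as_deref(), Some("abc-123"));
    /// assert!(trace.is_none() && home.is_none());
    /// ```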
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items WHERE id = ?"
            )
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content - keep the raw JSON text as bytes
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            // Only mark merkle_dirty if payload actually changed (content-addressed)
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
             ON CONFLICT(id) DO UPDATE SET 
                version = excluded.version, 
                timestamp = excluded.timestamp, 
                payload_hash = excluded.payload_hash, 
                payload = excluded.payload, 
                payload_blob = excluded.payload_blob, 
                audit = excluded.audit, 
                merkle_dirty = CASE WHEN sync_items.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE sync_items.merkle_dirty END, 
                state = excluded.state,
                access_count = excluded.access_count,
                last_accessed = excluded.last_accessed"
        } else {
            // MySQL version: only mark merkle_dirty if payload_hash changed
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
             ON DUPLICATE KEY UPDATE 
                version = VALUES(version), 
                timestamp = VALUES(timestamp), 
                payload_hash = VALUES(payload_hash), 
                payload = VALUES(payload), 
                payload_blob = VALUES(payload_blob), 
                audit = VALUES(audit), 
                merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, 
                state = VALUES(state),
                access_count = VALUES(access_count),
                last_accessed = VALUES(last_accessed)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items using chunked multi-row INSERTs, with post-write verification.
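    ///
    /// # Example (items are illustrative)
    /// ```rust,ignore
    /// let mut items = vec![item_a, item_b];
    /// let result = store.put_batch(&mut items).await?;
    /// assert!(result.verified);
    /// println!("wrote {} items under batch {}", result.written, result.batch_id);
    /// ```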
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = CASE WHEN sync_items.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE sync_items.merkle_dirty END, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking that all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Use chunked COUNT(*) ... IN queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for chunk in ids.chunks(CHUNK_SIZE) {
            let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
            let sql = format!(
                "SELECT COUNT(*) as cnt FROM sync_items WHERE id IN ({})",
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in chunk {
                query = query.bind(id);
            }

            let result = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            let count: i64 = result
                .try_get("cnt")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            total_found += count as usize;
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference, but not used under concurrency)
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
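    ///
    /// # Example (limit is illustrative)
    /// ```rust,ignore
    /// // Drain the 100 oldest items by timestamp:
    /// let batch = store.scan_batch(100).await?;
    /// ```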
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes
            // (the sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by the background merkle processor to batch recalculate affected trees.
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    /// Count dirty merkle items within a specific branch prefix.
    ///
    /// Used for branch-level hygiene checks. Returns 0 if the branch is "clean"
    /// (all merkle hashes up-to-date), allowing safe comparison with peers.
    ///
    /// # Arguments
    /// * `prefix` - Branch prefix (e.g., "uk.nhs" matches "uk.nhs.patient.123")
    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Get distinct top-level prefixes that have dirty items.
    ///
    /// Returns prefixes like ["uk", "us", "de"] that have pending merkle recalcs.
    /// Branches NOT in this list are "clean" and safe to compare with peers.
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        // Extract first segment before '.' for items with merkle_dirty = 1
        let sql = if self.is_sqlite {
            // SQLite: use substr and instr
            "SELECT DISTINCT CASE 
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id 
            END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            // MySQL: use SUBSTRING_INDEX
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so the merkle hashes can be recalculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
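    ///
    /// A minimal processor-loop sketch (`recalc_merkle` is a hypothetical helper):
    ///
    /// ```rust,ignore
    /// loop {
    ///     let items = store.get_dirty_merkle_items(500).await?;
    ///     if items.is_empty() { break; }
    ///     let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
    ///     recalc_merkle(&items).await?; // hypothetical
    ///     store.mark_merkle_clean(&ids).await?;
    /// }
    /// ```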
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };

            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses an indexed query for fast retrieval.
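    ///
    /// # Example (state tag is illustrative)
    /// ```rust,ignore
    /// let pending = store.get_by_state("pending", 100).await?;
    /// for item in &pending {
    ///     println!("{} v{}", item.object_id, item.version);
    /// }
    /// ```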
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE state = ? LIMIT ?"
        )
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Count items in a given state.
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Get just the IDs of items in a given state (lightweight query).
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Update the state of an item by ID.
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() > 0)
    }

    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }

    /// Scan items by ID prefix.
    ///
    /// Efficiently retrieves all items whose ID starts with the given prefix.
    /// Uses SQL `LIKE 'prefix%'` which leverages the primary key index.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        let pattern = format!("{}%", prefix);

        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Count items matching an ID prefix.
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items.
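    ///
    /// # Example (key convention is illustrative)
    /// ```rust,ignore
    /// // Remove all deltas for object user.123:
    /// let removed = store.delete_prefix("delta:user.123:").await?;
    /// ```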
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);

        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected())
    }
}
1236
1237#[cfg(test)]
1238mod tests {
1239    use super::*;
1240    use std::path::PathBuf;
1241    use serde_json::json;
1242    
1243    fn temp_db_path(name: &str) -> PathBuf {
1244        // Use local temp/ folder (gitignored) instead of system temp
1245        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
1246    }
1247    
1248    /// Clean up SQLite database and its WAL files
1249    fn cleanup_db(path: &PathBuf) {
1250        let _ = std::fs::remove_file(path);
1251        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
1252        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
1253    }
1254    
1255    fn test_item(id: &str, state: &str) -> SyncItem {
1256        SyncItem::from_json(id.to_string(), json!({"id": id}))
1257            .with_state(state)
1258    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Store item with custom state
        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();

        // Retrieve and verify state
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Store item with default state
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();

        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items in different states
        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();

        // Query by state
        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));

        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");

        // Empty result for non-existent state
        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert 10 items
        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }

        // Query with limit
        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items
        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();

        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();

        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("item1", "pending")).await.unwrap();

        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");

        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);

        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");

        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();

        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());

        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());

        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();

        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes (CRDT pattern)
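        // (key shape: `delta:<object>:<op>` for delta ops,
        // `base:<object>` for base items)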
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();

        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));

        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);

        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);

        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);

        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }

        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);

        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);

        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);

        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();

        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();

        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);

        // Verify deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());

        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());

        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());

        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);

        cleanup_db(&db_path);
    }
}