sync_engine/storage/sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in the `payload` text column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in the `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors the Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- 1 = merkle recalculation pending
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of the native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>`, then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
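//!
//! A sketch of the String-first, bytes-fallback read idiom used throughout this
//! module (`row` stands for any fetched `sqlx` row):
//!
//! ```rust,ignore
//! // SQLite hands TEXT back as String; MySQL LONGTEXT arrives as bytes.
//! let payload: Option<String> = row.try_get::<String, _>("payload").ok()
//!     .or_else(|| row.try_get::<Vec<u8>, _>("payload").ok()
//!         .and_then(|bytes| String::from_utf8(bytes).ok()));
//! ```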

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use crate::search::SqlParam;
use crate::schema::SchemaRegistry;
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::{Arc, Once};
use std::time::Duration;

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    /// Schema registry for table routing.
    /// Shared with SyncEngine for dynamic registration.
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

    /// Create a new SQL store with a shared schema registry.
    ///
    /// Use this when you want the SyncEngine to share the registry
    /// for dynamic schema registration.
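    ///
    /// # Example
    /// A minimal sketch; the connection string is illustrative, and the import
    /// paths assume the crate exposes `schema::SchemaRegistry` publicly:
    /// ```rust,ignore
    /// # use std::sync::Arc;
    /// # use sync_engine::storage::sql::SqlStore;
    /// # use sync_engine::schema::SchemaRegistry;
    /// # async fn example() {
    /// let registry = Arc::new(SchemaRegistry::new());
    /// let store = SqlStore::with_registry("sqlite::memory:", registry.clone()).await.unwrap();
    /// # }
    /// ```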
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();
        
        let is_sqlite = connection_string.starts_with("sqlite:");
        
        // For SQLite, use bypass mode (no partitioning benefit)
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };
        
        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };
        
        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }
        
        store.init_schema().await?;
        
        // MySQL: request READ COMMITTED isolation to reduce gap locking and deadlocks
        // under high concurrency. Note that SET SESSION only affects the pooled
        // connection that happens to execute it; for pool-wide effect, configure the
        // isolation level on the server or in a per-connection hook.
        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }
        
        Ok(store)
    }
    
    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }
    
    /// Get the schema registry for this store.
    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }
    
    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;
        
        // Also set synchronous to NORMAL for better performance while staying safe
        // (FULL is the default, but WAL mode is durable enough with NORMAL).
        // Note: journal_mode = WAL persists in the database file itself, while the
        // synchronous pragma applies per connection.
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;
        
        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write; background task sets to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }
    
    /// Create a schema-specific table with the same structure as sync_items.
    ///
    /// Used for horizontal partitioning: each schema (e.g., "users", "orders")
    /// gets its own table with identical DDL. This enables:
    /// - Better query performance (smaller tables, focused indices)
    /// - Easier maintenance (vacuum, backup per-schema)
    /// - Future sharding potential
    ///
    /// For SQLite, this is a no-op (returns Ok) since partitioning doesn't help.
    ///
    /// # Arguments
    /// * `table_name` - Name of the table to create (e.g., "users_items")
    ///
    /// # Example
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// store.ensure_table("users_items").await.unwrap();
    /// store.ensure_table("orders_items").await.unwrap();
    /// # }
    /// ```
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        // SQLite: skip partitioning - no benefit for embedded use
        if self.is_sqlite {
            return Ok(());
        }
        
        // Validate table name to prevent SQL injection:
        // only allow alphanumeric characters and underscores.
        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }
        
        // Don't recreate the default table
        if table_name == crate::schema::DEFAULT_TABLE {
            return Ok(());
        }
        
        // MySQL table DDL - identical structure to sync_items
        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;
        
        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }
    
    /// Check if a table exists.
    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = ?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }
    
    /// Check if this is a SQLite database.
    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }
    
    /// Build the audit JSON object for operational metadata.
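    ///
    /// Produces a compact object such as `{"batch":"...","trace":"...","home":"..."}`,
    /// or `None` when no operational metadata is set on the item.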
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();
        
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }
        
        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }
    
    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        
        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();
                    
                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });
                    
                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    
                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });
                    
                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());
                    
                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
                    
                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content is stored as text; use its bytes directly
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };
                    
                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
                    
                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();
        
        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            // Only mark merkle_dirty if the payload actually changed (content-addressed)
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON CONFLICT(id) DO UPDATE SET 
                    version = excluded.version, 
                    timestamp = excluded.timestamp, 
                    payload_hash = excluded.payload_hash, 
                    payload = excluded.payload, 
                    payload_blob = excluded.payload_blob, 
                    audit = excluded.audit, 
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, 
                    state = excluded.state,
                    access_count = excluded.access_count,
                    last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            // MySQL version: only mark merkle_dirty if payload_hash changed
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON DUPLICATE KEY UPDATE 
                    version = VALUES(version), 
                    timestamp = VALUES(timestamp), 
                    payload_hash = VALUES(payload_hash), 
                    payload = VALUES(payload), 
                    payload_blob = VALUES(payload_blob), 
                    audit = VALUES(audit), 
                    merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, 
                    state = VALUES(state),
                    access_count = VALUES(access_count),
                    last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_delete", &RetryConfig::query(), || async {
            let sql = format!("DELETE FROM {} WHERE id = ?", table);
            sqlx::query(&sql)
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_exists", &RetryConfig::query(), || async {
            let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }
    
    /// Write a batch of items in a single multi-row INSERT with verification.
    ///
    /// Items are grouped by target table (based on the schema registry) and written
    /// in separate batches per table.
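    ///
    /// ```rust,ignore
    /// // Sketch: `items` is any mutable slice of SyncItems; each gets stamped
    /// // with the generated batch_id before writing.
    /// let result = store.put_batch(&mut items).await?;
    /// assert_eq!(result.written, items.len());
    /// assert!(result.verified); // all IDs were found on re-read
    /// ```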
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();
        
        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // Group items by target table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500 item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        // Write each table's items
        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }
        
        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }

    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );
        
        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }
        
        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            
            // Reuse the get() parsing logic by fetching each item.
            // This is slightly inefficient but ensures consistent parsing.
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }
        
        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        let sql = format!("SELECT COUNT(*) as cnt FROM sync_items WHERE {}", where_clause);
        
        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }
        
        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items to a specific table with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();
        
        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }
        
        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };
                
                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);
                
                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }
                
                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                
                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    /// Groups IDs by table for proper routing.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Group IDs by table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        // Use chunked EXISTS queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference, but not used under concurrency).
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();
        
        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };
        
        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };
        
        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());
            
            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };
            
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }
                
                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                
                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }
    
    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════
    
    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by the background merkle processor to batch-recalculate affected trees.
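    ///
    /// ```rust,ignore
    /// // Sketch of one background pass: fetch dirty IDs, recompute, clear flags.
    /// let ids = store.get_dirty_merkle_ids(1000).await?;
    /// // ... recompute merkle hashes for these IDs ...
    /// store.mark_merkle_clean(&ids).await?;
    /// ```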
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }
        
        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );
        
        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }
        
        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() as usize)
    }
    
    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.is_some())
    }
    
    /// Count dirty merkle items within a specific branch prefix.
    ///
    /// Used for branch-level hygiene checks. Returns 0 if the branch is "clean"
    /// (all merkle hashes up-to-date), allowing safe comparison with peers.
    ///
    /// # Arguments
    /// * `prefix` - Branch prefix (e.g., "uk.nhs" matches "uk.nhs.patient.123")
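    ///
    /// ```rust,ignore
    /// // Only exchange merkle roots for a branch once it has no pending recalcs.
    /// if store.branch_dirty_count("uk.nhs").await? == 0 {
    ///     // safe to compare this branch with a peer
    /// }
    /// ```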
    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Get distinct top-level prefixes that have dirty items.
    ///
    /// Returns prefixes like ["uk", "us", "de"] that have pending merkle recalcs.
    /// Branches NOT in this list are "clean" and safe to compare with peers.
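    ///
    /// ```rust,ignore
    /// // Prefixes are the first dot-separated segment of dirty IDs.
    /// let dirty = store.get_dirty_prefixes().await?; // e.g. ["uk", "de"]
    /// ```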
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        // Extract the first segment before '.' for items with merkle_dirty = 1
        let sql = if self.is_sqlite {
            // SQLite: use substr and instr
            "SELECT DISTINCT CASE 
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id 
            END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            // MySQL: use SUBSTRING_INDEX
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };
        
        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }
        
        Ok(prefixes)
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so merkle hashes can be calculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Handle JSON payload (MySQL returns it as bytes, SQLite as string)
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });
            
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });
            
            // State field
            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };
            
            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }
    
    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════
    
    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses an indexed query for fast retrieval.
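    ///
    /// ```rust,ignore
    /// // State tags are caller-defined; "pending" is illustrative.
    /// let pending = store.get_by_state("pending", 100).await?;
    /// ```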
1208    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
1209        let rows = sqlx::query(
1210            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
1211             FROM sync_items WHERE state = ? LIMIT ?"
1212        )
1213            .bind(state)
1214            .bind(limit as i64)
1215            .fetch_all(&self.pool)
1216            .await
1217            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
1218        
1219        let mut items = Vec::with_capacity(rows.len());
1220        for row in rows {
1221            let id: String = row.try_get("id")
1222                .map_err(|e| StorageError::Backend(e.to_string()))?;
1223            let version: i64 = row.try_get("version").unwrap_or(1);
1224            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
1225            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
1226            
1227            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
1228            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
1229                .or_else(|| {
1230                    row.try_get::<Vec<u8>, _>("payload").ok()
1231                        .and_then(|bytes| String::from_utf8(bytes).ok())
1232                });
1233            
1234            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
1235            
1236            // Try reading audit as String first (SQLite), then bytes (MySQL)
1237            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
1238                .or_else(|| {
1239                    row.try_get::<Vec<u8>, _>("audit").ok()
1240                        .and_then(|bytes| String::from_utf8(bytes).ok())
1241                });
1242            
1243            // State field - try String first (SQLite), then bytes (MySQL)
1244            let state: String = row.try_get::<String, _>("state").ok()
1245                .or_else(|| {
1246                    row.try_get::<Vec<u8>, _>("state").ok()
1247                        .and_then(|bytes| String::from_utf8(bytes).ok())
1248                })
1249                .unwrap_or_else(|| "default".to_string());
1250            
1251            // Access metadata (local-only, not replicated)
1252            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
1253            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
1254            
1255            let (content, content_type) = if let Some(ref json_str) = payload_json {
1256                (json_str.as_bytes().to_vec(), ContentType::Json)
1257            } else if let Some(blob) = payload_blob {
1258                (blob, ContentType::Binary)
1259            } else {
1260                continue;
1261            };
1262            
1263            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
1264            
1265            let item = SyncItem::reconstruct(
1266                id,
1267                version as u64,
1268                timestamp,
1269                content_type,
1270                content,
1271                batch_id,
1272                trace_parent,
1273                payload_hash.unwrap_or_default(),
1274                home_instance_id,
1275                state,
1276                access_count as u64,
1277                last_accessed as u64,
1278            );
1279            items.push(item);
1280        }
1281        
1282        Ok(items)
1283    }
1284    
1285    /// Count items in a given state.
1286    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
1287        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
1288            .bind(state)
1289            .fetch_one(&self.pool)
1290            .await
1291            .map_err(|e| StorageError::Backend(e.to_string()))?;
1292        
1293        let count: i64 = result.try_get("cnt")
1294            .map_err(|e| StorageError::Backend(e.to_string()))?;
1295        
1296        Ok(count as u64)
1297    }
1298    
1299    /// Get just the IDs of items in a given state (lightweight query).
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Update the state of an item by ID.
    ///
    /// Returns `true` if a matching row was updated, `false` if no item with that ID exists.
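    ///
    /// # Example
    /// ```rust,ignore
    /// // Promote an item (assumes a connected `store`); false means no such ID
    /// let updated = store.set_state("item1", "approved").await?;
    /// ```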
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() > 0)
    }
    
    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
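    ///
    /// # Example
    /// ```rust,ignore
    /// // Purge everything marked for deletion (assumes a connected `store`)
    /// let removed = store.delete_by_state("delete_me").await?;
    /// ```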
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
    
    /// Scan items by ID prefix.
    ///
    /// Retrieves all items whose ID starts with the given prefix, ordered by ID.
    /// Uses SQL `LIKE 'prefix%'`; because the pattern has no leading wildcard,
    /// the database can satisfy it with a range scan on the primary key index.
    ///
    /// Note: `%` and `_` inside the prefix are interpreted as SQL wildcards
    /// (the pattern is not escaped).
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        let pattern = format!("{}%", prefix);
        
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();
            
            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            
            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            
            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());
            
            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);
            
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                // Row has neither JSON nor binary payload; skip it
                continue;
            };
            
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);
            
            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }
        
        Ok(items)
    }
    
    /// Count items matching an ID prefix.
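    ///
    /// # Example
    /// ```rust,ignore
    /// // Count one object's deltas (assumes the delta:<object>: ID scheme)
    /// let n = store.count_prefix("delta:user.123:").await?;
    /// ```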
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items. As with [`Self::scan_prefix`],
    /// `%` and `_` inside the prefix act as SQL wildcards.
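    ///
    /// # Example
    /// ```rust,ignore
    /// // Drop an object's deltas, e.g. after folding them into its base
    /// let removed = store.delete_prefix("delta:user.123:").await?;
    /// ```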
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use serde_json::json;
    
    fn temp_db_path(name: &str) -> PathBuf {
        // Use local temp/ folder (gitignored) instead of system temp;
        // create it if missing so tests work on a fresh clone
        let _ = std::fs::create_dir_all("temp");
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }
    
    /// Clean up SQLite database and its WAL files
    fn cleanup_db(path: &Path) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }
    
    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with custom state
        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();
        
        // Retrieve and verify state
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with default state
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();
        
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items in different states
        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();
        
        // Query by state
        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));
        
        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");
        
        // Empty result for non-existent state
        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 10 items
        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items
        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();
        
        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();
        
        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("item1", "pending")).await.unwrap();
        
        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");
        
        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);
        
        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");
        
        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();
        
        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        
        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());
        
        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();
        
        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();
        
        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
        
        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);
        
        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);
        
        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);
        
        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        
        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
        
        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());
        
        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }
}