sync_engine/storage/
sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type-aware storage with proper columns for queryability:
//! - **JSON content** → stored in the `payload` TEXT/LONGTEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → stored in the `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors the Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- 1 = merkle recalc pending (cleared by background task)
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of the native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>`, then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
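//!
//! Quirk 2 is why reads throughout this module try TEXT first and fall back
//! to bytes. A minimal sketch of the pattern (assumes `row` is a fetched
//! `sqlx::any::AnyRow`):
//!
//! ```rust,ignore
//! use sqlx::Row;
//! // Try TEXT first (SQLite), then fall back to bytes (MySQL via the Any driver).
//! let payload: Option<String> = row.try_get::<String, _>("payload").ok()
//!     .or_else(|| row.try_get::<Vec<u8>, _>("payload").ok()
//!         .and_then(|bytes| String::from_utf8(bytes).ok()));
//! ```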

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use std::sync::{Arc, Once};
use std::time::Duration;

use crate::resilience::retry::{retry, RetryConfig};
use crate::schema::{SchemaRegistry, DEFAULT_TABLE};
use crate::search::SqlParam;
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    /// Schema registry for table routing.
    /// Shared with SyncEngine for dynamic registration.
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
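    ///
    /// # Example
    /// A minimal sketch; connection strings are sqlx `Any` driver URLs (the
    /// MySQL URL shown is illustrative, not a tested endpoint):
    ///
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example() {
    /// let store = SqlStore::new("sqlite::memory:").await.unwrap();
    /// // let store = SqlStore::new("mysql://user:pass@host/db").await.unwrap();
    /// # }
    /// ```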
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

    /// Create a new SQL store with a shared schema registry.
    ///
    /// Use this when you want the SyncEngine to share the registry
    /// for dynamic schema registration.
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        // For SQLite, use bypass mode (no partitioning benefit)
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };

        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        // MySQL: set transaction isolation to READ COMMITTED to reduce gap
        // locking and deadlocks under high concurrency. Note: SET SESSION only
        // applies to the pooled connection that happens to execute it; set it
        // server-side to guarantee coverage of every connection.
        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    /// Get the schema registry for this store.
    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }

    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        // Also set synchronous to NORMAL for better performance while staying safe
        // (FULL is the default, but WAL mode is safe with NORMAL)
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of the native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write; a background task sets it to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON).
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Create a schema-specific table with the same structure as sync_items.
    ///
    /// Used for horizontal partitioning: each schema (e.g., "users", "orders")
    /// gets its own table with identical DDL. This enables:
    /// - Better query performance (smaller tables, focused indices)
    /// - Easier maintenance (vacuum, backup per-schema)
    /// - Future sharding potential
    ///
    /// For SQLite, this is a no-op (returns Ok) since partitioning doesn't help.
    ///
    /// # Arguments
    /// * `table_name` - Name of the table to create (e.g., "users_items")
    ///
    /// # Example
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// store.ensure_table("users_items").await.unwrap();
    /// store.ensure_table("orders_items").await.unwrap();
    /// # }
    /// ```
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        // SQLite: skip partitioning - no benefit for embedded use
        if self.is_sqlite {
            return Ok(());
        }

        // Validate the table name to prevent SQL injection:
        // only alphanumeric characters and underscores are allowed.
        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }

        // Don't recreate the default table
        if table_name == DEFAULT_TABLE {
            return Ok(());
        }

        // MySQL table DDL - identical structure to sync_items
        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }

    /// Check if a table exists.
    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            // Scope to the current database so same-named tables elsewhere don't match
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }

    /// Check if this is a SQLite database.
    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }

    /// Build the audit JSON object for operational metadata.
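    ///
    /// Output is a compact object like `{"batch":"…","trace":"…","home":"…"}`;
    /// keys are present only when the corresponding field is set, and `None`
    /// is returned when no audit fields are set at all.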
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);

        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content - stored as text, passed through as UTF-8 bytes
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            // Only mark merkle_dirty if the payload actually changed (content-addressed)
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON CONFLICT(id) DO UPDATE SET 
                    version = excluded.version, 
                    timestamp = excluded.timestamp, 
                    payload_hash = excluded.payload_hash, 
                    payload = excluded.payload, 
                    payload_blob = excluded.payload_blob, 
                    audit = excluded.audit, 
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, 
                    state = excluded.state,
                    access_count = excluded.access_count,
                    last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            // MySQL version: only mark merkle_dirty if payload_hash changed
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON DUPLICATE KEY UPDATE 
                    version = VALUES(version), 
                    timestamp = VALUES(timestamp), 
                    payload_hash = VALUES(payload_hash), 
                    payload = VALUES(payload), 
                    payload_blob = VALUES(payload_blob), 
                    audit = VALUES(audit), 
                    merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, 
                    state = VALUES(state),
                    access_count = VALUES(access_count),
                    last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_delete", &RetryConfig::query(), || async {
            let sql = format!("DELETE FROM {} WHERE id = ?", table);
            sqlx::query(&sql)
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_exists", &RetryConfig::query(), || async {
            let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items in a single multi-row INSERT with verification.
    ///
    /// Items are grouped by target table (based on the schema registry) and written
    /// in separate batches per table.
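    ///
    /// A usage sketch (`items` is a `Vec<SyncItem>` prepared by the caller;
    /// `verified` is false when the post-write existence check finds fewer
    /// rows than were sent):
    ///
    /// ```rust,ignore
    /// let result = store.put_batch(&mut items).await?;
    /// assert_eq!(result.written, items.len());
    /// if !result.verified {
    ///     // Some rows were not observed after the write - retry or alert.
    /// }
    /// ```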
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for the audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // Group items by target table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        // MySQL max_allowed_packet is typically 16MB, so chunk into ~500-item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        // Write each table's items
        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

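    /// Scan item IDs in stable `id` order, paged via LIMIT/OFFSET.
    /// Note: reads the default `sync_items` table only.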
    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

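    /// Count all rows in the default `sync_items` table.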
    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            // Reuse the get() parsing logic by fetching each item.
            // This is slightly inefficient but ensures consistent parsing.
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        self.count_where_in_table(DEFAULT_TABLE, where_clause, params).await
    }
}

impl SqlStore {
    /// Search in a specific table using a WHERE clause.
    pub async fn search_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
        limit: usize,
    ) -> Result<Vec<SyncItem>, StorageError> {
        // Validate the table name to prevent SQL injection
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM {} WHERE {} LIMIT {}",
            table, where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            // Use get(), which already routes to the correct table via the registry
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    /// Count items in a specific table matching a WHERE clause.
    pub async fn count_where_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
    ) -> Result<u64, StorageError> {
        // Validate the table name to prevent SQL injection
        if !table.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!("SELECT COUNT(*) as cnt FROM {} WHERE {}", table, where_clause);

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Write a single chunk of items to a specific table with content-type-aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = CASE WHEN payload_hash != VALUES(payload_hash) OR payload_hash IS NULL THEN 1 ELSE merkle_dirty END, \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking that all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    /// Groups IDs by table for proper routing.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Group IDs by table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        // Use chunked EXISTS queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference; not used under concurrency).
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // The query varies by DB: MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // The sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
            let state: String = state_bytes
                .and_then(|bytes| String::from_utf8(bytes).ok())
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by the background merkle processor to batch-recalculate affected trees.
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Mark items as merkle-clean after recalculation.
    ///
    /// Updates all schema-partitioned tables based on each item ID's routing.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Group IDs by their target table
        let mut ids_by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            ids_by_table.entry(table).or_default().push(id);
        }

        let mut total_updated = 0usize;

        for (table, table_ids) in ids_by_table {
            let placeholders: Vec<&str> = table_ids.iter().map(|_| "?").collect();
            let sql = format!(
                "UPDATE {} SET merkle_dirty = 0 WHERE id IN ({})",
                table,
                placeholders.join(", ")
            );

            let mut query = sqlx::query(&sql);
            for id in &table_ids {
                query = query.bind(*id);
            }

            match query.execute(&self.pool).await {
                Ok(result) => {
                    total_updated += result.rows_affected() as usize;
                }
                Err(e) => {
                    tracing::warn!(table = %table, error = %e, "Failed to mark merkle clean in table");
                }
            }
        }

        Ok(total_updated)
    }

    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    /// Count dirty merkle items within a specific branch prefix.
    ///
    /// Used for branch-level hygiene checks. Returns 0 if the branch is "clean"
    /// (all merkle hashes up to date), allowing safe comparison with peers.
    ///
    /// # Arguments
    /// * `prefix` - Branch prefix (e.g., "uk.nhs" matches "uk.nhs.patient.123")
    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Get distinct top-level prefixes that have dirty items.
    ///
    /// Returns prefixes like ["uk", "us", "de"] that have pending merkle recalcs.
    /// Branches NOT in this list are "clean" and safe to compare with peers.
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        // Extract the first segment before '.' for items with merkle_dirty = 1
        let sql = if self.is_sqlite {
            // SQLite: use substr and instr
            "SELECT DISTINCT CASE 
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id 
            END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            // MySQL: use SUBSTRING_INDEX
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Queries all schema-partitioned tables plus the default table.
    /// Returns the items themselves so the merkle tree can be recalculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
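    ///
    /// A background-processor sketch (`recalculate_merkle` is a hypothetical
    /// stand-in for the caller's tree update):
    ///
    /// ```rust,ignore
    /// let items = store.get_dirty_merkle_items(1000).await?;
    /// let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
    /// recalculate_merkle(&items); // caller-defined
    /// store.mark_merkle_clean(&ids).await?; // clear the dirty flags
    /// ```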
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Get all tables to query (registered tables + default)
        let mut tables_to_query: Vec<String> = self.registry.tables();
        if !tables_to_query.contains(&DEFAULT_TABLE.to_string()) {
            tables_to_query.push(DEFAULT_TABLE.to_string());
        }

        let mut all_items = Vec::new();
        let per_table_limit = (limit / tables_to_query.len().max(1)).max(100);

        for table in &tables_to_query {
            let sql = format!(
                "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
                 FROM {} WHERE merkle_dirty = 1 LIMIT ?",
                table
            );

            let rows = match sqlx::query(&sql)
                .bind(per_table_limit as i64)
                .fetch_all(&self.pool)
                .await
            {
                Ok(r) => r,
                Err(e) => {
                    // The table might not exist yet - skip it
                    tracing::debug!(table = %table, error = %e, "Skipping table for dirty merkle query");
                    continue;
                }
            };

            for row in rows {
                if let Some(item) = self.row_to_sync_item(&row) {
                    all_items.push(item);
                }
            }

            // Stop early if we hit the limit
            if all_items.len() >= limit {
                break;
            }
        }

        // Truncate to the limit
        all_items.truncate(limit);
        Ok(all_items)
    }

    /// Helper to convert a SQL row to a SyncItem.
    fn row_to_sync_item(&self, row: &sqlx::any::AnyRow) -> Option<SyncItem> {
        let id: String = row.try_get("id").ok()?;
        let version: i64 = row.try_get("version").unwrap_or(1);
        let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
        let payload_hash: Option<String> = row.try_get("payload_hash").ok();

        // Handle the JSON payload (MySQL returns it as bytes, SQLite as a string)
        let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
        let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
            String::from_utf8(bytes).ok()
        });

        let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
        let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
        let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
            String::from_utf8(bytes).ok()
        });

        // State field
        let state_bytes: Option<Vec<u8>> = row.try_get("state").ok();
        let state: String = state_bytes
            .and_then(|bytes| String::from_utf8(bytes).ok())
            .unwrap_or_else(|| "default".to_string());

        // Access metadata
        let access_count: i64 = row.try_get("access_count").unwrap_or(0);
        let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

        // Determine content and content_type
        let (content, content_type) = if let Some(ref json_str) = payload_json {
            (json_str.as_bytes().to_vec(), ContentType::Json)
        } else if let Some(blob) = payload_blob {
            (blob, ContentType::Binary)
        } else {
            return None; // Skip items with no payload
        };

        // Parse audit fields
        let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

        Some(SyncItem::reconstruct(
            id,
            version as u64,
            timestamp,
            content_type,
            content,
            batch_id,
            trace_parent,
            payload_hash.unwrap_or_default(),
            home_instance_id,
            state,
            access_count as u64,
            last_accessed as u64,
        ))
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses an indexed query for fast retrieval.
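    ///
    /// ```rust,ignore
    /// // Fetch up to 100 items tagged "delta" (state values are caller-defined).
    /// let deltas = store.get_by_state("delta", 100).await?;
    /// ```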
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE state = ? LIMIT ?"
        )
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;
        
        // Decode via the shared helper; rows without a payload are skipped.
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            if let Some(item) = self.row_to_sync_item(&row) {
                items.push(item);
            }
        }
        
        Ok(items)
    }
    
    /// Count items in a given state.
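    ///
    /// # Example
    /// A minimal sketch (the state name is illustrative):
    /// ```rust,ignore
    /// let pending = store.count_by_state("pending").await?;
    /// println!("{} items awaiting processing", pending);
    /// ```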
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Get just the IDs of items in a given state (lightweight query).
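    ///
    /// # Example
    /// A minimal sketch (state and limit are illustrative):
    /// ```rust,ignore
    /// // IDs only: avoids decoding payloads when you just need keys
    /// let ids = store.list_state_ids("pending", 1000).await?;
    /// ```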
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Update the state of an item by ID.
    ///
    /// Returns `true` if an item with that ID existed and was updated.
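    ///
    /// # Example
    /// A minimal sketch (ID and states are illustrative):
    /// ```rust,ignore
    /// if store.set_state("item1", "approved").await? {
    ///     println!("item1 moved to approved");
    /// }
    /// ```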
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() > 0)
    }
    
    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
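    ///
    /// # Example
    /// A minimal sketch (the state name is illustrative):
    /// ```rust,ignore
    /// // Bulk-evict items already processed
    /// let removed = store.delete_by_state("done").await?;
    /// ```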
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
    
    /// Scan items by ID prefix.
    ///
    /// Efficiently retrieves all items whose ID starts with the given prefix.
    /// Uses SQL `LIKE 'prefix%'`, which can leverage the primary key index
    /// because the pattern has no leading wildcard. Note: `%` or `_` inside
    /// the prefix are not escaped and act as LIKE wildcards.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        let pattern = format!("{}%", prefix);
        
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
        
        // Decode via the shared helper; rows without a payload are skipped.
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            if let Some(item) = self.row_to_sync_item(&row) {
                items.push(item);
            }
        }
        
        Ok(items)
    }
    
    /// Count items matching an ID prefix.
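    ///
    /// # Example
    /// A minimal sketch (the prefix is illustrative):
    /// ```rust,ignore
    /// let n = store.count_prefix("delta:user.123:").await?;
    /// ```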
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items.
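    ///
    /// # Example
    /// A minimal sketch (CRDT-style compaction; the prefix is illustrative):
    /// ```rust,ignore
    /// // Drop an object's deltas once they are folded into its base
    /// let removed = store.delete_prefix("delta:user.123:").await?;
    /// ```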
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use serde_json::json;
    
    fn temp_db_path(name: &str) -> PathBuf {
        // Use local temp/ folder (gitignored) instead of system temp
        PathBuf::from("temp").join(format!("sql_test_{}.db", name))
    }
    
    /// Clean up a SQLite database and its WAL/SHM side files.
    fn cleanup_db(path: &Path) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }
    
    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with custom state
        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();
        
        // Retrieve and verify state
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with default state
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();
        
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items in different states
        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();
        
        // Query by state
        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));
        
        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");
        
        // Empty result for non-existent state
        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 10 items
        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items
        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();
        
        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();
        
        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("item1", "pending")).await.unwrap();
        
        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");
        
        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);
        
        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");
        
        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();
        
        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        
        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());
        
        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();
        
        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();
        
        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
        
        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);
        
        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);
        
        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);
        
        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        
        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
        
        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());
        
        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }
}