sync_engine/storage/
sql.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in `payload` LONGTEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors the Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT,              -- Operational metadata: {batch, trace, home}
//!   merkle_dirty TINYINT,    -- 1 = payload changed, merkle recalc pending
//!   state VARCHAR(32),       -- Caller-defined state tag (indexed)
//!   access_count BIGINT,     -- Local access frequency (for eviction)
//!   last_accessed BIGINT     -- Last access timestamp (for eviction)
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of the native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>`, then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
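//!
//! Because of quirk 2, every read path tries `String` first (SQLite TEXT) and
//! falls back to bytes (MySQL LONGTEXT). A minimal sketch of that pattern; the
//! helper function is hypothetical, this module inlines the same fallback at
//! each call site:
//!
//! ```rust,no_run
//! use sqlx::{Row, any::AnyRow};
//!
//! // Hypothetical helper illustrating the String-then-bytes fallback.
//! fn read_text_column(row: &AnyRow, col: &str) -> Option<String> {
//!     row.try_get::<String, _>(col).ok().or_else(|| {
//!         row.try_get::<Vec<u8>, _>(col).ok()
//!             .and_then(|bytes| String::from_utf8(bytes).ok())
//!     })
//! }
//! ```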

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use std::sync::{Arc, Once};
use std::time::Duration;

use crate::resilience::retry::{retry, RetryConfig};
use crate::schema::SchemaRegistry;
use crate::search::SqlParam;
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};

// The sqlx `Any` driver requires runtime installation of the database drivers
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
    /// Schema registry for table routing.
    /// Shared with SyncEngine for dynamic registration.
    registry: Arc<SchemaRegistry>,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
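    ///
    /// A minimal usage sketch (the connection string is illustrative):
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example() {
    /// let store = SqlStore::new("sqlite::memory:").await.unwrap();
    /// # }
    /// ```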
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_registry(connection_string, Arc::new(SchemaRegistry::new())).await
    }

    /// Create a new SQL store with a shared schema registry.
    ///
    /// Use this when you want the SyncEngine to share the registry
    /// for dynamic schema registration.
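    ///
    /// A sketch (the `sync_engine::schema` path is assumed from this module's
    /// imports; the connection string is illustrative):
    /// ```rust,no_run
    /// # use std::sync::Arc;
    /// # use sync_engine::storage::sql::SqlStore;
    /// # use sync_engine::schema::SchemaRegistry;
    /// # async fn example() {
    /// let registry = Arc::new(SchemaRegistry::new());
    /// let store = SqlStore::with_registry("sqlite::memory:", registry.clone())
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```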
    pub async fn with_registry(connection_string: &str, registry: Arc<SchemaRegistry>) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        // For SQLite, use bypass mode (no partitioning benefit)
        let registry = if is_sqlite && !registry.is_bypass() {
            Arc::new(SchemaRegistry::bypass())
        } else {
            registry
        };

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite, registry };

        // Enable WAL mode for SQLite (better concurrency, faster writes)
        if is_sqlite {
            store.enable_wal_mode().await?;
        }

        store.init_schema().await?;

        // MySQL: set transaction isolation to READ COMMITTED to reduce gap
        // locking and deadlocks under high concurrency. Note: SESSION settings
        // apply only to the pooled connection that executes them; for pool-wide
        // guarantees, set the server default (`transaction_isolation`) instead.
        if !is_sqlite {
            sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
                .execute(&store.pool)
                .await
                .map_err(|e| StorageError::Backend(format!("Failed to set MySQL isolation level: {}", e)))?;
        }

        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    /// Get the schema registry for this store.
    #[must_use]
    pub fn registry(&self) -> &Arc<SchemaRegistry> {
        &self.registry
    }

    /// Enable WAL (Write-Ahead Logging) mode for SQLite.
    ///
    /// Benefits:
    /// - Concurrent reads during writes (readers don't block writers)
    /// - Better write performance (single fsync instead of two)
    /// - More predictable performance under load
    ///
    /// Note: `journal_mode = WAL` persists in the database file, so running it
    /// once is enough; `synchronous` is per-connection and only affects the
    /// pooled connection that executes the PRAGMA.
    async fn enable_wal_mode(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA journal_mode = WAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to enable WAL mode: {}", e)))?;

        // Also set synchronous to NORMAL for better performance while still safe
        // (FULL is the default, but WAL mode is safe with NORMAL)
        sqlx::query("PRAGMA synchronous = NORMAL")
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to set synchronous mode: {}", e)))?;

        Ok(())
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of the native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: Set to 1 on write, background task sets to 0 after merkle recalc.
        // This enables efficient multi-instance coordination without locking.
        //
        // state: Arbitrary caller-defined state tag (e.g., "delta", "base", "pending").
        // Indexed for fast state-based queries.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'default',
                access_count INTEGER NOT NULL DEFAULT 0,
                last_accessed INTEGER NOT NULL DEFAULT 0
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON).
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty),
                INDEX idx_state (state)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Create a schema-specific table with the same structure as sync_items.
    ///
    /// Used for horizontal partitioning: each schema (e.g., "users", "orders")
    /// gets its own table with identical DDL. This enables:
    /// - Better query performance (smaller tables, focused indices)
    /// - Easier maintenance (vacuum, backup per-schema)
    /// - Future sharding potential
    ///
    /// For SQLite, this is a no-op (returns Ok) since partitioning doesn't help.
    ///
    /// # Arguments
    /// * `table_name` - Name of the table to create (e.g., "users_items")
    ///
    /// # Example
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// store.ensure_table("users_items").await.unwrap();
    /// store.ensure_table("orders_items").await.unwrap();
    /// # }
    /// ```
    pub async fn ensure_table(&self, table_name: &str) -> Result<(), StorageError> {
        // SQLite: skip partitioning - no benefit for embedded use
        if self.is_sqlite {
            return Ok(());
        }

        // Validate table name to prevent SQL injection.
        // Only allow ASCII alphanumeric and underscore.
        if !table_name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!(
                "Invalid table name '{}': only alphanumeric and underscore allowed",
                table_name
            )));
        }

        // Don't recreate the default table
        if table_name == crate::schema::DEFAULT_TABLE {
            return Ok(());
        }

        // MySQL table DDL - identical structure to sync_items
        let sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                state VARCHAR(32) NOT NULL DEFAULT 'default',
                access_count BIGINT NOT NULL DEFAULT 0,
                last_accessed BIGINT NOT NULL DEFAULT 0,
                INDEX idx_{}_timestamp (timestamp),
                INDEX idx_{}_merkle_dirty (merkle_dirty),
                INDEX idx_{}_state (state)
            )
            "#,
            table_name, table_name, table_name, table_name
        );

        retry("sql_ensure_table", &RetryConfig::startup(), || async {
            sqlx::query(&sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        tracing::info!(table = %table_name, "Created schema table");

        Ok(())
    }

    /// Check if a table exists.
    pub async fn table_exists(&self, table_name: &str) -> Result<bool, StorageError> {
        if self.is_sqlite {
            let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        } else {
            // Scope to the current database so same-named tables elsewhere don't match
            let sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE()";
            let result = sqlx::query(sql)
                .bind(table_name)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        }
    }

    /// Check if this is a SQLite database.
    #[must_use]
    pub fn is_sqlite(&self) -> bool {
        self.is_sqlite
    }

    /// Build the audit JSON object for operational metadata.
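    ///
    /// Example output shape (keys whose source field is `None` are omitted):
    /// `{"batch":"<uuid>","trace":"<traceparent>","home":"<instance-id>"}`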
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);

        retry("sql_get", &RetryConfig::query(), || async {
            let sql = format!(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM {} WHERE id = ?",
                table
            );
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("payload").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

                    // Try reading audit as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
                    let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("audit").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        });

                    // State field - try String first (SQLite), then bytes (MySQL)
                    let state: String = row.try_get::<String, _>("state").ok()
                        .or_else(|| {
                            row.try_get::<Vec<u8>, _>("state").ok()
                                .and_then(|bytes| String::from_utf8(bytes).ok())
                        })
                        .unwrap_or_else(|| "default".to_string());

                    // Access metadata (local eviction stats, not replicated)
                    let access_count: i64 = row.try_get("access_count").unwrap_or(0);
                    let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content is stored as text; its UTF-8 bytes are the content
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                        state,
                        access_count as u64,
                        last_accessed as u64,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let table = self.registry.table_for_key(&id);
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) };
        let audit_json = Self::build_audit_json(item);
        let state = item.state.clone();

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            // Only mark merkle_dirty if the payload actually changed (content-addressed)
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON CONFLICT(id) DO UPDATE SET 
                    version = excluded.version, 
                    timestamp = excluded.timestamp, 
                    payload_hash = excluded.payload_hash, 
                    payload = excluded.payload, 
                    payload_blob = excluded.payload_blob, 
                    audit = excluded.audit, 
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, 
                    state = excluded.state,
                    access_count = excluded.access_count,
                    last_accessed = excluded.last_accessed",
                table, table, table
            )
        } else {
            // MySQL: merkle_dirty is assigned first so the CASE sees the pre-update
            // payload_hash; <=> is MySQL's null-safe equality, mirroring SQLite's
            // IS DISTINCT FROM above.
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) 
                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?) 
                 ON DUPLICATE KEY UPDATE 
                    merkle_dirty = CASE WHEN NOT (payload_hash <=> VALUES(payload_hash)) THEN 1 ELSE merkle_dirty END, 
                    version = VALUES(version), 
                    timestamp = VALUES(timestamp), 
                    payload_hash = VALUES(payload_hash), 
                    payload = VALUES(payload), 
                    payload_blob = VALUES(payload_blob), 
                    audit = VALUES(audit), 
                    state = VALUES(state),
                    access_count = VALUES(access_count),
                    last_accessed = VALUES(last_accessed)",
                table
            )
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(&sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .bind(&state)
                .bind(item.access_count as i64)
                .bind(item.last_accessed as i64)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_delete", &RetryConfig::query(), || async {
            let sql = format!("DELETE FROM {} WHERE id = ?", table);
            sqlx::query(&sql)
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        let table = self.registry.table_for_key(&id);
        retry("sql_exists", &RetryConfig::query(), || async {
            let sql = format!("SELECT 1 FROM {} WHERE id = ? LIMIT 1", table);
            let result = sqlx::query(&sql)
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items in a single multi-row INSERT with verification.
    ///
    /// Items are grouped by target table (based on schema registry) and written
    /// in separate batches per table.
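    ///
    /// A minimal sketch; `SyncItem::reconstruct` is used here purely for
    /// illustration, and the `sync_engine::*` module paths are assumed from
    /// this file's imports:
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # use sync_engine::storage::traits::ArchiveStore;
    /// # use sync_engine::sync_item::{SyncItem, ContentType};
    /// # async fn example(store: &SqlStore) {
    /// let mut items = vec![SyncItem::reconstruct(
    ///     "users.alice".to_string(), 1, 0, ContentType::Json,
    ///     br#"{"name":"Alice"}"#.to_vec(),
    ///     None, None, String::new(), None, "default".to_string(), 0, 0,
    /// )];
    /// let result = store.put_batch(&mut items).await.unwrap();
    /// assert_eq!(result.written, 1);
    /// # }
    /// ```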
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID (for audit trail, not verification)
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Collect IDs for verification
        let item_ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // Group items by target table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&SyncItem>> = std::collections::HashMap::new();
        for item in items.iter() {
            let table = self.registry.table_for_key(&item.object_id);
            by_table.entry(table).or_default().push(item);
        }

        // Chunk into ~500-item batches to stay well under MySQL's max_allowed_packet
        // (commonly 16-64MB depending on version and configuration)
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        // Write each table's items
        for (table, table_items) in by_table {
            for chunk in table_items.chunks(CHUNK_SIZE) {
                let written = self.put_batch_chunk_to_table(table, chunk, &batch_id).await?;
                total_written += written;
            }
        }

        // Verify ALL items exist (not by batch_id - that's unreliable under concurrency)
        let verified_count = self.verify_batch_ids(&item_ids).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM sync_items WHERE {} LIMIT {}",
            where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            // Reuse the get() parsing logic by fetching each item.
            // This is slightly inefficient but ensures consistent parsing.
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    async fn count_where(&self, where_clause: &str, params: &[SqlParam]) -> Result<u64, StorageError> {
        self.count_where_in_table(crate::schema::DEFAULT_TABLE, where_clause, params).await
    }
}

impl SqlStore {
    /// Search in a specific table using a WHERE clause.
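    ///
    /// A sketch (table name and WHERE clause are illustrative; the
    /// `sync_engine::search` path is assumed from this module's imports):
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # use sync_engine::search::SqlParam;
    /// # async fn example(store: &SqlStore) {
    /// let hits = store.search_in_table(
    ///     "users_items",
    ///     "JSON_EXTRACT(payload, '$.name') = ?",
    ///     &[SqlParam::Text("Alice".to_string())],
    ///     100,
    /// ).await.unwrap();
    /// # assert!(hits.len() <= 100);
    /// # }
    /// ```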
    pub async fn search_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
        limit: usize,
    ) -> Result<Vec<SyncItem>, StorageError> {
        // Validate table name to prevent SQL injection (ASCII alphanumeric and underscore only)
        if !table.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed \
             FROM {} WHERE {} LIMIT {}",
            table, where_clause, limit
        );

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let rows = query.fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            // Use get() which already routes to the correct table via the registry
            if let Some(item) = self.get(&id).await? {
                items.push(item);
            }
        }

        Ok(items)
    }

    /// Count items in a specific table matching a WHERE clause.
    pub async fn count_where_in_table(
        &self,
        table: &str,
        where_clause: &str,
        params: &[SqlParam],
    ) -> Result<u64, StorageError> {
        // Validate table name to prevent SQL injection (ASCII alphanumeric and underscore only)
        if !table.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
            return Err(StorageError::Backend(format!("Invalid table name: {}", table)));
        }

        let sql = format!("SELECT COUNT(*) as cnt FROM {} WHERE {}", table, where_clause);

        let mut query = sqlx::query(&sql);
        for param in params {
            query = match param {
                SqlParam::Text(s) => query.bind(s.clone()),
                SqlParam::Numeric(f) => query.bind(*f),
                SqlParam::Boolean(b) => query.bind(*b),
            };
        }

        let result = query.fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Write a single chunk of items to a specific table with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk_to_table(&self, table: &str, chunk: &[&SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = CASE WHEN {}.payload_hash IS DISTINCT FROM excluded.payload_hash THEN 1 ELSE {}.merkle_dirty END, \
                    state = excluded.state, \
                    access_count = excluded.access_count, \
                    last_accessed = excluded.last_accessed",
                table, placeholders.join(", "), table, table
            )
        } else {
            // MySQL: merkle_dirty is assigned first so the CASE sees the pre-update
            // payload_hash; <=> is MySQL's null-safe equality operator.
            format!(
                "INSERT INTO {} (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty, state, access_count, last_accessed) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    merkle_dirty = CASE WHEN NOT (payload_hash <=> VALUES(payload_hash)) THEN 1 ELSE merkle_dirty END, \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    state = VALUES(state), \
                    access_count = VALUES(access_count), \
                    last_accessed = VALUES(last_accessed)",
                table, placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
            state: String,
            access_count: i64,
            last_accessed: i64,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.content_hash.is_empty() { None } else { Some(item.content_hash.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                    state: item.state.clone(),
                    access_count: item.access_count as i64,
                    last_accessed: item.last_accessed as i64,
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::batch_write(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json)
                        .bind(&row.state)
                        .bind(row.access_count)
                        .bind(row.last_accessed);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by checking all IDs exist.
    /// This is more reliable than batch_id verification under concurrent writes.
    /// Groups IDs by table for proper routing.
    async fn verify_batch_ids(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Group IDs by table
        let mut by_table: std::collections::HashMap<&'static str, Vec<&String>> = std::collections::HashMap::new();
        for id in ids {
            let table = self.registry.table_for_key(id);
            by_table.entry(table).or_default().push(id);
        }

        // Use chunked COUNT queries to avoid overly large IN clauses
        const CHUNK_SIZE: usize = 500;
        let mut total_found = 0usize;

        for (table, table_ids) in by_table {
            for chunk in table_ids.chunks(CHUNK_SIZE) {
                let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
                let sql = format!(
                    "SELECT COUNT(*) as cnt FROM {} WHERE id IN ({})",
                    table, placeholders.join(", ")
                );

                let mut query = sqlx::query(&sql);
                for id in chunk {
                    query = query.bind(*id);
                }

                let result = query
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                let count: i64 = result
                    .try_get("cnt")
                    .map_err(|e| StorageError::Backend(e.to_string()))?;
                total_found += count as usize;
            }
        }

        Ok(total_found)
    }

    /// Legacy batch_id verification (kept for reference, but not used under concurrency)
    #[allow(dead_code)]
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // Query varies by DB - MySQL has native JSON functions, SQLite uses string matching
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try String first (SQLite TEXT), then bytes (the sqlx Any driver
            // surfaces MySQL LONGTEXT/TEXT as BLOB)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle Dirty Flag: For deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by background merkle processor to batch recalculate affected trees.
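    ///
    /// A sketch of the recalc loop (the merkle recomputation itself is elided):
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// let ids = store.get_dirty_merkle_ids(500).await.unwrap();
    /// // ... recalculate merkle hashes for `ids` here ...
    /// store.mark_merkle_clean(&ids).await.unwrap();
    /// # }
    /// ```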
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    /// Count dirty merkle items within a specific branch prefix.
    ///
    /// Used for branch-level hygiene checks. Returns 0 if the branch is "clean"
    /// (all merkle hashes up-to-date), allowing safe comparison with peers.
    ///
    /// # Arguments
    /// * `prefix` - Branch prefix (e.g., "uk.nhs" matches "uk.nhs.patient.123")
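    ///
    /// A sketch of a branch hygiene check (the prefix is illustrative):
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// if store.branch_dirty_count("uk.nhs").await.unwrap() == 0 {
    ///     // Branch is clean: safe to compare merkle state with peers.
    /// }
    /// # }
    /// ```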
    pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        let result = sqlx::query(
            "SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ? AND merkle_dirty = 1"
        )
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Get distinct top-level prefixes that have dirty items.
    ///
    /// Returns prefixes like ["uk", "us", "de"] that have pending merkle recalcs.
    /// Branches NOT in this list are "clean" and safe to compare with peers.
    pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
        // Extract the first segment before '.' for items with merkle_dirty = 1
        let sql = if self.is_sqlite {
            // SQLite: use substr and instr
            "SELECT DISTINCT CASE 
                WHEN instr(id, '.') > 0 THEN substr(id, 1, instr(id, '.') - 1)
                ELSE id 
            END as prefix FROM sync_items WHERE merkle_dirty = 1"
        } else {
            // MySQL: use SUBSTRING_INDEX
            "SELECT DISTINCT SUBSTRING_INDEX(id, '.', 1) as prefix FROM sync_items WHERE merkle_dirty = 1"
        };

        let rows = sqlx::query(sql)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut prefixes = Vec::with_capacity(rows.len());
        for row in rows {
            let prefix: String = row.try_get("prefix")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            prefixes.push(prefix);
        }

        Ok(prefixes)
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so the merkle hash can be calculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try String first (SQLite TEXT), then bytes (MySQL LONGTEXT via the Any driver)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };

            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state (e.g., "delta", "base", "pending").
    ///
    /// Uses indexed query for fast retrieval.
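    ///
    /// A sketch (the state tag is illustrative):
    /// ```rust,no_run
    /// # use sync_engine::storage::sql::SqlStore;
    /// # async fn example(store: &SqlStore) {
    /// let pending = store.get_by_state("pending", 100).await.unwrap();
    /// for item in &pending {
    ///     assert_eq!(item.state, "pending");
    /// }
    /// # }
    /// ```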
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE state = ? LIMIT ?"
        )
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get items by state: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Try reading payload as String first (SQLite TEXT), then as bytes (MySQL LONGTEXT)
            let payload_json: Option<String> = row.try_get::<String, _>("payload").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("payload").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();

            // Try reading audit as String first (SQLite), then bytes (MySQL)
            let audit_json: Option<String> = row.try_get::<String, _>("audit").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("audit").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                });

            // State field - try String first (SQLite), then bytes (MySQL)
            let state: String = row.try_get::<String, _>("state").ok()
                .or_else(|| {
                    row.try_get::<Vec<u8>, _>("state").ok()
                        .and_then(|bytes| String::from_utf8(bytes).ok())
                })
                .unwrap_or_else(|| "default".to_string());

            // Access metadata (local-only, not replicated)
            let access_count: i64 = row.try_get("access_count").unwrap_or(0);
            let last_accessed: i64 = row.try_get("last_accessed").unwrap_or(0);

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
                state,
                access_count as u64,
                last_accessed as u64,
            );
            items.push(item);
        }

        Ok(items)
    }
    
    /// Count items in a given state.
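    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // Gauge backlog size without fetching payloads
    /// let pending = store.count_by_state("pending").await?;
    /// println!("{} items awaiting processing", pending);
    /// ```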
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE state = ?")
            .bind(state)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Get just the IDs of items in a given state (lightweight query).
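    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // IDs only: cheaper than get_by_state when payloads aren't needed
    /// let ids = store.list_state_ids("pending", 500).await?;
    /// for id in &ids {
    ///     println!("pending: {}", id);
    /// }
    /// ```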
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items WHERE state = ? LIMIT ?")
            .bind(state)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to list state IDs: {}", e)))?;
        
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }
        
        Ok(ids)
    }
    
    /// Update the state of an item by ID.
    ///
    /// Returns `true` if a matching row was updated, `false` if no item has that ID.
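    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // Promote an item once processed; `false` means the ID was not found
    /// let updated = store.set_state("item1", "approved").await?;
    /// assert!(updated);
    /// ```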
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let result = sqlx::query("UPDATE sync_items SET state = ? WHERE id = ?")
            .bind(new_state)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected() > 0)
    }
    
    /// Delete all items in a given state.
    ///
    /// Returns the number of deleted items.
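    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // Purge everything previously marked for deletion
    /// let deleted = store.delete_by_state("delete_me").await?;
    /// println!("purged {} items", deleted);
    /// ```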
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let result = sqlx::query("DELETE FROM sync_items WHERE state = ?")
            .bind(state)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
    
    /// Scan items by ID prefix.
    ///
    /// Retrieves all items whose ID starts with the given prefix, using a
    /// SQL `LIKE 'prefix%'` pattern. MySQL can serve this from the primary
    /// key index; SQLite applies its LIKE index optimization only when the
    /// comparison is case-sensitive (e.g. `PRAGMA case_sensitive_like = ON`).
    /// Note that `%` and `_` in the prefix are not escaped, so they act as
    /// `LIKE` wildcards.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        // Build LIKE pattern: "prefix%"
        let pattern = format!("{}%", prefix);
        
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit, state, access_count, last_accessed 
             FROM sync_items WHERE id LIKE ? ORDER BY id LIMIT ?"
        )
            .bind(&pattern)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to scan by prefix: {}", e)))?;
        
        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            // Shared decode path; rows without a payload are skipped
            if let Some(item) = Self::row_to_item(&row)? {
                items.push(item);
            }
        }
        
        Ok(items)
    }
    
    /// Count items matching an ID prefix.
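    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // How many deltas are queued for user.123?
    /// let n = store.count_prefix("delta:user.123:").await?;
    /// ```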
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(count as u64)
    }
    
    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items. As with `scan_prefix`, `%` and `_`
    /// in the prefix are treated as `LIKE` wildcards.
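    ///
    /// # Example
    /// A sketch, assuming a connected `store: SqlStore`:
    /// ```rust,ignore
    /// // Compact user.123's deltas after folding them into a new base
    /// let removed = store.delete_prefix("delta:user.123:").await?;
    /// println!("removed {} deltas", removed);
    /// ```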
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let pattern = format!("{}%", prefix);
        
        let result = sqlx::query("DELETE FROM sync_items WHERE id LIKE ?")
            .bind(&pattern)
            .execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(result.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use serde_json::json;
    
    fn temp_db_path(name: &str) -> PathBuf {
        // Use local temp/ folder (gitignored) instead of system temp.
        // Create it if missing: SQLite creates the database file but
        // not its parent directories.
        let dir = PathBuf::from("temp");
        let _ = std::fs::create_dir_all(&dir);
        dir.join(format!("sql_test_{}.db", name))
    }
    
    /// Clean up SQLite database and its WAL files
    fn cleanup_db(path: &Path) {
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(format!("{}-wal", path.display()));
        let _ = std::fs::remove_file(format!("{}-shm", path.display()));
    }
    
    fn test_item(id: &str, state: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"id": id}))
            .with_state(state)
    }

    #[tokio::test]
    async fn test_state_stored_and_retrieved() {
        let db_path = temp_db_path("stored");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with custom state
        let item = test_item("item1", "delta");
        store.put(&item).await.unwrap();
        
        // Retrieve and verify state
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "delta");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_state_default_value() {
        let db_path = temp_db_path("default");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Store item with default state
        let item = SyncItem::from_json("item1".into(), json!({"test": true}));
        store.put(&item).await.unwrap();
        
        let retrieved = store.get("item1").await.unwrap().unwrap();
        assert_eq!(retrieved.state, "default");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state() {
        let db_path = temp_db_path("get_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items in different states
        store.put(&test_item("delta1", "delta")).await.unwrap();
        store.put(&test_item("delta2", "delta")).await.unwrap();
        store.put(&test_item("base1", "base")).await.unwrap();
        store.put(&test_item("pending1", "pending")).await.unwrap();
        
        // Query by state
        let deltas = store.get_by_state("delta", 100).await.unwrap();
        assert_eq!(deltas.len(), 2);
        assert!(deltas.iter().all(|i| i.state == "delta"));
        
        let bases = store.get_by_state("base", 100).await.unwrap();
        assert_eq!(bases.len(), 1);
        assert_eq!(bases[0].object_id, "base1");
        
        // Empty result for non-existent state
        let none = store.get_by_state("nonexistent", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_get_by_state_with_limit() {
        let db_path = temp_db_path("get_by_state_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 10 items
        for i in 0..10 {
            store.put(&test_item(&format!("item{}", i), "batch")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.get_by_state("batch", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_by_state() {
        let db_path = temp_db_path("count_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items
        store.put(&test_item("a1", "alpha")).await.unwrap();
        store.put(&test_item("a2", "alpha")).await.unwrap();
        store.put(&test_item("a3", "alpha")).await.unwrap();
        store.put(&test_item("b1", "beta")).await.unwrap();
        
        assert_eq!(store.count_by_state("alpha").await.unwrap(), 3);
        assert_eq!(store.count_by_state("beta").await.unwrap(), 1);
        assert_eq!(store.count_by_state("gamma").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_list_state_ids() {
        let db_path = temp_db_path("list_state_ids");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("id1", "pending")).await.unwrap();
        store.put(&test_item("id2", "pending")).await.unwrap();
        store.put(&test_item("id3", "done")).await.unwrap();
        
        let pending_ids = store.list_state_ids("pending", 100).await.unwrap();
        assert_eq!(pending_ids.len(), 2);
        assert!(pending_ids.contains(&"id1".to_string()));
        assert!(pending_ids.contains(&"id2".to_string()));
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_set_state() {
        let db_path = temp_db_path("set_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("item1", "pending")).await.unwrap();
        
        // Verify initial state
        let before = store.get("item1").await.unwrap().unwrap();
        assert_eq!(before.state, "pending");
        
        // Update state
        let updated = store.set_state("item1", "approved").await.unwrap();
        assert!(updated);
        
        // Verify new state
        let after = store.get("item1").await.unwrap().unwrap();
        assert_eq!(after.state, "approved");
        
        // Non-existent item returns false
        let not_found = store.set_state("nonexistent", "x").await.unwrap();
        assert!(!not_found);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_by_state() {
        let db_path = temp_db_path("delete_by_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        store.put(&test_item("keep1", "keep")).await.unwrap();
        store.put(&test_item("keep2", "keep")).await.unwrap();
        store.put(&test_item("del1", "delete_me")).await.unwrap();
        store.put(&test_item("del2", "delete_me")).await.unwrap();
        store.put(&test_item("del3", "delete_me")).await.unwrap();
        
        // Delete by state
        let deleted = store.delete_by_state("delete_me").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("del1").await.unwrap().is_none());
        assert!(store.get("del2").await.unwrap().is_none());
        
        // Verify others remain
        assert!(store.get("keep1").await.unwrap().is_some());
        assert!(store.get("keep2").await.unwrap().is_some());
        
        // Delete non-existent state returns 0
        let zero = store.delete_by_state("nonexistent").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_multiple_puts_preserve_state() {
        let db_path = temp_db_path("multi_put_state");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Put multiple items with different states
        store.put(&test_item("a", "state_a")).await.unwrap();
        store.put(&test_item("b", "state_b")).await.unwrap();
        store.put(&test_item("c", "state_c")).await.unwrap();
        
        assert_eq!(store.get("a").await.unwrap().unwrap().state, "state_a");
        assert_eq!(store.get("b").await.unwrap().unwrap().state, "state_b");
        assert_eq!(store.get("c").await.unwrap().unwrap().state, "state_c");
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix() {
        let db_path = temp_db_path("scan_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes (CRDT pattern)
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        store.put(&test_item("base:user.456", "base")).await.unwrap();
        
        // Scan specific object's deltas
        let user123_deltas = store.scan_prefix("delta:user.123:", 100).await.unwrap();
        assert_eq!(user123_deltas.len(), 3);
        assert!(user123_deltas.iter().all(|i| i.object_id.starts_with("delta:user.123:")));
        
        // Scan different object
        let user456_deltas = store.scan_prefix("delta:user.456:", 100).await.unwrap();
        assert_eq!(user456_deltas.len(), 1);
        
        // Scan all deltas
        let all_deltas = store.scan_prefix("delta:", 100).await.unwrap();
        assert_eq!(all_deltas.len(), 4);
        
        // Scan all bases
        let bases = store.scan_prefix("base:", 100).await.unwrap();
        assert_eq!(bases.len(), 2);
        
        // Empty result for non-matching prefix
        let none = store.scan_prefix("nonexistent:", 100).await.unwrap();
        assert!(none.is_empty());
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_scan_prefix_with_limit() {
        let db_path = temp_db_path("scan_prefix_limit");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert 20 items
        for i in 0..20 {
            store.put(&test_item(&format!("delta:obj:op{:03}", i), "delta")).await.unwrap();
        }
        
        // Query with limit
        let limited = store.scan_prefix("delta:obj:", 5).await.unwrap();
        assert_eq!(limited.len(), 5);
        
        // Verify we can get all with larger limit
        let all = store.scan_prefix("delta:obj:", 100).await.unwrap();
        assert_eq!(all.len(), 20);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_count_prefix() {
        let db_path = temp_db_path("count_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Count by prefix
        assert_eq!(store.count_prefix("delta:user.123:").await.unwrap(), 3);
        assert_eq!(store.count_prefix("delta:user.456:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("delta:").await.unwrap(), 4);
        assert_eq!(store.count_prefix("base:").await.unwrap(), 1);
        assert_eq!(store.count_prefix("nonexistent:").await.unwrap(), 0);
        
        cleanup_db(&db_path);
    }

    #[tokio::test]
    async fn test_delete_prefix() {
        let db_path = temp_db_path("delete_prefix");
        cleanup_db(&db_path);
        
        let url = format!("sqlite://{}?mode=rwc", db_path.display());
        let store = SqlStore::new(&url).await.unwrap();
        
        // Insert items with different prefixes
        store.put(&test_item("delta:user.123:op001", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op002", "delta")).await.unwrap();
        store.put(&test_item("delta:user.123:op003", "delta")).await.unwrap();
        store.put(&test_item("delta:user.456:op001", "delta")).await.unwrap();
        store.put(&test_item("base:user.123", "base")).await.unwrap();
        
        // Delete one object's deltas
        let deleted = store.delete_prefix("delta:user.123:").await.unwrap();
        assert_eq!(deleted, 3);
        
        // Verify deleted
        assert!(store.get("delta:user.123:op001").await.unwrap().is_none());
        assert!(store.get("delta:user.123:op002").await.unwrap().is_none());
        
        // Verify other deltas remain
        assert!(store.get("delta:user.456:op001").await.unwrap().is_some());
        
        // Verify bases remain
        assert!(store.get("base:user.123").await.unwrap().is_some());
        
        // Delete non-existent prefix returns 0
        let zero = store.delete_prefix("nonexistent:").await.unwrap();
        assert_eq!(zero, 0);
        
        cleanup_db(&db_path);
    }
}