sync_engine/storage/sql.rs

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with dedicated columns for queryability:
//! - **JSON content** → stored in the `payload` TEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → stored in the `payload_blob` MEDIUMBLOB column
//!
//! The schema mirrors the Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT               -- Operational metadata: {batch, trace, home}
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of the native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` and converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
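//!
//! ## Usage
//!
//! A minimal sketch, assuming a reachable database and illustrative module paths
//! (`ArchiveStore` must be in scope for the trait methods):
//!
//! ```rust,ignore
//! use crate::storage::sql::SqlStore;
//! use crate::storage::traits::{ArchiveStore, StorageError};
//!
//! async fn demo() -> Result<(), StorageError> {
//!     let store = SqlStore::new("sqlite::memory:").await?;
//!     if let Some(item) = store.get("user:42").await? {
//!         println!("v{} ({} bytes)", item.version, item.content.len());
//!     }
//!     Ok(())
//! }
//! ```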

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

// The sqlx `Any` driver requires its drivers to be installed at runtime.
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };
        store.init_schema().await?;
        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: we use TEXT/LONGTEXT instead of the native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        //
        // merkle_dirty: set to 1 on write; a background task sets it back to 0 after
        // the merkle recalculation. This enables efficient multi-instance
        // coordination without locking.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (the sqlx Any driver doesn't support native JSON).
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Build the audit JSON object for operational metadata.
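    ///
    /// Produces, e.g., `{"batch":"abc-123","trace":"<traceparent>","home":"<instance-id>"}`
    /// (values illustrative); returns `None` when no metadata is set.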
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields: (batch_id, trace_parent, home_instance_id).
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items WHERE id = ?"
            )
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // The sqlx Any driver treats MySQL LONGTEXT as BLOB, so we read bytes
                    // and convert to String for the JSON payload.
                    let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
                    let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
                    let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content is stored as text; hand back the raw bytes
                        (json_str.as_bytes().to_vec(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) };
        let audit_json = Self::build_audit_json(item);

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1)
             ON CONFLICT(id) DO UPDATE SET
                version = excluded.version,
                timestamp = excluded.timestamp,
                payload_hash = excluded.payload_hash,
                payload = excluded.payload,
                payload_blob = excluded.payload_blob,
                audit = excluded.audit,
                merkle_dirty = 1"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1)
             ON DUPLICATE KEY UPDATE
                version = VALUES(version),
                timestamp = VALUES(timestamp),
                payload_hash = VALUES(payload_hash),
                payload = VALUES(payload),
                payload_blob = VALUES(payload_blob),
                audit = VALUES(audit),
                merkle_dirty = 1"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items in a single multi-row INSERT with verification.
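    ///
    /// A minimal call-site sketch (the slice is mutable because every item is
    /// stamped with the generated batch ID):
    ///
    /// ```rust,ignore
    /// let result = store.put_batch(&mut items).await?;
    /// assert!(result.verified, "batch {} not fully visible", result.batch_id);
    /// ```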
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // MySQL's max_allowed_packet is typically 16MB, so chunk into ~500-item batches
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Verify the batch was written
        let verified_count = self.verify_batch(&batch_id).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit, \
                    merkle_dirty = 1",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit), \
                    merkle_dirty = 1",
                placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::query(), || {
            // Clone per attempt so the retry closure can be invoked more than once.
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by counting items with the given batch_id (in the audit JSON).
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // The query varies by backend: MySQL has native JSON functions; SQLite falls
        // back to substring matching on the serialized audit JSON.
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        // For SQLite, match the serialized form, e.g. %"batch":"<id>%
        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items (for WAL drain).
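    ///
    /// A drain-loop sketch (`forward_downstream` is a hypothetical caller-side
    /// function; deleting only after a successful forward is one plausible pattern):
    ///
    /// ```rust,ignore
    /// let items = store.scan_batch(100).await?;
    /// let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
    /// forward_downstream(&items).await?; // hypothetical
    /// store.delete_batch(&ids).await?;
    /// ```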
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // The sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
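    ///
    /// Each ID becomes one bound parameter, so very large `ids` slices may hit
    /// backend placeholder limits; callers should chunk if in doubt.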
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Merkle dirty flag: deferred merkle calculation in multi-instance setups
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get IDs of items with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Used by the background merkle processor to batch-recalculate affected trees.
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

    /// Count items with merkle_dirty = 1.
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

    /// Mark items as merkle-clean after recalculation.
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

    /// Check if there are any dirty merkle items.
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

    /// Get full SyncItems with merkle_dirty = 1 (need merkle recalculation).
    ///
    /// Returns the items themselves so merkle can be calculated.
    /// Use `mark_merkle_clean()` after processing to clear the flag.
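    ///
    /// A minimal background-loop sketch (`recalculate_merkle` is a hypothetical
    /// caller-side function; the batch size of 500 is illustrative):
    ///
    /// ```rust,ignore
    /// while store.has_dirty_merkle().await? {
    ///     let items = store.get_dirty_merkle_items(500).await?;
    ///     let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();
    ///     recalculate_merkle(&items).await?; // hypothetical
    ///     store.mark_merkle_clean(&ids).await?;
    /// }
    /// ```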
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // The sqlx Any driver treats TEXT as BLOB, so read bytes and convert
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            // Determine content and content_type
            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip items with no payload
            };

            // Parse audit fields
            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }
}