sync_engine/storage/
sql.rs

//! SQL storage backend for L3 archive.
//!
//! Content-type aware storage with proper columns for queryability:
//! - **JSON content** → Stored in `payload` TEXT column (queryable via JSON_EXTRACT)
//! - **Binary content** → Stored in `payload_blob` MEDIUMBLOB column
//!
//! Schema mirrors Redis structure:
//! ```sql
//! CREATE TABLE sync_items (
//!   id VARCHAR(255) PRIMARY KEY,
//!   version BIGINT NOT NULL,
//!   timestamp BIGINT NOT NULL,
//!   payload_hash VARCHAR(64),
//!   payload LONGTEXT,        -- JSON as text (sqlx Any driver limitation)
//!   payload_blob MEDIUMBLOB, -- For binary content
//!   audit TEXT               -- Operational metadata: {batch, trace, home}
//! )
//! ```
//!
//! ## sqlx Any Driver Quirks
//!
//! We use TEXT instead of a native JSON type because sqlx's `Any` driver:
//! 1. Doesn't support MySQL's JSON type mapping
//! 2. Treats LONGTEXT/TEXT as BLOB (requires reading as `Vec<u8>` then converting)
//!
//! JSON functions still work on TEXT columns:
//!
//! ```sql
//! -- Find users named Alice
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = 'Alice';
//!
//! -- Find all items from a batch
//! SELECT * FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = 'abc-123';
//! ```
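//!
//! From Rust, the same queries run through sqlx; note quirk 2 above when
//! reading the result back (a sketch, assuming a `pool: sqlx::AnyPool` is
//! in scope):
//!
//! ```rust,ignore
//! use sqlx::Row;
//!
//! let row = sqlx::query(
//!     "SELECT payload FROM sync_items WHERE JSON_EXTRACT(payload, '$.name') = ?"
//! )
//! .bind("Alice")
//! .fetch_one(&pool)
//! .await?;
//!
//! // LONGTEXT comes back as bytes under the Any driver, not String
//! let bytes: Vec<u8> = row.try_get("payload")?;
//! let json = String::from_utf8(bytes)?;
//! ```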

use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

// SQLx `Any` driver requires runtime installation
static INSTALL_DRIVERS: Once = Once::new();

fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
    /// Create a new SQL store with startup-mode retry (fails fast if config is wrong).
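    ///
    /// A usage sketch (`sqlite::memory:` is handy for tests; any MySQL URL
    /// accepted by sqlx works the same way):
    ///
    /// ```rust,ignore
    /// let store = SqlStore::new("sqlite::memory:").await?;
    /// let pool = store.pool(); // shareable with other stores
    /// ```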
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };
        store.init_schema().await?;
        Ok(store)
    }

    /// Get a clone of the connection pool for sharing with other stores.
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        // Note: We use TEXT/LONGTEXT instead of native JSON type because sqlx's
        // `Any` driver doesn't support MySQL's JSON type mapping. The data is still
        // valid JSON and can be queried with JSON_EXTRACT() in MySQL.
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT
            )
            "#
        } else {
            // MySQL - use LONGTEXT for JSON (sqlx Any driver doesn't support native JSON)
            // JSON functions like JSON_EXTRACT() still work on TEXT columns containing valid JSON
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                INDEX idx_timestamp (timestamp)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

    /// Build the audit JSON object for operational metadata.
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Parse audit JSON back into SyncItem fields.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items WHERE id = ?"
            )
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // sqlx Any driver treats MySQL LONGTEXT as BLOB, so we read as bytes
                    // and convert to String for JSON payload
                    let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
                    let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
                    let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    // Determine content and content_type
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        // JSON content was stored as text; hand its raw bytes back
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        // Binary content
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    // Parse audit fields
                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) };
        let audit_json = Self::build_audit_json(item);

        // Determine payload storage based on content type
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                version = excluded.version,
                timestamp = excluded.timestamp,
                payload_hash = excluded.payload_hash,
                payload = excluded.payload,
                payload_blob = excluded.payload_blob,
                audit = excluded.audit"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON DUPLICATE KEY UPDATE
                version = VALUES(version),
                timestamp = VALUES(timestamp),
                payload_hash = VALUES(payload_hash),
                payload = VALUES(payload),
                payload_blob = VALUES(payload_blob),
                audit = VALUES(audit)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    /// Write a batch of items as chunked multi-row INSERTs, then verify the write.
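    ///
    /// A usage sketch (items are stamped in place with a fresh batch id,
    /// which comes back in the result):
    ///
    /// ```rust,ignore
    /// let result = store.put_batch(&mut items).await?;
    /// if !result.verified {
    ///     tracing::warn!(batch_id = %result.batch_id, "batch incomplete, consider retrying");
    /// }
    /// ```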
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Generate a unique batch ID
        let batch_id = uuid::Uuid::new_v4().to_string();

        // Stamp all items with the batch_id
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // MySQL's max_allowed_packet is typically 16MB, so chunk into ~500-row
        // statements (roughly 32KB of bound payload per row at that limit)
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Verify the batch was written
        let verified_count = self.verify_batch(&batch_id).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
    /// Write a single chunk of items with content-type aware storage.
    /// The batch_id is already embedded in each item's audit JSON.
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                    version = excluded.version, \
                    timestamp = excluded.timestamp, \
                    payload_hash = excluded.payload_hash, \
                    payload = excluded.payload, \
                    payload_blob = excluded.payload_blob, \
                    audit = excluded.audit",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                    version = VALUES(version), \
                    timestamp = VALUES(timestamp), \
                    payload_hash = VALUES(payload_hash), \
                    payload = VALUES(payload), \
                    payload_blob = VALUES(payload_blob), \
                    audit = VALUES(audit)",
                placeholders.join(", ")
            )
        };

        // Prepare all items with their fields
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    /// Verify a batch was written by counting items with the given batch_id (in audit JSON).
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // MySQL can query the audit JSON directly; SQLite falls back to
        // substring matching so we don't depend on JSON functions being available
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        // serde_json emits the audit map compactly, so the `"batch":"<id>"`
        // substring is stable; batch ids are UUIDs, so no LIKE metacharacters
        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}\"%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    /// Scan a batch of items, oldest first (for WAL drain).
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // sqlx Any driver treats MySQL LONGTEXT/TEXT as BLOB
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue; // Skip rows with no payload
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }

    /// Delete multiple items by ID in a single query.
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }
}
607}