use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

static INSTALL_DRIVERS: Once = Once::new();

/// Installs sqlx's `Any` drivers exactly once; this must happen before any
/// `AnyPool` connection is attempted.
fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

/// Archive store backed by any sqlx `Any`-compatible database.
///
/// Dialect handling is binary: `sqlite:` connection strings get SQLite
/// syntax, and everything else is assumed to speak MySQL (placeholder and
/// upsert syntax below are written for those two dialects only).
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
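    /// Connects with startup retries and ensures the schema exists.
    ///
    /// Usage sketch; both connection strings below are illustrative:
    ///
    /// ```text
    /// let mem = SqlStore::new("sqlite::memory:").await?;
    /// let db = SqlStore::new("mysql://user:pass@host/archive").await?;
    /// ```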
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        // Anything that is not SQLite is treated as MySQL below.
        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };
        store.init_schema().await?;
        Ok(store)
    }

    /// Returns a handle to the underlying pool (`AnyPool` clones are cheap
    /// reference-count bumps).
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT
            )
            "#
        } else {
            // MySQL dialect; the inline INDEX clause is MySQL-specific.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                INDEX idx_timestamp (timestamp)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;
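
        // The MySQL branch gets its timestamp index inline, but SQLite has no
        // equivalent clause, so scan_batch's ORDER BY timestamp would
        // table-scan there. A separate idempotent statement closes the gap
        // (sketch; the index name is our own choice):
        if self.is_sqlite {
            sqlx::query("CREATE INDEX IF NOT EXISTS idx_timestamp ON sync_items (timestamp)")
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
        }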

        Ok(())
    }

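    /// Folds the optional audit fields (`batch_id`, `trace_parent`,
    /// `home_instance_id`) into a single JSON object for the `audit` column.
    /// With all three set, the stored value looks like this (illustrative
    /// values; key order is not guaranteed):
    ///
    /// ```text
    /// {"batch":"6f1c...","home":"node-a","trace":"00-4bf9..."}
    /// ```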
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

    /// Inverse of `build_audit_json`: extracts `(batch_id, trace_parent,
    /// home_instance_id)`, tolerating missing or malformed JSON.
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items WHERE id = ?"
            )
            .bind(&id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // Read text columns as raw bytes and convert explicitly
                    // rather than relying on the Any driver's TEXT decoding.
                    let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
                    let payload_json: Option<String> =
                        payload_bytes.and_then(|bytes| String::from_utf8(bytes).ok());

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
                    let audit_json: Option<String> =
                        audit_bytes.and_then(|bytes| String::from_utf8(bytes).ok());

                    // Exactly one payload column should be populated, matching
                    // the item's content type at write time.
                    let (content, content_type) = if let Some(json_str) = payload_json {
                        (json_str.into_bytes(), ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }
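
    // Read-path sketch (assumes an item with this id was stored earlier):
    //
    //     if let Some(item) = store.get("item-1").await? {
    //         assert_eq!(item.object_id, "item-1");
    //     }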

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) };
        let audit_json = Self::build_audit_json(item);

        // JSON payloads go to the TEXT column, binary payloads to the BLOB
        // column; the other column stays NULL.
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                 version = excluded.version,
                 timestamp = excluded.timestamp,
                 payload_hash = excluded.payload_hash,
                 payload = excluded.payload,
                 payload_blob = excluded.payload_blob,
                 audit = excluded.audit"
        } else {
            // MySQL upsert; VALUES() in ON DUPLICATE KEY UPDATE is deprecated
            // since MySQL 8.0.20 but still widely supported.
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON DUPLICATE KEY UPDATE
                 version = VALUES(version),
                 timestamp = VALUES(timestamp),
                 payload_hash = VALUES(payload_hash),
                 payload = VALUES(payload),
                 payload_blob = VALUES(payload_blob),
                 audit = VALUES(audit)"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Stamp every item with a fresh batch id so the write can be
        // verified afterwards through the audit column.
        let batch_id = uuid::Uuid::new_v4().to_string();

        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // 500 rows x 7 columns = 3,500 bind parameters per statement, below
        // MySQL's 65,535 limit and modern SQLite's default of 32,766.
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Count the rows carrying this batch id; a mismatch is logged but
        // not treated as a hard failure.
        let verified_count = self.verify_batch(&batch_id).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }
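
    // Write-path sketch: put_batch mutates each item in place to stamp the
    // shared batch_id, writes in chunks, then verifies the count:
    //
    //     let result = store.put_batch(&mut items).await?;
    //     if !result.verified {
    //         // decide whether to re-write or alert; mismatches are logged
    //         // but not fatal
    //     }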

    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        // Unlike the point operations above, scans are not retried.
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }
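
    // Offset-pagination sketch for maintenance jobs (OFFSET gets slower as
    // it grows; acceptable for occasional full scans):
    //
    //     let mut offset = 0u64;
    //     loop {
    //         let keys = store.scan_keys(offset, 1000).await?;
    //         if keys.is_empty() { break; }
    //         offset += keys.len() as u64;
    //         // process keys ...
    //     }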

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        // One "(?, ?, ?, ?, ?, ?, ?)" tuple per row of the multi-row upsert.
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit)",
                placeholders.join(", ")
            )
        };

        // Bind values are materialized up front so every retry attempt can
        // rebuild the query from owned, clonable data.
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        // SQLite gets a plain substring probe against the audit JSON; MySQL
        // can use JSON_EXTRACT. Batch ids are UUIDs, so a substring match is
        // unambiguous in practice.
        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            format!("%\"batch\":\"{}%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            // Same bytes-first decoding as in get().
            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let (content, content_type) = if let Some(json_str) = payload_json {
                (json_str.into_bytes(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                // Unlike get(), a payload-less row is skipped rather than
                // failing the whole scan.
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }

    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        // One placeholder per id; callers should keep batches within the
        // backend's bind-parameter limits (see CHUNK_SIZE above).
        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }
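
    // Drain sketch combining the two helpers above (illustrative; assumes
    // items may be deleted once processed):
    //
    //     loop {
    //         let batch = store.scan_batch(500).await?;
    //         if batch.is_empty() { break; }
    //         let ids: Vec<String> = batch.iter().map(|i| i.object_id.clone()).collect();
    //         store.delete_batch(&ids).await?;
    //     }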
}