use async_trait::async_trait;
use sqlx::{AnyPool, Row, any::AnyPoolOptions};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{ArchiveStore, BatchWriteResult, StorageError};
use crate::resilience::retry::{retry, RetryConfig};
use std::sync::Once;
use std::time::Duration;

static INSTALL_DRIVERS: Once = Once::new();

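/// Install the sqlx `Any` drivers at most once per process; `call_once` makes repeated calls safe.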
fn install_drivers() {
    INSTALL_DRIVERS.call_once(|| {
        sqlx::any::install_default_drivers();
    });
}

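/// Archive store backed by a relational database through sqlx's `Any` driver.
/// Supports SQLite and MySQL; `is_sqlite` selects the dialect per statement.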
pub struct SqlStore {
    pool: AnyPool,
    is_sqlite: bool,
}

impl SqlStore {
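    /// Connect to the database (with startup retry), detect the backend from the
    /// connection string, and initialize the schema.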
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        install_drivers();

        let is_sqlite = connection_string.starts_with("sqlite:");

        let pool = retry("sql_connect", &RetryConfig::startup(), || async {
            AnyPoolOptions::new()
                .max_connections(20)
                .acquire_timeout(Duration::from_secs(10))
                .idle_timeout(Duration::from_secs(300))
                .connect(connection_string)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        let store = Self { pool, is_sqlite };
        store.init_schema().await?;
        Ok(store)
    }

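    /// Hand out a clone of the underlying pool; cloning an sqlx pool is cheap and
    /// shares the same connections.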
    pub fn pool(&self) -> AnyPool {
        self.pool.clone()
    }

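    /// Create the `sync_items` table if it does not already exist, using
    /// dialect-appropriate DDL.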
    async fn init_schema(&self) -> Result<(), StorageError> {
        let sql = if self.is_sqlite {
            // SQLite dialect: dynamically typed columns; INTEGER holds the 64-bit values.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL DEFAULT 1,
                timestamp INTEGER NOT NULL,
                payload_hash TEXT,
                payload TEXT,
                payload_blob BLOB,
                audit TEXT,
                merkle_dirty INTEGER NOT NULL DEFAULT 1
            )
            "#
        } else {
            // MySQL dialect: sized column types plus inline secondary indexes.
            r#"
            CREATE TABLE IF NOT EXISTS sync_items (
                id VARCHAR(255) PRIMARY KEY,
                version BIGINT NOT NULL DEFAULT 1,
                timestamp BIGINT NOT NULL,
                payload_hash VARCHAR(64),
                payload LONGTEXT,
                payload_blob MEDIUMBLOB,
                audit TEXT,
                merkle_dirty TINYINT NOT NULL DEFAULT 1,
                INDEX idx_timestamp (timestamp),
                INDEX idx_merkle_dirty (merkle_dirty)
            )
            "#
        };

        retry("sql_init_schema", &RetryConfig::startup(), || async {
            sqlx::query(sql)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))
        })
        .await?;

        Ok(())
    }

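    /// Pack the item's audit fields (batch id, trace parent, home instance) into a
    /// compact JSON object. Returns `None` when there is nothing to record, so the
    /// `audit` column stays NULL.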
    fn build_audit_json(item: &SyncItem) -> Option<String> {
        let mut audit = serde_json::Map::new();

        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        if audit.is_empty() {
            None
        } else {
            serde_json::to_string(&serde_json::Value::Object(audit)).ok()
        }
    }

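    /// Inverse of `build_audit_json`: recover (batch_id, trace_parent, home_instance_id)
    /// from the stored audit JSON, tolerating missing or malformed input.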
    fn parse_audit_json(audit_str: Option<String>) -> (Option<String>, Option<String>, Option<String>) {
        match audit_str {
            Some(s) => {
                if let Ok(audit) = serde_json::from_str::<serde_json::Value>(&s) {
                    let batch_id = audit.get("batch").and_then(|v| v.as_str()).map(String::from);
                    let trace_parent = audit.get("trace").and_then(|v| v.as_str()).map(String::from);
                    let home_instance_id = audit.get("home").and_then(|v| v.as_str()).map(String::from);
                    (batch_id, trace_parent, home_instance_id)
                } else {
                    (None, None, None)
                }
            }
            None => (None, None, None),
        }
    }
}

#[async_trait]
impl ArchiveStore for SqlStore {
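    /// Fetch a single item by id, reconstructing the content type from whichever
    /// payload column is populated.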
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let id = id.to_string();

        retry("sql_get", &RetryConfig::query(), || async {
            let result = sqlx::query(
                "SELECT version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items WHERE id = ?"
            )
            .bind(&id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

            match result {
                Some(row) => {
                    let version: i64 = row.try_get("version").unwrap_or(1);
                    let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
                    let payload_hash: Option<String> = row.try_get("payload_hash").ok();

                    // The Any driver may surface TEXT columns as raw bytes; decode to UTF-8 defensively.
                    let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
                    let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
                    let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
                    let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                        String::from_utf8(bytes).ok()
                    });

                    // JSON payloads take precedence; fall back to the binary column.
                    let (content, content_type) = if let Some(ref json_str) = payload_json {
                        let content = json_str.as_bytes().to_vec();
                        (content, ContentType::Json)
                    } else if let Some(blob) = payload_blob {
                        (blob, ContentType::Binary)
                    } else {
                        return Err(StorageError::Backend("No payload in row".to_string()));
                    };

                    let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

                    let item = SyncItem::reconstruct(
                        id.clone(),
                        version as u64,
                        timestamp,
                        content_type,
                        content,
                        batch_id,
                        trace_parent,
                        payload_hash.unwrap_or_default(),
                        home_instance_id,
                    );
                    Ok(Some(item))
                }
                None => Ok(None),
            }
        })
        .await
    }

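    /// Insert or update a single item. Both dialects upsert on the primary key and
    /// set `merkle_dirty` so the row is picked up for merkle recomputation.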
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let id = item.object_id.clone();
        let version = item.version as i64;
        let timestamp = item.updated_at;
        let payload_hash = if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) };
        let audit_json = Self::build_audit_json(item);

        // JSON payloads go to the TEXT column, binary payloads to the BLOB column; never both.
        let (payload_json, payload_blob): (Option<String>, Option<Vec<u8>>) = match item.content_type {
            ContentType::Json => {
                let json_str = String::from_utf8_lossy(&item.content).to_string();
                (Some(json_str), None)
            }
            ContentType::Binary => {
                (None, Some(item.content.clone()))
            }
        };

        let sql = if self.is_sqlite {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1)
             ON CONFLICT(id) DO UPDATE SET
                 version = excluded.version,
                 timestamp = excluded.timestamp,
                 payload_hash = excluded.payload_hash,
                 payload = excluded.payload,
                 payload_blob = excluded.payload_blob,
                 audit = excluded.audit,
                 merkle_dirty = 1"
        } else {
            "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty)
             VALUES (?, ?, ?, ?, ?, ?, ?, 1)
             ON DUPLICATE KEY UPDATE
                 version = VALUES(version),
                 timestamp = VALUES(timestamp),
                 payload_hash = VALUES(payload_hash),
                 payload = VALUES(payload),
                 payload_blob = VALUES(payload_blob),
                 audit = VALUES(audit),
                 merkle_dirty = 1"
        };

        retry("sql_put", &RetryConfig::query(), || async {
            sqlx::query(sql)
                .bind(&id)
                .bind(version)
                .bind(timestamp)
                .bind(&payload_hash)
                .bind(&payload_json)
                .bind(&payload_blob)
                .bind(&audit_json)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let id = id.to_string();
        retry("sql_delete", &RetryConfig::query(), || async {
            sqlx::query("DELETE FROM sync_items WHERE id = ?")
                .bind(&id)
                .execute(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(())
        })
        .await
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let id = id.to_string();
        retry("sql_exists", &RetryConfig::query(), || async {
            let result = sqlx::query("SELECT 1 FROM sync_items WHERE id = ? LIMIT 1")
                .bind(&id)
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            Ok(result.is_some())
        })
        .await
    }

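    /// Write a batch of items under a fresh batch id, chunked to bound the number
    /// of bind parameters per statement, then verify the write by counting rows
    /// tagged with that batch id.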
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        let batch_id = uuid::Uuid::new_v4().to_string();

        // Tag every item so its audit JSON records which batch wrote it.
        for item in items.iter_mut() {
            item.batch_id = Some(batch_id.clone());
        }

        // Each row contributes 7 bind parameters; chunking keeps statements bounded.
        const CHUNK_SIZE: usize = 500;
        let mut total_written = 0usize;

        for chunk in items.chunks(CHUNK_SIZE) {
            let written = self.put_batch_chunk(chunk, &batch_id).await?;
            total_written += written;
        }

        // Read back the row count for this batch id as a cheap write verification.
        let verified_count = self.verify_batch(&batch_id).await?;
        let verified = verified_count == items.len();

        if !verified {
            tracing::warn!(
                batch_id = %batch_id,
                expected = items.len(),
                actual = verified_count,
                "Batch verification mismatch"
            );
        }

        Ok(BatchWriteResult {
            batch_id,
            written: total_written,
            verified,
        })
    }

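    /// Page through all ids in primary-key order using LIMIT/OFFSET.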
    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query("SELECT id FROM sync_items ORDER BY id LIMIT ? OFFSET ?")
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut keys = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            keys.push(id);
        }

        Ok(keys)
    }

    async fn count_all(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }
}

impl SqlStore {
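    /// Upsert one chunk of a batch as a single multi-row INSERT. Rows are
    /// pre-serialized into owned values so the retry closure can rebuild the
    /// query on every attempt.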
    async fn put_batch_chunk(&self, chunk: &[SyncItem], _batch_id: &str) -> Result<usize, StorageError> {
        let placeholders: Vec<String> = (0..chunk.len())
            .map(|_| "(?, ?, ?, ?, ?, ?, ?, 1)".to_string())
            .collect();

        let sql = if self.is_sqlite {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty) VALUES {} \
                 ON CONFLICT(id) DO UPDATE SET \
                 version = excluded.version, \
                 timestamp = excluded.timestamp, \
                 payload_hash = excluded.payload_hash, \
                 payload = excluded.payload, \
                 payload_blob = excluded.payload_blob, \
                 audit = excluded.audit, \
                 merkle_dirty = 1",
                placeholders.join(", ")
            )
        } else {
            format!(
                "INSERT INTO sync_items (id, version, timestamp, payload_hash, payload, payload_blob, audit, merkle_dirty) VALUES {} \
                 ON DUPLICATE KEY UPDATE \
                 version = VALUES(version), \
                 timestamp = VALUES(timestamp), \
                 payload_hash = VALUES(payload_hash), \
                 payload = VALUES(payload), \
                 payload_blob = VALUES(payload_blob), \
                 audit = VALUES(audit), \
                 merkle_dirty = 1",
                placeholders.join(", ")
            )
        };

        // Owned, clonable snapshot of each row's bind values for use inside the retry closure.
        #[derive(Clone)]
        struct PreparedRow {
            id: String,
            version: i64,
            timestamp: i64,
            payload_hash: Option<String>,
            payload_json: Option<String>,
            payload_blob: Option<Vec<u8>>,
            audit_json: Option<String>,
        }

        let prepared: Vec<PreparedRow> = chunk.iter()
            .map(|item| {
                let (payload_json, payload_blob) = match item.content_type {
                    ContentType::Json => {
                        let json_str = String::from_utf8_lossy(&item.content).to_string();
                        (Some(json_str), None)
                    }
                    ContentType::Binary => {
                        (None, Some(item.content.clone()))
                    }
                };

                PreparedRow {
                    id: item.object_id.clone(),
                    version: item.version as i64,
                    timestamp: item.updated_at,
                    payload_hash: if item.merkle_root.is_empty() { None } else { Some(item.merkle_root.clone()) },
                    payload_json,
                    payload_blob,
                    audit_json: Self::build_audit_json(item),
                }
            })
            .collect();

        retry("sql_put_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let prepared = prepared.clone();
            async move {
                let mut query = sqlx::query(&sql);

                for row in &prepared {
                    query = query
                        .bind(&row.id)
                        .bind(row.version)
                        .bind(row.timestamp)
                        .bind(&row.payload_hash)
                        .bind(&row.payload_json)
                        .bind(&row.payload_blob)
                        .bind(&row.audit_json);
                }

                query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(())
            }
        })
        .await?;

        Ok(chunk.len())
    }

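    /// Count the rows whose audit JSON carries the given batch id. SQLite falls
    /// back to a LIKE match on the raw JSON text; MySQL can use JSON_EXTRACT.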
    async fn verify_batch(&self, batch_id: &str) -> Result<usize, StorageError> {
        let batch_id = batch_id.to_string();

        let sql = if self.is_sqlite {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE audit LIKE ?"
        } else {
            "SELECT COUNT(*) as cnt FROM sync_items WHERE JSON_EXTRACT(audit, '$.batch') = ?"
        };

        let bind_value = if self.is_sqlite {
            // Match the exact serialized form, closing quote included, so one batch id
            // cannot match another id that merely shares a prefix.
            format!("%\"batch\":\"{}\"%", batch_id)
        } else {
            batch_id.clone()
        };

        let result = sqlx::query(sql)
            .bind(&bind_value)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as usize)
    }

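    /// Return up to `limit` items ordered by timestamp (oldest first), skipping
    /// rows that carry no payload in either column.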
    pub async fn scan_batch(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit FROM sync_items ORDER BY timestamp ASC LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(e.to_string()))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|b| String::from_utf8(b).ok());
            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|b| String::from_utf8(b).ok());

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }

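    /// Delete the given ids in one IN-list statement and return the number of rows removed.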
    pub async fn delete_batch(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "DELETE FROM sync_items WHERE id IN ({})",
            placeholders.join(", ")
        );

        retry("sql_delete_batch", &RetryConfig::query(), || {
            let sql = sql.clone();
            let ids = ids.to_vec();
            async move {
                let mut query = sqlx::query(&sql);
                for id in &ids {
                    query = query.bind(id);
                }

                let result = query.execute(&self.pool)
                    .await
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                Ok(result.rows_affected() as usize)
            }
        })
        .await
    }

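    /// Fetch up to `limit` ids whose `merkle_dirty` flag is set. Rows are marked
    /// dirty on every insert or update and cleared via `mark_merkle_clean`, so
    /// callers can recompute merkle state incrementally.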
    pub async fn get_dirty_merkle_ids(&self, limit: usize) -> Result<Vec<String>, StorageError> {
        let rows = sqlx::query(
            "SELECT id FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle ids: {}", e)))?;

        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            ids.push(id);
        }

        Ok(ids)
    }

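    /// Count rows still flagged merkle-dirty.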
    pub async fn count_dirty_merkle(&self) -> Result<u64, StorageError> {
        let result = sqlx::query("SELECT COUNT(*) as cnt FROM sync_items WHERE merkle_dirty = 1")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let count: i64 = result.try_get("cnt")
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(count as u64)
    }

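    /// Clear the merkle-dirty flag for the given ids; returns how many rows changed.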
    pub async fn mark_merkle_clean(&self, ids: &[String]) -> Result<usize, StorageError> {
        if ids.is_empty() {
            return Ok(0);
        }

        let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
        let sql = format!(
            "UPDATE sync_items SET merkle_dirty = 0 WHERE id IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&sql);
        for id in ids {
            query = query.bind(id);
        }

        let result = query.execute(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.rows_affected() as usize)
    }

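    /// Cheap existence probe: is at least one row still merkle-dirty?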
    pub async fn has_dirty_merkle(&self) -> Result<bool, StorageError> {
        let result = sqlx::query("SELECT 1 FROM sync_items WHERE merkle_dirty = 1 LIMIT 1")
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(result.is_some())
    }

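    /// Like `get_dirty_merkle_ids`, but returns the fully reconstructed items so
    /// callers can work with payloads without a second round trip.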
    pub async fn get_dirty_merkle_items(&self, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let rows = sqlx::query(
            "SELECT id, version, timestamp, payload_hash, payload, payload_blob, audit
             FROM sync_items WHERE merkle_dirty = 1 LIMIT ?"
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StorageError::Backend(format!("Failed to get dirty merkle items: {}", e)))?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let id: String = row.try_get("id")
                .map_err(|e| StorageError::Backend(e.to_string()))?;
            let version: i64 = row.try_get("version").unwrap_or(1);
            let timestamp: i64 = row.try_get("timestamp").unwrap_or(0);
            let payload_hash: Option<String> = row.try_get("payload_hash").ok();

            let payload_bytes: Option<Vec<u8>> = row.try_get("payload").ok();
            let payload_json: Option<String> = payload_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let payload_blob: Option<Vec<u8>> = row.try_get("payload_blob").ok();
            let audit_bytes: Option<Vec<u8>> = row.try_get("audit").ok();
            let audit_json: Option<String> = audit_bytes.and_then(|bytes| {
                String::from_utf8(bytes).ok()
            });

            let (content, content_type) = if let Some(ref json_str) = payload_json {
                (json_str.as_bytes().to_vec(), ContentType::Json)
            } else if let Some(blob) = payload_blob {
                (blob, ContentType::Binary)
            } else {
                continue;
            };

            let (batch_id, trace_parent, home_instance_id) = Self::parse_audit_json(audit_json);

            let item = SyncItem::reconstruct(
                id,
                version as u64,
                timestamp,
                content_type,
                content,
                batch_id,
                trace_parent,
                payload_hash.unwrap_or_default(),
                home_instance_id,
            );
            items.push(item);
        }

        Ok(items)
    }
}