1mod types;
37mod api;
38mod lifecycle;
39mod flush;
40mod merkle_api;
41
42pub use types::{EngineState, ItemStatus, BatchResult};
43pub use merkle_api::MerkleDiff;
44#[allow(unused_imports)]
45use types::WriteTarget;
46
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
49use std::time::Instant;
50use dashmap::DashMap;
51use tokio::sync::{watch, Mutex};
52use tracing::{info, warn, debug, error};
53
54use crate::config::SyncEngineConfig;
55use crate::sync_item::SyncItem;
56use crate::submit_options::SubmitOptions;
57use crate::backpressure::BackpressureLevel;
58use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
59use crate::storage::sql::SqlStore;
60use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
61use crate::cuckoo::FilterPersistence;
62use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
63use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
64use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
65use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
66
/// Multi-tier sync/cache engine: an in-process L1 map backed by optional
/// L2 (Redis) and L3 (MySQL) stores, with cuckoo-filter-gated L3 lookups,
/// Merkle-tree integrity tracking, batched L2 writes, and a WAL for L3
/// resilience. Optional tiers start as `None` and are wired up by the
/// lifecycle code in the sibling modules.
pub struct SyncEngine {
    /// Configuration snapshot taken at construction time.
    pub(super) config: SyncEngineConfig,

    /// Live configuration channel (unused in this module per the allow).
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Sender half of the lifecycle state channel.
    pub(super) state: watch::Sender<EngineState>,

    /// Receiver half; handed out to callers via `state_receiver()`.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process concurrent map of items, keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate total bytes held in L1; kept in sync by insert,
    /// delete, and eviction.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2 cache store (Redis, per log messages) — `None` until configured.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3 archive store (MySQL, per log messages) — `None` until configured.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Direct SQL store handle, when configured.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter used to skip pointless L3 lookups on `get`.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence backend for the cuckoo filter.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts since the last snapshot, and when that snapshot was taken.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batches submitted items for asynchronous flushing to L2.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree mirror for the Redis (L2) side, when configured.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Merkle tree mirror for the SQL (L3) side, when configured.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log protecting L3 writes, when configured.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Health checker for the MySQL backend.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Policy that selects L1 eviction victims under memory pressure.
    pub(super) eviction_policy: TanCurvePolicy,
}
135
136impl SyncEngine {
137 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
142 let (state_tx, state_rx) = watch::channel(EngineState::Created);
143
144 let batch_config = BatchConfig {
145 flush_ms: config.batch_flush_ms,
146 flush_count: config.batch_flush_count,
147 flush_bytes: config.batch_flush_bytes,
148 };
149
150 Self {
151 config: config.clone(),
152 config_rx,
153 state: state_tx,
154 state_rx,
155 l1_cache: Arc::new(DashMap::new()),
156 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
157 l2_store: None,
158 l3_store: None,
159 sql_store: None,
160 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
161 filter_persistence: None,
162 cf_inserts_since_snapshot: AtomicU64::new(0),
163 cf_last_snapshot: Mutex::new(Instant::now()),
164 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
165 redis_merkle: None,
166 sql_merkle: None,
167 l3_wal: None,
168 mysql_health: MysqlHealthChecker::new(),
169 eviction_policy: TanCurvePolicy::default(),
170 }
171 }
172
173 #[must_use]
175 pub fn state(&self) -> EngineState {
176 *self.state_rx.borrow()
177 }
178
179 #[must_use]
181 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
182 self.state_rx.clone()
183 }
184
185 #[must_use]
187 pub fn is_ready(&self) -> bool {
188 matches!(self.state(), EngineState::Ready | EngineState::Running)
189 }
190
191 #[must_use]
193 pub fn memory_pressure(&self) -> f64 {
194 let used = self.l1_size_bytes.load(Ordering::Acquire);
195 let max = self.config.l1_max_bytes;
196 if max == 0 {
197 0.0
198 } else {
199 used as f64 / max as f64
200 }
201 }
202
203 #[must_use]
205 pub fn pressure(&self) -> BackpressureLevel {
206 BackpressureLevel::from_pressure(self.memory_pressure())
207 }
208
209 #[must_use]
211 pub fn should_accept_writes(&self) -> bool {
212 let pressure = self.pressure();
213 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
214 }
215
    /// Tiered lookup: L1 → L2 → (filter-gated) L3, with read-through
    /// promotion into L1 on L2/L3 hits.
    ///
    /// Returns `Ok(None)` on a full miss. L2/L3 backend errors are logged
    /// and treated as misses for that tier rather than propagated, so a
    /// degraded backend does not fail reads that a lower tier can serve.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: update access stats in place while the shard guard is held,
        // then clone out so the guard is dropped before returning.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: on hit, promote a copy into L1 unconditionally.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    // Degraded L2 is non-fatal: fall through to L3.
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: only queried when the cuckoo filter says the key may exist.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Skip L1 promotion when the cache is already at or
                        // over its byte budget.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // Filter said "maybe" but the store says no.
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Filter definitively rules the key out of L3.
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
305
306 #[tracing::instrument(skip(self), fields(verified))]
311 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
312 let item = match self.get(id).await? {
313 Some(item) => item,
314 None => return Ok(None),
315 };
316
317 if !item.merkle_root.is_empty() {
319 use sha2::{Sha256, Digest};
320
321 let computed = Sha256::digest(&item.content);
322 let computed_hex = hex::encode(computed);
323
324 if computed_hex != item.merkle_root {
325 tracing::Span::current().record("verified", false);
326 warn!(
327 id = %id,
328 expected = %item.merkle_root,
329 actual = %computed_hex,
330 "Data corruption detected!"
331 );
332
333 crate::metrics::record_corruption(id);
335
336 return Err(StorageError::Corruption {
337 id: id.to_string(),
338 expected: item.merkle_root.clone(),
339 actual: computed_hex,
340 });
341 }
342
343 tracing::Span::current().record("verified", true);
344 debug!(id = %id, "Hash verification passed");
345 }
346
347 Ok(Some(item))
348 }
349
350 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
358 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
359 self.submit_with(item, SubmitOptions::default()).await
360 }
361
    /// Accepts an item for storage: writes it to L1 immediately and queues
    /// it on the L2 batcher for asynchronous flushing.
    ///
    /// Rejects with `StorageError::Backend` when backpressure is at
    /// `Emergency`/`Shutdown`. The options are attached to the item so the
    /// downstream flush path can read them.
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        // Backpressure gate: refuse new writes outright when critical.
        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // An explicit state in the options overrides the item's own state.
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Carry the options with the item for the flush pipeline.
        item.submit_options = Some(options);

        // L1 gets a copy; the original moves into the batch queue below.
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
423
    /// Deletes an item from every tier (L1, filter, L2, L3) and records the
    /// deletion in both Merkle trees.
    ///
    /// Returns `Ok(true)` when at least one tier reported a successful
    /// delete. Backend failures are logged and do not abort the remaining
    /// tiers. Rejects with `StorageError::Backend` under critical
    /// backpressure.
    ///
    /// NOTE(review): `found` is also set when an L2/L3 `delete` returns
    /// `Ok(())` — whether that implies the key actually existed depends on
    /// the store-trait semantics; confirm against the trait impls.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: remove and release its bytes from the size counter.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // Drop the key from the L3 membership filter regardless of tiers.
        self.l3_filter.remove(id);

        // L2 (Redis): failure is logged, not propagated.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3 (MySQL): failure is logged at error level, not propagated.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Record the deletion in both Merkle mirrors with one shared batch.
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
519
520 fn insert_l1(&self, item: SyncItem) {
524 let new_size = Self::item_size(&item);
525 let key = item.object_id.clone();
526
527 if let Some(old_item) = self.l1_cache.insert(key, item) {
529 let old_size = Self::item_size(&old_item);
531 let current = self.l1_size_bytes.load(Ordering::Acquire);
533 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
534 self.l1_size_bytes.store(new_total, Ordering::Release);
535 } else {
536 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
538 }
539 }
540
541 #[inline]
543 fn item_size(item: &SyncItem) -> usize {
544 item.size_bytes()
546 }
547
548 fn maybe_evict(&self) {
549 let pressure = self.memory_pressure();
550 if pressure < self.config.backpressure_warn {
551 return;
552 }
553
554 let level = BackpressureLevel::from_pressure(pressure);
555 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
556
557 let now = std::time::Instant::now();
559 let entries: Vec<CacheEntry> = self.l1_cache.iter()
560 .map(|ref_multi| {
561 let item = ref_multi.value();
562 let id = ref_multi.key().clone();
563
564 let now_millis = std::time::SystemTime::now()
566 .duration_since(std::time::UNIX_EPOCH)
567 .unwrap_or_default()
568 .as_millis() as u64;
569 let age_secs = if item.last_accessed > 0 {
570 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
571 } else {
572 3600.0 };
574
575 CacheEntry {
576 id,
577 size_bytes: item.size_bytes(),
578 created_at: now - std::time::Duration::from_secs_f64(age_secs),
579 last_access: now - std::time::Duration::from_secs_f64(age_secs),
580 access_count: item.access_count,
581 is_dirty: false, }
583 })
584 .collect();
585
586 if entries.is_empty() {
587 return;
588 }
589
590 let evict_count = match level {
592 BackpressureLevel::Normal => 0,
593 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
599
600 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
602
603 let mut evicted_bytes = 0usize;
605 for victim_id in &victims {
606 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
607 evicted_bytes += item.size_bytes();
608 }
609 }
610
611 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
613
614 info!(
615 evicted = victims.len(),
616 evicted_bytes = evicted_bytes,
617 pressure = %pressure,
618 level = %level,
619 "Evicted entries from L1 cache"
620 );
621 }
622
623 pub fn l1_stats(&self) -> (usize, usize) {
625 (
626 self.l1_cache.len(),
627 self.l1_size_bytes.load(Ordering::Acquire),
628 )
629 }
630
631 #[must_use]
633 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
634 self.l3_filter.stats()
635 }
636
637 pub fn l3_filter(&self) -> &Arc<FilterManager> {
639 &self.l3_filter
640 }
641
642 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
647 let redis_root = if let Some(ref rm) = self.redis_merkle {
648 rm.root_hash().await.ok().flatten().map(hex::encode)
649 } else {
650 None
651 };
652
653 let sql_root = if let Some(ref sm) = self.sql_merkle {
654 sm.root_hash().await.ok().flatten().map(hex::encode)
655 } else {
656 None
657 };
658
659 (redis_root, sql_root)
660 }
661
    /// Marks the L3 cuckoo filter as trusted, nominally after checking it
    /// against the SQL Merkle root.
    ///
    /// NOTE(review): no actual comparison is performed — once a SQL root
    /// is fetched (or when no SQL Merkle store is configured) the filter
    /// is marked trusted unconditionally. The only `false` path is a
    /// failed/absent root lookup. If real verification is intended, it
    /// still needs to be implemented here.
    pub async fn verify_filter(&self) -> bool {
        // Fetch the SQL-side root; without a SQL Merkle store there is
        // nothing to verify against, so trust the filter as-is.
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // See NOTE above: trusted unconditionally at this point.
        self.l3_filter.mark_trusted();
        true
    }
693
694 pub fn update_gauge_metrics(&self) {
699 let (l1_count, l1_bytes) = self.l1_stats();
700 crate::metrics::set_l1_cache_items(l1_count);
701 crate::metrics::set_l1_cache_bytes(l1_bytes);
702 crate::metrics::set_memory_pressure(self.memory_pressure());
703
704 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
705 let filter_load = if filter_capacity > 0 {
706 filter_entries as f64 / filter_capacity as f64
707 } else {
708 0.0
709 };
710 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
711 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
712
713 crate::metrics::set_backpressure_level(self.pressure() as u8);
714 }
715}
716
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Engine with default config and no L2/L3 backends attached.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    // Minimal item with a small fixed JSON payload.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    // A freshly constructed engine starts in `Created` and is not ready.
    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    // Pressure is 0 on an empty cache and rises once an item is inserted.
    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    // Inserting tracks both the entry count and the exact byte size.
    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    // Replacing an existing key updates the size to the new item's size
    // without double-counting the old one.
    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); assert_eq!(size2, size2_expected); }

    // A miss across all (absent) tiers returns Ok(None).
    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    // An L1-resident item is returned by get().
    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    // delete() removes the L1 entry and releases its tracked bytes.
    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    // A fresh filter is empty but has nonzero capacity.
    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    // No pressure on an empty engine, so writes are accepted.
    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    // An empty engine sits at the Normal backpressure level.
    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}