mod types;
mod api;
mod lifecycle;
mod flush;

pub use types::{EngineState, ItemStatus, BatchResult};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

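/// Tiered sync engine: an in-memory L1 `DashMap`, an optional L2 cache store
/// (Redis-backed), and an optional L3 archive store (MySQL-backed), with a
/// cuckoo filter guarding L3 lookups and Merkle stores tracking L2/L3 contents.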
pub struct SyncEngine {
    /// Effective configuration captured at construction time.
    pub(super) config: SyncEngineConfig,

    /// Receiver for live configuration updates.
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Sender side of the engine state channel.
    pub(super) state: watch::Sender<EngineState>,

    /// Receiver side of the engine state channel.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-memory cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate total size of the items held in L1.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: optional cache store backend.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3: optional archive store backend.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Cuckoo filter tracking which ids may exist in L3.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for cuckoo filter snapshots.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts since the last snapshot, and when that snapshot was taken.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batcher that groups submitted items for L2/L3 flushes.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis-backed Merkle store.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL-backed Merkle store.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 writes.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Health checker for the MySQL (L3) backend.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy applied when L1 is under memory pressure.
    pub(super) eviction_policy: TanCurvePolicy,
}

impl SyncEngine {
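    /// Creates a new engine in the `Created` state; all optional storage
    /// backends start as `None`.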
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: config.clone(),
            config_rx,
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            l3_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
        }
    }

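    /// Returns the current engine state.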
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

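    /// Returns a watch receiver for observing engine state transitions.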
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

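    /// Returns `true` when the engine is `Ready` or `Running`.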
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

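    /// Returns L1 memory pressure as `used_bytes / l1_max_bytes`, or `0.0`
    /// when no limit is configured.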
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

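    /// Maps the current memory pressure onto a [`BackpressureLevel`].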
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

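    /// Returns `false` once backpressure reaches `Emergency` or `Shutdown`.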
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

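    /// Fetches an item by id, checking L1, then L2, then L3 (the latter only
    /// when the cuckoo filter says the id may exist there). Hits in lower
    /// tiers are promoted back into L1.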
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

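    /// Like [`Self::get`], but recomputes the SHA-256 hash of the item's
    /// content and compares it with the stored Merkle root, returning
    /// [`StorageError::Corruption`] on mismatch. Items without a Merkle root
    /// are returned unverified.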
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        if !item.merkle_root.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.merkle_root {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.merkle_root,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.merkle_root.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

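    /// Submits an item using the default [`SubmitOptions`].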
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

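    /// Submits an item with explicit write-target options: the item is written
    /// to L1 immediately and queued in the batcher for later L2/L3 flushes.
    /// Rejected with a backend error when writes are not being accepted.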
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        item.submit_options = Some(options);

        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

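    /// Deletes an item from every tier, removes it from the cuckoo filter, and
    /// records the deletion in the Redis and SQL Merkle stores. Returns `true`
    /// if the item was found in at least one tier.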
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        self.l3_filter.remove(id);

        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

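    /// Inserts an item into L1, keeping the byte counter consistent when an
    /// existing entry with the same id is replaced.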
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        if let Some(old_item) = self.l1_cache.insert(key, item) {
            let old_size = Self::item_size(&old_item);
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }

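    /// Evicts entries from L1 using the tan-curve policy once memory pressure
    /// crosses the configured warn threshold; the fraction evicted scales with
    /// the backpressure level.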
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false,
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,
            BackpressureLevel::Throttle => entries.len() / 10,
            BackpressureLevel::Critical => entries.len() / 5,
            BackpressureLevel::Emergency => entries.len() / 3,
            BackpressureLevel::Shutdown => entries.len() / 2,
        }
        .max(1);

        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

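    /// Returns the L1 entry count and the tracked size in bytes.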
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

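    /// Returns the L3 cuckoo filter's entry count, capacity, and trust state.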
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

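    /// Returns a shared handle to the L3 cuckoo filter manager.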
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

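    /// Returns the hex-encoded Redis and SQL Merkle root hashes, when the
    /// respective stores are configured.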
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

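    /// Marks the L3 cuckoo filter as trusted. When a SQL Merkle store is
    /// configured, its root hash is fetched and logged first, and `false` is
    /// returned if that root cannot be read; full reconciliation against the
    /// root is not performed here.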
    pub async fn verify_filter(&self) -> bool {
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }

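    /// Publishes current L1, cuckoo-filter, memory-pressure, and backpressure
    /// gauges to the metrics registry.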
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size2, size2_expected);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}