1mod types;
37mod api;
38mod lifecycle;
39mod flush;
40mod merkle_api;
41
42pub use types::{EngineState, ItemStatus, BatchResult};
43pub use merkle_api::MerkleDiff;
44#[allow(unused_imports)]
45use types::WriteTarget;
46
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
49use std::time::Instant;
50use dashmap::DashMap;
51use tokio::sync::{watch, Mutex};
52use tracing::{info, warn, debug, error};
53
54use crate::config::SyncEngineConfig;
55use crate::sync_item::SyncItem;
56use crate::submit_options::SubmitOptions;
57use crate::backpressure::BackpressureLevel;
58use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
59use crate::storage::sql::SqlStore;
60use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
61use crate::cuckoo::FilterPersistence;
62use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
63use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
64use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
65use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
66
/// Multi-tier sync engine: an in-process L1 cache backed by optional
/// L2 (cache store) and L3 (archive store) tiers, with cuckoo-filter
/// gating for L3 lookups, Merkle-tree auditing on both remote tiers,
/// batched L2 write-out, a WAL for L3 resilience, and a tan-curve
/// eviction policy for L1.
pub struct SyncEngine {
    /// Engine configuration snapshot taken at construction.
    pub(super) config: SyncEngineConfig,

    /// Receiver for live configuration updates.
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Broadcasts lifecycle state transitions.
    pub(super) state: watch::Sender<EngineState>,

    /// Cached receiver used by `state()` and handed out via `state_receiver()`.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Total bytes currently held in L1; kept in sync by insert/delete/evict.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2 cache backend (Redis, per log messages); `None` until attached.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3 archive backend (MySQL, per log messages); `None` until attached.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Direct SQL store handle; `None` until attached.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter deciding whether an id is worth looking up in L3.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Persistence hook for snapshotting the cuckoo filter.
    pub(super) filter_persistence: Option<FilterPersistence>,

    // Snapshot-cadence bookkeeping for the cuckoo filter.
    // NOTE(review): not consumed in this file — presumably used by the
    // lifecycle/flush submodules; confirm there.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Buffers submitted items for batched write-out to L2.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree mirrored in Redis (L2 auditing).
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Merkle tree mirrored in SQL (L3 auditing).
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log protecting L3 writes; `None` until attached.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Health checker for the MySQL backend.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Policy used by `maybe_evict` to pick L1 victims under pressure.
    pub(super) eviction_policy: TanCurvePolicy,
}
135
136impl SyncEngine {
137 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
142 let (state_tx, state_rx) = watch::channel(EngineState::Created);
143
144 let batch_config = BatchConfig {
145 flush_ms: config.batch_flush_ms,
146 flush_count: config.batch_flush_count,
147 flush_bytes: config.batch_flush_bytes,
148 };
149
150 Self {
151 config: config.clone(),
152 config_rx,
153 state: state_tx,
154 state_rx,
155 l1_cache: Arc::new(DashMap::new()),
156 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
157 l2_store: None,
158 l3_store: None,
159 sql_store: None,
160 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
161 filter_persistence: None,
162 cf_inserts_since_snapshot: AtomicU64::new(0),
163 cf_last_snapshot: Mutex::new(Instant::now()),
164 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
165 redis_merkle: None,
166 sql_merkle: None,
167 l3_wal: None,
168 mysql_health: MysqlHealthChecker::new(),
169 eviction_policy: TanCurvePolicy::default(),
170 }
171 }
172
173 #[must_use]
175 pub fn state(&self) -> EngineState {
176 *self.state_rx.borrow()
177 }
178
179 #[must_use]
181 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
182 self.state_rx.clone()
183 }
184
185 #[must_use]
187 pub fn is_ready(&self) -> bool {
188 matches!(self.state(), EngineState::Ready | EngineState::Running)
189 }
190
191 #[must_use]
193 pub fn memory_pressure(&self) -> f64 {
194 let used = self.l1_size_bytes.load(Ordering::Acquire);
195 let max = self.config.l1_max_bytes;
196 if max == 0 {
197 0.0
198 } else {
199 used as f64 / max as f64
200 }
201 }
202
203 #[must_use]
205 pub fn pressure(&self) -> BackpressureLevel {
206 BackpressureLevel::from_pressure(self.memory_pressure())
207 }
208
209 #[must_use]
211 pub fn should_accept_writes(&self) -> bool {
212 let pressure = self.pressure();
213 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
214 }
215
    /// Reads an item by id, trying L1 → L2 → L3 in order.
    ///
    /// L1 hits bump the item's access stats in place; L2/L3 hits are
    /// promoted into L1. L3 is only consulted when the cuckoo filter says
    /// the key may exist there. Per-tier metrics and the `tier` span field
    /// are recorded on every path. L2/L3 backend errors are logged and
    /// treated as misses rather than propagated.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: get_mut so access stats can be updated in place on a hit.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: remote cache; a hit is promoted into L1.
        // NOTE(review): unlike the L3 path below, promotion here does not
        // check memory pressure first — confirm the asymmetry is intended.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    // Degrade to a miss and fall through to L3 instead of
                    // failing the read outright.
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: archive store, gated by the cuckoo filter to skip lookups for
        // keys that definitely are not there.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote only while L1 is below its byte budget.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // Filter said "maybe", store said "no": false positive.
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        // Every tier missed.
        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
305
306 #[tracing::instrument(skip(self), fields(verified))]
311 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
312 let item = match self.get(id).await? {
313 Some(item) => item,
314 None => return Ok(None),
315 };
316
317 if !item.merkle_root.is_empty() {
319 use sha2::{Sha256, Digest};
320
321 let computed = Sha256::digest(&item.content);
322 let computed_hex = hex::encode(computed);
323
324 if computed_hex != item.merkle_root {
325 tracing::Span::current().record("verified", false);
326 warn!(
327 id = %id,
328 expected = %item.merkle_root,
329 actual = %computed_hex,
330 "Data corruption detected!"
331 );
332
333 crate::metrics::record_corruption(id);
335
336 return Err(StorageError::Corruption {
337 id: id.to_string(),
338 expected: item.merkle_root.clone(),
339 actual: computed_hex,
340 });
341 }
342
343 tracing::Span::current().record("verified", true);
344 debug!(id = %id, "Hash verification passed");
345 }
346
347 Ok(Some(item))
348 }
349
350 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
358 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
359 self.submit_with(item, SubmitOptions::default()).await
360 }
361
362 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
383 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
384 let start = std::time::Instant::now();
385
386 if !self.should_accept_writes() {
387 crate::metrics::record_operation("engine", "submit", "rejected");
388 crate::metrics::record_error("engine", "submit", "backpressure");
389 return Err(StorageError::Backend(format!(
390 "Rejecting write: engine state={}, pressure={}",
391 self.state(),
392 self.pressure()
393 )));
394 }
395
396 let id = item.object_id.clone();
397 let item_bytes = item.content.len();
398
399 item.submit_options = Some(options);
401
402 self.insert_l1(item.clone());
404 crate::metrics::record_operation("L1", "submit", "success");
405 crate::metrics::record_bytes_written("L1", item_bytes);
406
407 self.l2_batcher.lock().await.add(item);
413
414 debug!(id = %id, "Item submitted to L1 and batch queue");
415 crate::metrics::record_latency("L1", "submit", start.elapsed());
416 Ok(())
417 }
418
    /// Deletes an item from every tier (L1, L2, L3) and records the
    /// deletion in both Merkle trees.
    ///
    /// Returns `Ok(true)` if any tier reported a removal. L2/L3 and Merkle
    /// failures are logged and counted but do not fail the call; Emergency/
    /// Shutdown backpressure rejects the delete up front.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: remove the entry and release its bytes from the size counter.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // Drop the key from the L3 cuckoo filter regardless of tier outcomes.
        self.l3_filter.remove(id);

        // L2 (Redis): best-effort; errors are logged, not propagated.
        // NOTE(review): Ok(()) sets `found = true` even if the key was absent
        // in Redis — confirm the CacheStore contract distinguishes the two.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3 (MySQL): same best-effort pattern, logged at error severity.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Mirror the deletion into both Merkle trees so audits stay consistent.
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
514
515 fn insert_l1(&self, item: SyncItem) {
519 let new_size = Self::item_size(&item);
520 let key = item.object_id.clone();
521
522 if let Some(old_item) = self.l1_cache.insert(key, item) {
524 let old_size = Self::item_size(&old_item);
526 let current = self.l1_size_bytes.load(Ordering::Acquire);
528 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
529 self.l1_size_bytes.store(new_total, Ordering::Release);
530 } else {
531 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
533 }
534 }
535
    /// Size of a single item as counted against the L1 byte budget.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }
542
543 fn maybe_evict(&self) {
544 let pressure = self.memory_pressure();
545 if pressure < self.config.backpressure_warn {
546 return;
547 }
548
549 let level = BackpressureLevel::from_pressure(pressure);
550 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
551
552 let now = std::time::Instant::now();
554 let entries: Vec<CacheEntry> = self.l1_cache.iter()
555 .map(|ref_multi| {
556 let item = ref_multi.value();
557 let id = ref_multi.key().clone();
558
559 let now_millis = std::time::SystemTime::now()
561 .duration_since(std::time::UNIX_EPOCH)
562 .unwrap_or_default()
563 .as_millis() as u64;
564 let age_secs = if item.last_accessed > 0 {
565 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
566 } else {
567 3600.0 };
569
570 CacheEntry {
571 id,
572 size_bytes: item.size_bytes(),
573 created_at: now - std::time::Duration::from_secs_f64(age_secs),
574 last_access: now - std::time::Duration::from_secs_f64(age_secs),
575 access_count: item.access_count,
576 is_dirty: false, }
578 })
579 .collect();
580
581 if entries.is_empty() {
582 return;
583 }
584
585 let evict_count = match level {
587 BackpressureLevel::Normal => 0,
588 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
594
595 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
597
598 let mut evicted_bytes = 0usize;
600 for victim_id in &victims {
601 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
602 evicted_bytes += item.size_bytes();
603 }
604 }
605
606 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
608
609 info!(
610 evicted = victims.len(),
611 evicted_bytes = evicted_bytes,
612 pressure = %pressure,
613 level = %level,
614 "Evicted entries from L1 cache"
615 );
616 }
617
618 pub fn l1_stats(&self) -> (usize, usize) {
620 (
621 self.l1_cache.len(),
622 self.l1_size_bytes.load(Ordering::Acquire),
623 )
624 }
625
626 #[must_use]
628 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
629 self.l3_filter.stats()
630 }
631
    /// Shared handle to the L3 cuckoo filter manager.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }
636
637 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
642 let redis_root = if let Some(ref rm) = self.redis_merkle {
643 rm.root_hash().await.ok().flatten().map(hex::encode)
644 } else {
645 None
646 };
647
648 let sql_root = if let Some(ref sm) = self.sql_merkle {
649 sm.root_hash().await.ok().flatten().map(hex::encode)
650 } else {
651 None
652 };
653
654 (redis_root, sql_root)
655 }
656
    /// Marks the L3 cuckoo filter trusted, keyed off the SQL Merkle root.
    ///
    /// Returns `false` only when a SQL Merkle store is configured but its
    /// root cannot be read.
    ///
    /// NOTE(review): despite the log message, no comparison of filter
    /// contents against the Merkle root happens here — the filter is marked
    /// trusted unconditionally once a root is obtained. Confirm whether the
    /// real verification lives elsewhere or is still pending.
    pub async fn verify_filter(&self) -> bool {
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL Merkle store: nothing to verify against, trust as-is.
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }
688
689 pub fn update_gauge_metrics(&self) {
694 let (l1_count, l1_bytes) = self.l1_stats();
695 crate::metrics::set_l1_cache_items(l1_count);
696 crate::metrics::set_l1_cache_bytes(l1_bytes);
697 crate::metrics::set_memory_pressure(self.memory_pressure());
698
699 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
700 let filter_load = if filter_capacity > 0 {
701 filter_entries as f64 / filter_capacity as f64
702 } else {
703 0.0
704 };
705 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
706 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
707
708 crate::metrics::set_backpressure_level(self.pressure() as u8);
709 }
710}
711
#[cfg(test)]
mod tests {
    //! Unit tests exercising the engine with no L2/L3 backends attached,
    //! so only the L1 path and filter bookkeeping are covered.
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Engine with default config and a live (but unused) config channel.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    // Minimal JSON-backed item carrying the given object id.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    // A fresh engine starts in Created and is not yet ready for traffic.
    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    // Pressure is used/capacity: zero on an empty cache, positive after
    // one insert against a small (1000-byte) budget.
    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    // insert_l1 accounts the item's size_bytes() in the byte counter.
    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    // Replacing an entry under the same key adjusts the counter to the new
    // item's size (old size released, new size added).
    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); assert_eq!(size2, size2_expected); }

    // With no backends attached, an unknown id is a total miss.
    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    // An item placed in L1 is served back from L1.
    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    // Delete removes the L1 entry and returns its bytes to the counter.
    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    // A fresh cuckoo filter is empty but has nonzero capacity.
    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    // An empty engine is under no backpressure and accepts writes.
    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    // Zero memory pressure maps to the Normal backpressure level.
    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}