1mod types;
37mod api;
38mod lifecycle;
39mod flush;
40mod merkle_api;
41mod search_api;
42
43pub use types::{EngineState, ItemStatus, BatchResult};
44pub use merkle_api::MerkleDiff;
45pub use search_api::{SearchTier, SearchResult, SearchSource};
46#[allow(unused_imports)]
47use types::WriteTarget;
48
49use std::sync::Arc;
50use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
51use std::time::Instant;
52use dashmap::DashMap;
53use parking_lot::RwLock;
54use tokio::sync::{watch, Mutex};
55use tracing::{info, warn, debug, error};
56
57use crate::config::SyncEngineConfig;
58use crate::sync_item::SyncItem;
59use crate::submit_options::SubmitOptions;
60use crate::backpressure::BackpressureLevel;
61use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
62use crate::storage::sql::SqlStore;
63use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
64use crate::cuckoo::FilterPersistence;
65use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
66use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
67use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
68use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
69
70use search_api::SearchState;
71
/// Tiered storage engine: an in-process L1 map, an optional L2 cache store and an optional
/// L3 archive store, plus the supporting machinery (membership filter, write batcher,
/// Merkle trees) used by the methods in this module.
///
/// NOTE(review): fields are dropped in declaration order; keep that in mind before
/// reordering them (several hold backend handles).
pub struct SyncEngine {
    /// Current configuration; read in this file for `l1_max_bytes`, `backpressure_warn`
    /// and `enable_cdc_stream`.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Receiver for configuration updates, held behind an async mutex.
    /// NOTE(review): not read in this file — presumably polled elsewhere to refresh
    /// `config`; confirm.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Publishing half of the engine-state watch channel (created as `EngineState::Created`).
    pub(super) state: watch::Sender<EngineState>,

    /// Receiving half of the state channel, used by `state()` and cloned by `state_receiver()`.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1 tier: in-memory items keyed by `object_id`.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Running total of `item_size` over the entries in `l1_cache`; the numerator of
    /// `memory_pressure()`.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2 cache tier consulted by `get` and `delete`; `None` when constructed by `new`.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Concrete Redis handle; `None` when constructed by `new`.
    /// NOTE(review): not used in this file — presumably backs `l2_store`; confirm.
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3 archive tier consulted by `get` and `delete`; `None` when constructed by `new`.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Concrete SQL handle; `None` when constructed by `new`.
    /// NOTE(review): not used in this file — presumably backs `l3_store`; confirm.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Membership filter checked (`should_check_l3`) before any L3 read; entries are
    /// removed on `delete`.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for the L3 filter; `None` when constructed by `new`.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts since the last snapshot (name-inferred; not used in this file — confirm).
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    /// Time of the last filter snapshot (name-inferred); initialised to `Instant::now()`.
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Queue that `submit_with` adds every accepted item to; thresholds come from the
    /// `batch_flush_*` config values.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis-side Merkle tree; updated on `delete`, root reported by `merkle_roots`.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL-side Merkle tree; updated on `delete`, root reported by `merkle_roots` and
    /// read by `verify_filter`.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3; `None` when constructed by `new`, not used in this file.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker; not used in this file.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Policy that selects L1 victims in `maybe_evict`.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search subsystem state; initialised to `Some(SearchState::default())` by `new`.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}
146
147impl SyncEngine {
148 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
153 let (state_tx, state_rx) = watch::channel(EngineState::Created);
154
155 let batch_config = BatchConfig {
156 flush_ms: config.batch_flush_ms,
157 flush_count: config.batch_flush_count,
158 flush_bytes: config.batch_flush_bytes,
159 };
160
161 Self {
162 config: RwLock::new(config.clone()),
163 config_rx: Mutex::new(config_rx),
164 state: state_tx,
165 state_rx,
166 l1_cache: Arc::new(DashMap::new()),
167 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
168 l2_store: None,
169 redis_store: None,
170 l3_store: None,
171 sql_store: None,
172 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
173 filter_persistence: None,
174 cf_inserts_since_snapshot: AtomicU64::new(0),
175 cf_last_snapshot: Mutex::new(Instant::now()),
176 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
177 redis_merkle: None,
178 sql_merkle: None,
179 l3_wal: None,
180 mysql_health: MysqlHealthChecker::new(),
181 eviction_policy: TanCurvePolicy::default(),
182 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
183 }
184 }
185
186 #[must_use]
188 pub fn state(&self) -> EngineState {
189 *self.state_rx.borrow()
190 }
191
192 #[must_use]
194 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
195 self.state_rx.clone()
196 }
197
198 #[must_use]
200 pub fn is_ready(&self) -> bool {
201 matches!(self.state(), EngineState::Ready | EngineState::Running)
202 }
203
204 #[must_use]
206 pub fn memory_pressure(&self) -> f64 {
207 let used = self.l1_size_bytes.load(Ordering::Acquire);
208 let max = self.config.read().l1_max_bytes;
209 if max == 0 {
210 0.0
211 } else {
212 used as f64 / max as f64
213 }
214 }
215
216 #[must_use]
218 pub fn pressure(&self) -> BackpressureLevel {
219 BackpressureLevel::from_pressure(self.memory_pressure())
220 }
221
222 #[must_use]
224 pub fn should_accept_writes(&self) -> bool {
225 let pressure = self.pressure();
226 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
227 }
228
229 #[tracing::instrument(skip(self), fields(tier))]
236 pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
237 let start = std::time::Instant::now();
238
239 if let Some(mut item) = self.l1_cache.get_mut(id) {
241 item.access_count = item.access_count.saturating_add(1);
242 item.last_accessed = std::time::SystemTime::now()
243 .duration_since(std::time::UNIX_EPOCH)
244 .unwrap_or_default()
245 .as_millis() as u64;
246 tracing::Span::current().record("tier", "L1");
247 debug!("L1 hit");
248 crate::metrics::record_operation("L1", "get", "hit");
249 crate::metrics::record_latency("L1", "get", start.elapsed());
250 return Ok(Some(item.clone()));
251 }
252
253 if let Some(ref l2) = self.l2_store {
255 match l2.get(id).await {
256 Ok(Some(item)) => {
257 self.insert_l1(item.clone());
259 tracing::Span::current().record("tier", "L2");
260 debug!("L2 hit, promoted to L1");
261 crate::metrics::record_operation("L2", "get", "hit");
262 crate::metrics::record_latency("L2", "get", start.elapsed());
263 return Ok(Some(item));
264 }
265 Ok(None) => {
266 debug!("L2 miss");
268 crate::metrics::record_operation("L2", "get", "miss");
269 }
270 Err(e) => {
271 warn!(error = %e, "L2 lookup failed");
272 crate::metrics::record_operation("L2", "get", "error");
273 }
274 }
275 }
276
277 if self.l3_filter.should_check_l3(id) {
279 crate::metrics::record_cuckoo_check("L3", "positive");
280 if let Some(ref l3) = self.l3_store {
281 match l3.get(id).await {
282 Ok(Some(item)) => {
283 if self.memory_pressure() < 1.0 {
285 self.insert_l1(item.clone());
286 }
287 tracing::Span::current().record("tier", "L3");
288 debug!("L3 hit, promoted to L1");
289 crate::metrics::record_operation("L3", "get", "hit");
290 crate::metrics::record_latency("L3", "get", start.elapsed());
291 crate::metrics::record_bytes_read("L3", item.content.len());
292 return Ok(Some(item));
293 }
294 Ok(None) => {
295 debug!("L3 filter false positive");
297 crate::metrics::record_operation("L3", "get", "false_positive");
298 crate::metrics::record_cuckoo_false_positive("L3");
299 }
300 Err(e) => {
301 warn!(error = %e, "L3 lookup failed");
302 crate::metrics::record_operation("L3", "get", "error");
303 crate::metrics::record_error("L3", "get", "backend");
304 }
305 }
306 }
307 } else {
308 crate::metrics::record_cuckoo_check("L3", "negative");
310 }
311
312 tracing::Span::current().record("tier", "miss");
313 debug!("Cache miss");
314 crate::metrics::record_operation("all", "get", "miss");
315 crate::metrics::record_latency("all", "get", start.elapsed());
316 Ok(None)
317 }
318
319 #[tracing::instrument(skip(self), fields(verified))]
324 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
325 let item = match self.get(id).await? {
326 Some(item) => item,
327 None => return Ok(None),
328 };
329
330 if !item.content_hash.is_empty() {
332 use sha2::{Sha256, Digest};
333
334 let computed = Sha256::digest(&item.content);
335 let computed_hex = hex::encode(computed);
336
337 if computed_hex != item.content_hash {
338 tracing::Span::current().record("verified", false);
339 warn!(
340 id = %id,
341 expected = %item.content_hash,
342 actual = %computed_hex,
343 "Data corruption detected!"
344 );
345
346 crate::metrics::record_corruption(id);
348
349 return Err(StorageError::Corruption {
350 id: id.to_string(),
351 expected: item.content_hash.clone(),
352 actual: computed_hex,
353 });
354 }
355
356 tracing::Span::current().record("verified", true);
357 debug!(id = %id, "Hash verification passed");
358 }
359
360 Ok(Some(item))
361 }
362
363 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
371 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
372 self.submit_with(item, SubmitOptions::default()).await
373 }
374
375 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
396 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
397 let start = std::time::Instant::now();
398
399 if !self.should_accept_writes() {
400 crate::metrics::record_operation("engine", "submit", "rejected");
401 crate::metrics::record_error("engine", "submit", "backpressure");
402 return Err(StorageError::Backend(format!(
403 "Rejecting write: engine state={}, pressure={}",
404 self.state(),
405 self.pressure()
406 )));
407 }
408
409 let id = item.object_id.clone();
410 let item_bytes = item.content.len();
411
412 if let Some(ref state) = options.state {
414 item.state = state.clone();
415 }
416
417 item.submit_options = Some(options);
419
420 self.insert_l1(item.clone());
422 crate::metrics::record_operation("L1", "submit", "success");
423 crate::metrics::record_bytes_written("L1", item_bytes);
424
425 self.l2_batcher.lock().await.add(item);
431
432 debug!(id = %id, "Item submitted to L1 and batch queue");
433 crate::metrics::record_latency("L1", "submit", start.elapsed());
434 Ok(())
435 }
436
437 #[tracing::instrument(skip(self), fields(object_id = %id))]
446 pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
447 let start = std::time::Instant::now();
448
449 if !self.should_accept_writes() {
450 crate::metrics::record_operation("engine", "delete", "rejected");
451 crate::metrics::record_error("engine", "delete", "backpressure");
452 return Err(StorageError::Backend(format!(
453 "Rejecting delete: engine state={}, pressure={}",
454 self.state(),
455 self.pressure()
456 )));
457 }
458
459 let mut found = false;
460
461 if let Some((_, item)) = self.l1_cache.remove(id) {
463 let size = Self::item_size(&item);
464 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
465 found = true;
466 debug!("Deleted from L1");
467 crate::metrics::record_operation("L1", "delete", "success");
468 }
469
470 self.l3_filter.remove(id);
472
473 if let Some(ref l2) = self.l2_store {
475 let l2_start = std::time::Instant::now();
476 match l2.delete(id).await {
477 Ok(()) => {
478 found = true;
479 debug!("Deleted from L2 (Redis)");
480 crate::metrics::record_operation("L2", "delete", "success");
481 crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
482 }
483 Err(e) => {
484 warn!(error = %e, "Failed to delete from L2 (Redis)");
485 crate::metrics::record_operation("L2", "delete", "error");
486 crate::metrics::record_error("L2", "delete", "backend");
487 }
488 }
489 }
490
491 if let Some(ref l3) = self.l3_store {
493 let l3_start = std::time::Instant::now();
494 match l3.delete(id).await {
495 Ok(()) => {
496 found = true;
497 debug!("Deleted from L3 (MySQL)");
498 crate::metrics::record_operation("L3", "delete", "success");
499 crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
500 }
501 Err(e) => {
502 error!(error = %e, "Failed to delete from L3 (MySQL)");
503 crate::metrics::record_operation("L3", "delete", "error");
504 crate::metrics::record_error("L3", "delete", "backend");
505 }
507 }
508 }
509
510 let mut merkle_batch = MerkleBatch::new();
512 merkle_batch.delete(id.to_string());
513
514 if let Some(ref sql_merkle) = self.sql_merkle {
515 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
516 error!(error = %e, "Failed to update SQL Merkle tree for deletion");
517 crate::metrics::record_error("L3", "merkle", "batch_apply");
518 }
519 }
520
521 if let Some(ref redis_merkle) = self.redis_merkle {
522 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
523 warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
524 crate::metrics::record_error("L2", "merkle", "batch_apply");
525 }
526 }
527
528 if self.config.read().enable_cdc_stream && found {
530 self.emit_cdc_delete(id).await;
531 }
532
533 info!(found, "Delete operation completed");
534 crate::metrics::record_latency("all", "delete", start.elapsed());
535 Ok(found)
536 }
537
538 fn insert_l1(&self, item: SyncItem) {
542 let new_size = Self::item_size(&item);
543 let key = item.object_id.clone();
544
545 if let Some(old_item) = self.l1_cache.insert(key, item) {
547 let old_size = Self::item_size(&old_item);
549 let current = self.l1_size_bytes.load(Ordering::Acquire);
551 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
552 self.l1_size_bytes.store(new_total, Ordering::Release);
553 } else {
554 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
556 }
557 }
558
559 #[inline]
561 fn item_size(item: &SyncItem) -> usize {
562 item.size_bytes()
564 }
565
566 fn maybe_evict(&self) {
567 let pressure = self.memory_pressure();
568 if pressure < self.config.read().backpressure_warn {
569 return;
570 }
571
572 let level = BackpressureLevel::from_pressure(pressure);
573 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
574
575 let now = std::time::Instant::now();
577 let entries: Vec<CacheEntry> = self.l1_cache.iter()
578 .map(|ref_multi| {
579 let item = ref_multi.value();
580 let id = ref_multi.key().clone();
581
582 let now_millis = std::time::SystemTime::now()
584 .duration_since(std::time::UNIX_EPOCH)
585 .unwrap_or_default()
586 .as_millis() as u64;
587 let age_secs = if item.last_accessed > 0 {
588 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
589 } else {
590 3600.0 };
592
593 CacheEntry {
594 id,
595 size_bytes: item.size_bytes(),
596 created_at: now - std::time::Duration::from_secs_f64(age_secs),
597 last_access: now - std::time::Duration::from_secs_f64(age_secs),
598 access_count: item.access_count,
599 is_dirty: false, }
601 })
602 .collect();
603
604 if entries.is_empty() {
605 return;
606 }
607
608 let evict_count = match level {
610 BackpressureLevel::Normal => 0,
611 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
617
618 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
620
621 let mut evicted_bytes = 0usize;
623 for victim_id in &victims {
624 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
625 evicted_bytes += item.size_bytes();
626 }
627 }
628
629 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
631
632 info!(
633 evicted = victims.len(),
634 evicted_bytes = evicted_bytes,
635 pressure = %pressure,
636 level = %level,
637 "Evicted entries from L1 cache"
638 );
639 }
640
641 pub fn l1_stats(&self) -> (usize, usize) {
643 (
644 self.l1_cache.len(),
645 self.l1_size_bytes.load(Ordering::Acquire),
646 )
647 }
648
649 #[must_use]
651 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
652 self.l3_filter.stats()
653 }
654
655 pub fn l3_filter(&self) -> &Arc<FilterManager> {
657 &self.l3_filter
658 }
659
660 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
665 let redis_root = if let Some(ref rm) = self.redis_merkle {
666 rm.root_hash().await.ok().flatten().map(hex::encode)
667 } else {
668 None
669 };
670
671 let sql_root = if let Some(ref sm) = self.sql_merkle {
672 sm.root_hash().await.ok().flatten().map(hex::encode)
673 } else {
674 None
675 };
676
677 (redis_root, sql_root)
678 }
679
680 pub async fn verify_filter(&self) -> bool {
687 let sql_root = if let Some(ref sm) = self.sql_merkle {
689 match sm.root_hash().await {
690 Ok(Some(root)) => root,
691 _ => return false,
692 }
693 } else {
694 self.l3_filter.mark_trusted();
696 return true;
697 };
698
699 info!(
703 sql_root = %hex::encode(sql_root),
704 "Verifying L3 filter against SQL merkle root"
705 );
706
707 self.l3_filter.mark_trusted();
709 true
710 }
711
712 pub fn update_gauge_metrics(&self) {
717 let (l1_count, l1_bytes) = self.l1_stats();
718 crate::metrics::set_l1_cache_items(l1_count);
719 crate::metrics::set_l1_cache_bytes(l1_bytes);
720 crate::metrics::set_memory_pressure(self.memory_pressure());
721
722 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
723 let filter_load = if filter_capacity > 0 {
724 filter_entries as f64 / filter_capacity as f64
725 } else {
726 0.0
727 };
728 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
729 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
730
731 crate::metrics::set_backpressure_level(self.pressure() as u8);
732 }
733}
734
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Builds an engine from `config`; the watch sender is dropped immediately.
    fn create_engine_with(config: SyncEngineConfig) -> SyncEngine {
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_engine() -> SyncEngine {
        create_engine_with(SyncEngineConfig::default())
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"data": "test"}))
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let engine = create_engine_with(SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        });

        assert_eq!(engine.memory_pressure(), 0.0);

        engine.insert_l1(create_test_item("test1"));

        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_memory_pressure_zero_budget() {
        // A zero budget must not divide by zero; pressure is reported as 0.0.
        let engine = create_engine_with(SyncEngineConfig {
            l1_max_bytes: 0,
            ..Default::default()
        });
        engine.insert_l1(create_test_item("test1"));

        assert_eq!(engine.memory_pressure(), 0.0);
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        engine.insert_l1(create_test_item("test1"));

        // Replacing the same id must swap the old size for the new one, not add to it.
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size2, size2_expected);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_get_from_l1_records_access() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));

        let first = engine.get("test1").await.unwrap().unwrap();
        let second = engine.get("test1").await.unwrap().unwrap();

        assert_eq!(second.access_count, first.access_count + 1);
        assert!(second.last_accessed > 0);
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[tokio::test]
    async fn test_replace_then_delete_resets_size() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));
        engine.insert_l1(SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "a longer replacement payload"}),
        ));

        assert!(engine.delete("test1").await.unwrap());
        assert_eq!(engine.l1_stats(), (0, 0));
    }

    #[tokio::test]
    async fn test_delete_missing_returns_false() {
        // No L2/L3 backends are configured by default, so nothing can report "found".
        let engine = create_test_engine();
        assert!(!engine.delete("missing").await.unwrap());
        assert_eq!(engine.l1_stats(), (0, 0));
    }

    #[tokio::test]
    async fn test_submit_caches_in_l1() {
        let engine = create_test_engine();
        engine.submit(create_test_item("test1")).await.unwrap();

        let cached = engine.get("test1").await.unwrap();
        assert_eq!(cached.map(|item| item.object_id), Some("test1".to_string()));
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}
871}