1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45
46pub use types::{EngineState, ItemStatus, BatchResult};
47pub use merkle_api::MerkleDiff;
48pub use search_api::{SearchTier, SearchResult, SearchSource};
49#[allow(unused_imports)]
50use types::WriteTarget;
51
52use std::sync::Arc;
53use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
54use std::time::Instant;
55use dashmap::DashMap;
56use parking_lot::RwLock;
57use tokio::sync::{watch, Mutex};
58use tracing::{info, warn, debug, error};
59
60use crate::config::SyncEngineConfig;
61use crate::sync_item::SyncItem;
62use crate::submit_options::SubmitOptions;
63use crate::backpressure::BackpressureLevel;
64use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
65use crate::storage::sql::SqlStore;
66use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
67use crate::cuckoo::FilterPersistence;
68use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
69use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
70use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
71use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
72
73use search_api::SearchState;
74
/// Tiered sync engine: an in-process L1 map in front of an optional L2 cache
/// store and an optional L3 archive store, together with the L3 cuckoo filter,
/// L2 write batcher, Merkle trees, WAL, health checker and eviction policy.
///
/// [`SyncEngine::new`] builds the in-process parts and leaves every optional
/// backend as `None`. Attaching backends happens outside this block.
pub struct SyncEngine {
    /// Current engine configuration.
    /// Read under this lock by `memory_pressure` (`l1_max_bytes`),
    /// `maybe_evict` (`backpressure_warn`) and `delete` (`enable_cdc_stream`).
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Receiver for configuration updates, handed to `new`.
    /// NOTE(review): presumably polled elsewhere to refresh `config`; confirm.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Publishes lifecycle state changes; starts at `EngineState::Created`.
    pub(super) state: watch::Sender<EngineState>,

    /// Engine-held receiver read by `state()` and cloned by `state_receiver()`.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1 tier: in-process items keyed by `object_id`.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Running total of `SyncItem::size_bytes()` over the entries in
    /// `l1_cache`; the numerator of `memory_pressure()`.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// Optional L2 cache backend (log messages refer to it as Redis).
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Concrete Redis store handle.
    /// NOTE(review): likely the same backend as `l2_store`, kept for
    /// Redis-specific APIs; confirm.
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// Optional L3 archive backend (log messages refer to it as MySQL).
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Concrete SQL store handle.
    /// NOTE(review): presumably the backend behind `l3_store`; confirm.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter that `get` consults to decide whether an L3 lookup is
    /// worthwhile. `delete` removes ids from it.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for the L3 filter (not used in this block).
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter snapshot bookkeeping.
    /// NOTE(review): presumably counts inserts and tracks the time since the
    /// last persisted snapshot; the consumers are outside this block.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batch queue that `submit_with` feeds with every accepted item.
    /// Flushing happens outside this block.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Optional Redis-backed Merkle tree; updated on delete and read by `merkle_roots`.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Optional SQL-backed Merkle tree; updated on delete and read by
    /// `merkle_roots` and `verify_filter`.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Optional write-ahead log for L3 (not used in this block).
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker (not used in this block).
    pub(super) mysql_health: MysqlHealthChecker,

    /// Victim selection policy used by `maybe_evict`.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search bookkeeping; `new` always initialises this to `Some(default)`.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}
149
150impl SyncEngine {
151 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
156 let (state_tx, state_rx) = watch::channel(EngineState::Created);
157
158 let batch_config = BatchConfig {
159 flush_ms: config.batch_flush_ms,
160 flush_count: config.batch_flush_count,
161 flush_bytes: config.batch_flush_bytes,
162 };
163
164 Self {
165 config: RwLock::new(config.clone()),
166 config_rx: Mutex::new(config_rx),
167 state: state_tx,
168 state_rx,
169 l1_cache: Arc::new(DashMap::new()),
170 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
171 l2_store: None,
172 redis_store: None,
173 l3_store: None,
174 sql_store: None,
175 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
176 filter_persistence: None,
177 cf_inserts_since_snapshot: AtomicU64::new(0),
178 cf_last_snapshot: Mutex::new(Instant::now()),
179 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
180 redis_merkle: None,
181 sql_merkle: None,
182 l3_wal: None,
183 mysql_health: MysqlHealthChecker::new(),
184 eviction_policy: TanCurvePolicy::default(),
185 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
186 }
187 }
188
189 #[must_use]
191 pub fn state(&self) -> EngineState {
192 *self.state_rx.borrow()
193 }
194
195 #[must_use]
197 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
198 self.state_rx.clone()
199 }
200
201 #[must_use]
203 pub fn is_ready(&self) -> bool {
204 matches!(self.state(), EngineState::Ready | EngineState::Running)
205 }
206
207 #[must_use]
209 pub fn memory_pressure(&self) -> f64 {
210 let used = self.l1_size_bytes.load(Ordering::Acquire);
211 let max = self.config.read().l1_max_bytes;
212 if max == 0 {
213 0.0
214 } else {
215 used as f64 / max as f64
216 }
217 }
218
219 #[must_use]
221 pub fn pressure(&self) -> BackpressureLevel {
222 BackpressureLevel::from_pressure(self.memory_pressure())
223 }
224
225 #[must_use]
227 pub fn should_accept_writes(&self) -> bool {
228 let pressure = self.pressure();
229 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
230 }
231
232 #[tracing::instrument(skip(self), fields(tier))]
239 pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
240 let start = std::time::Instant::now();
241
242 if let Some(mut item) = self.l1_cache.get_mut(id) {
244 item.access_count = item.access_count.saturating_add(1);
245 item.last_accessed = std::time::SystemTime::now()
246 .duration_since(std::time::UNIX_EPOCH)
247 .unwrap_or_default()
248 .as_millis() as u64;
249 tracing::Span::current().record("tier", "L1");
250 debug!("L1 hit");
251 crate::metrics::record_operation("L1", "get", "hit");
252 crate::metrics::record_latency("L1", "get", start.elapsed());
253 return Ok(Some(item.clone()));
254 }
255
256 if let Some(ref l2) = self.l2_store {
258 match l2.get(id).await {
259 Ok(Some(item)) => {
260 self.insert_l1(item.clone());
262 tracing::Span::current().record("tier", "L2");
263 debug!("L2 hit, promoted to L1");
264 crate::metrics::record_operation("L2", "get", "hit");
265 crate::metrics::record_latency("L2", "get", start.elapsed());
266 return Ok(Some(item));
267 }
268 Ok(None) => {
269 debug!("L2 miss");
271 crate::metrics::record_operation("L2", "get", "miss");
272 }
273 Err(e) => {
274 warn!(error = %e, "L2 lookup failed");
275 crate::metrics::record_operation("L2", "get", "error");
276 }
277 }
278 }
279
280 if self.l3_filter.should_check_l3(id) {
282 crate::metrics::record_cuckoo_check("L3", "positive");
283 if let Some(ref l3) = self.l3_store {
284 match l3.get(id).await {
285 Ok(Some(item)) => {
286 if self.memory_pressure() < 1.0 {
288 self.insert_l1(item.clone());
289 }
290 tracing::Span::current().record("tier", "L3");
291 debug!("L3 hit, promoted to L1");
292 crate::metrics::record_operation("L3", "get", "hit");
293 crate::metrics::record_latency("L3", "get", start.elapsed());
294 crate::metrics::record_bytes_read("L3", item.content.len());
295 return Ok(Some(item));
296 }
297 Ok(None) => {
298 debug!("L3 filter false positive");
300 crate::metrics::record_operation("L3", "get", "false_positive");
301 crate::metrics::record_cuckoo_false_positive("L3");
302 }
303 Err(e) => {
304 warn!(error = %e, "L3 lookup failed");
305 crate::metrics::record_operation("L3", "get", "error");
306 crate::metrics::record_error("L3", "get", "backend");
307 }
308 }
309 }
310 } else {
311 crate::metrics::record_cuckoo_check("L3", "negative");
313 }
314
315 tracing::Span::current().record("tier", "miss");
316 debug!("Cache miss");
317 crate::metrics::record_operation("all", "get", "miss");
318 crate::metrics::record_latency("all", "get", start.elapsed());
319 Ok(None)
320 }
321
322 #[tracing::instrument(skip(self), fields(verified))]
327 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
328 let item = match self.get(id).await? {
329 Some(item) => item,
330 None => return Ok(None),
331 };
332
333 if !item.content_hash.is_empty() {
335 use sha2::{Sha256, Digest};
336
337 let computed = Sha256::digest(&item.content);
338 let computed_hex = hex::encode(computed);
339
340 if computed_hex != item.content_hash {
341 tracing::Span::current().record("verified", false);
342 warn!(
343 id = %id,
344 expected = %item.content_hash,
345 actual = %computed_hex,
346 "Data corruption detected!"
347 );
348
349 crate::metrics::record_corruption(id);
351
352 return Err(StorageError::Corruption {
353 id: id.to_string(),
354 expected: item.content_hash.clone(),
355 actual: computed_hex,
356 });
357 }
358
359 tracing::Span::current().record("verified", true);
360 debug!(id = %id, "Hash verification passed");
361 }
362
363 Ok(Some(item))
364 }
365
366 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
374 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
375 self.submit_with(item, SubmitOptions::default()).await
376 }
377
378 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
399 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
400 let start = std::time::Instant::now();
401
402 if !self.should_accept_writes() {
403 crate::metrics::record_operation("engine", "submit", "rejected");
404 crate::metrics::record_error("engine", "submit", "backpressure");
405 return Err(StorageError::Backend(format!(
406 "Rejecting write: engine state={}, pressure={}",
407 self.state(),
408 self.pressure()
409 )));
410 }
411
412 let id = item.object_id.clone();
413 let item_bytes = item.content.len();
414
415 if let Some(ref state) = options.state {
417 item.state = state.clone();
418 }
419
420 item.submit_options = Some(options);
422
423 self.insert_l1(item.clone());
425 crate::metrics::record_operation("L1", "submit", "success");
426 crate::metrics::record_bytes_written("L1", item_bytes);
427
428 self.l2_batcher.lock().await.add(item);
434
435 debug!(id = %id, "Item submitted to L1 and batch queue");
436 crate::metrics::record_latency("L1", "submit", start.elapsed());
437 Ok(())
438 }
439
440 #[tracing::instrument(skip(self), fields(object_id = %id))]
449 pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
450 let start = std::time::Instant::now();
451
452 if !self.should_accept_writes() {
453 crate::metrics::record_operation("engine", "delete", "rejected");
454 crate::metrics::record_error("engine", "delete", "backpressure");
455 return Err(StorageError::Backend(format!(
456 "Rejecting delete: engine state={}, pressure={}",
457 self.state(),
458 self.pressure()
459 )));
460 }
461
462 let mut found = false;
463
464 if let Some((_, item)) = self.l1_cache.remove(id) {
466 let size = Self::item_size(&item);
467 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
468 found = true;
469 debug!("Deleted from L1");
470 crate::metrics::record_operation("L1", "delete", "success");
471 }
472
473 self.l3_filter.remove(id);
475
476 if let Some(ref l2) = self.l2_store {
478 let l2_start = std::time::Instant::now();
479 match l2.delete(id).await {
480 Ok(()) => {
481 found = true;
482 debug!("Deleted from L2 (Redis)");
483 crate::metrics::record_operation("L2", "delete", "success");
484 crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
485 }
486 Err(e) => {
487 warn!(error = %e, "Failed to delete from L2 (Redis)");
488 crate::metrics::record_operation("L2", "delete", "error");
489 crate::metrics::record_error("L2", "delete", "backend");
490 }
491 }
492 }
493
494 if let Some(ref l3) = self.l3_store {
496 let l3_start = std::time::Instant::now();
497 match l3.delete(id).await {
498 Ok(()) => {
499 found = true;
500 debug!("Deleted from L3 (MySQL)");
501 crate::metrics::record_operation("L3", "delete", "success");
502 crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
503 }
504 Err(e) => {
505 error!(error = %e, "Failed to delete from L3 (MySQL)");
506 crate::metrics::record_operation("L3", "delete", "error");
507 crate::metrics::record_error("L3", "delete", "backend");
508 }
510 }
511 }
512
513 let mut merkle_batch = MerkleBatch::new();
515 merkle_batch.delete(id.to_string());
516
517 if let Some(ref sql_merkle) = self.sql_merkle {
518 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
519 error!(error = %e, "Failed to update SQL Merkle tree for deletion");
520 crate::metrics::record_error("L3", "merkle", "batch_apply");
521 }
522 }
523
524 if let Some(ref redis_merkle) = self.redis_merkle {
525 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
526 warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
527 crate::metrics::record_error("L2", "merkle", "batch_apply");
528 }
529 }
530
531 if self.config.read().enable_cdc_stream && found {
533 self.emit_cdc_delete(id).await;
534 }
535
536 info!(found, "Delete operation completed");
537 crate::metrics::record_latency("all", "delete", start.elapsed());
538 Ok(found)
539 }
540
541 fn insert_l1(&self, item: SyncItem) {
545 let new_size = Self::item_size(&item);
546 let key = item.object_id.clone();
547
548 if let Some(old_item) = self.l1_cache.insert(key, item) {
550 let old_size = Self::item_size(&old_item);
552 let current = self.l1_size_bytes.load(Ordering::Acquire);
554 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
555 self.l1_size_bytes.store(new_total, Ordering::Release);
556 } else {
557 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
559 }
560 }
561
562 #[inline]
564 fn item_size(item: &SyncItem) -> usize {
565 item.size_bytes()
567 }
568
569 fn maybe_evict(&self) {
570 let pressure = self.memory_pressure();
571 if pressure < self.config.read().backpressure_warn {
572 return;
573 }
574
575 let level = BackpressureLevel::from_pressure(pressure);
576 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
577
578 let now = std::time::Instant::now();
580 let entries: Vec<CacheEntry> = self.l1_cache.iter()
581 .map(|ref_multi| {
582 let item = ref_multi.value();
583 let id = ref_multi.key().clone();
584
585 let now_millis = std::time::SystemTime::now()
587 .duration_since(std::time::UNIX_EPOCH)
588 .unwrap_or_default()
589 .as_millis() as u64;
590 let age_secs = if item.last_accessed > 0 {
591 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
592 } else {
593 3600.0 };
595
596 CacheEntry {
597 id,
598 size_bytes: item.size_bytes(),
599 created_at: now - std::time::Duration::from_secs_f64(age_secs),
600 last_access: now - std::time::Duration::from_secs_f64(age_secs),
601 access_count: item.access_count,
602 is_dirty: false, }
604 })
605 .collect();
606
607 if entries.is_empty() {
608 return;
609 }
610
611 let evict_count = match level {
613 BackpressureLevel::Normal => 0,
614 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
620
621 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
623
624 let mut evicted_bytes = 0usize;
626 for victim_id in &victims {
627 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
628 evicted_bytes += item.size_bytes();
629 }
630 }
631
632 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
634
635 info!(
636 evicted = victims.len(),
637 evicted_bytes = evicted_bytes,
638 pressure = %pressure,
639 level = %level,
640 "Evicted entries from L1 cache"
641 );
642 }
643
644 pub fn l1_stats(&self) -> (usize, usize) {
646 (
647 self.l1_cache.len(),
648 self.l1_size_bytes.load(Ordering::Acquire),
649 )
650 }
651
652 #[must_use]
654 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
655 self.l3_filter.stats()
656 }
657
658 pub fn l3_filter(&self) -> &Arc<FilterManager> {
660 &self.l3_filter
661 }
662
663 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
668 let redis_root = if let Some(ref rm) = self.redis_merkle {
669 rm.root_hash().await.ok().flatten().map(hex::encode)
670 } else {
671 None
672 };
673
674 let sql_root = if let Some(ref sm) = self.sql_merkle {
675 sm.root_hash().await.ok().flatten().map(hex::encode)
676 } else {
677 None
678 };
679
680 (redis_root, sql_root)
681 }
682
683 pub async fn verify_filter(&self) -> bool {
690 let sql_root = if let Some(ref sm) = self.sql_merkle {
692 match sm.root_hash().await {
693 Ok(Some(root)) => root,
694 _ => return false,
695 }
696 } else {
697 self.l3_filter.mark_trusted();
699 return true;
700 };
701
702 info!(
706 sql_root = %hex::encode(sql_root),
707 "Verifying L3 filter against SQL merkle root"
708 );
709
710 self.l3_filter.mark_trusted();
712 true
713 }
714
715 pub fn update_gauge_metrics(&self) {
720 let (l1_count, l1_bytes) = self.l1_stats();
721 crate::metrics::set_l1_cache_items(l1_count);
722 crate::metrics::set_l1_cache_bytes(l1_bytes);
723 crate::metrics::set_memory_pressure(self.memory_pressure());
724
725 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
726 let filter_load = if filter_capacity > 0 {
727 filter_entries as f64 / filter_capacity as f64
728 } else {
729 0.0
730 };
731 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
732 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
733
734 crate::metrics::set_backpressure_level(self.pressure() as u8);
735 }
736}
737
#[cfg(test)]
mod tests {
    // Unit tests for the engine's in-process behaviour (lifecycle state, L1
    // accounting, backpressure). No storage backends are attached.
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    /// Builds an engine from `config`; the config sender is dropped on return.
    fn engine_with(config: SyncEngineConfig) -> SyncEngine {
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_engine() -> SyncEngine {
        engine_with(SyncEngineConfig::default())
    }

    fn create_test_item(id: &str) -> SyncItem {
        let payload = json!({"data": "test"});
        SyncItem::from_json(id.to_string(), payload)
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let engine = engine_with(SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        });
        assert_eq!(engine.memory_pressure(), 0.0);

        engine.insert_l1(create_test_item("test1"));
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        let want_bytes = item.size_bytes();

        engine.insert_l1(item);

        let (entries, bytes) = engine.l1_stats();
        assert_eq!(entries, 1);
        assert_eq!(bytes, want_bytes);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));
        let _ = engine.l1_stats();

        // Same key, bigger payload: the entry is replaced and the byte count
        // must reflect only the new item.
        let bigger = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let want_bytes = bigger.size_bytes();
        engine.insert_l1(bigger);

        let (entries, bytes) = engine.l1_stats();
        assert_eq!(entries, 1);
        assert_eq!(bytes, want_bytes);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let found = engine.get("nonexistent").await.unwrap();
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let fetched = engine.get("test1").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        engine.insert_l1(create_test_item("test1"));

        let (before, _) = engine.l1_stats();
        assert_eq!(before, 1);

        let removed = engine.delete("test1").await.unwrap();
        assert!(removed);

        let (after_entries, after_bytes) = engine.l1_stats();
        assert_eq!(after_entries, 0);
        assert_eq!(after_bytes, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();
        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        assert!(create_test_engine().should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        assert_eq!(create_test_engine().pressure(), BackpressureLevel::Normal);
    }
}