1mod types;
37mod api;
38mod lifecycle;
39mod flush;
40mod merkle_api;
41mod search_api;
42
43pub use types::{EngineState, ItemStatus, BatchResult};
44pub use merkle_api::MerkleDiff;
45pub use search_api::{SearchTier, SearchResult, SearchSource};
46#[allow(unused_imports)]
47use types::WriteTarget;
48
49use std::sync::Arc;
50use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
51use std::time::Instant;
52use dashmap::DashMap;
53use parking_lot::RwLock;
54use tokio::sync::{watch, Mutex};
55use tracing::{info, warn, debug, error};
56
57use crate::config::SyncEngineConfig;
58use crate::sync_item::SyncItem;
59use crate::submit_options::SubmitOptions;
60use crate::backpressure::BackpressureLevel;
61use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
62use crate::storage::sql::SqlStore;
63use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
64use crate::cuckoo::FilterPersistence;
65use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
66use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
67use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
68use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
69
70use search_api::SearchState;
71
/// Tiered sync engine: an in-memory L1 cache backed by optional L2 (Redis,
/// per log messages) and L3 (MySQL, per log messages) stores, with cuckoo
/// filtering for L3, Merkle-tree integrity tracking, batched L2 writes, and
/// tan-curve eviction under memory pressure.
pub struct SyncEngine {
    /// Engine configuration captured at construction time.
    pub(super) config: SyncEngineConfig,

    // Receiver for live config updates; currently unread in this file
    // (hence the allow), presumably consumed by a sibling module.
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Sender half of the lifecycle-state channel (transitions are published here).
    pub(super) state: watch::Sender<EngineState>,

    /// Receiver half kept locally so `state()` can read without subscribing.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-memory cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate total byte size of `l1_cache` contents (maintained by
    /// `insert_l1` / `delete` / `maybe_evict`).
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// Optional L2 cache backend.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Optional L3 archive backend.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Optional SQL store handle; usage not visible in this file.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter gating L3 lookups (avoids pointless backend reads).
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for the cuckoo filter; usage not visible here.
    pub(super) filter_persistence: Option<FilterPersistence>,

    // Snapshot bookkeeping for the cuckoo filter — counters are written
    // elsewhere; presumably drives periodic filter snapshots (verify in
    // the flush/lifecycle modules).
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batcher that accumulates submitted items for flushing to L2.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Optional Merkle tree mirrored in Redis (L2 integrity tracking).
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Optional Merkle tree mirrored in SQL (L3 integrity tracking).
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Optional write-ahead log for L3; usage not visible in this file.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Health checker for the MySQL backend.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Victim-selection policy used by `maybe_evict`.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Shared search state consumed by the `search_api` module.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}
143
144impl SyncEngine {
145 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
150 let (state_tx, state_rx) = watch::channel(EngineState::Created);
151
152 let batch_config = BatchConfig {
153 flush_ms: config.batch_flush_ms,
154 flush_count: config.batch_flush_count,
155 flush_bytes: config.batch_flush_bytes,
156 };
157
158 Self {
159 config: config.clone(),
160 config_rx,
161 state: state_tx,
162 state_rx,
163 l1_cache: Arc::new(DashMap::new()),
164 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
165 l2_store: None,
166 l3_store: None,
167 sql_store: None,
168 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
169 filter_persistence: None,
170 cf_inserts_since_snapshot: AtomicU64::new(0),
171 cf_last_snapshot: Mutex::new(Instant::now()),
172 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
173 redis_merkle: None,
174 sql_merkle: None,
175 l3_wal: None,
176 mysql_health: MysqlHealthChecker::new(),
177 eviction_policy: TanCurvePolicy::default(),
178 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
179 }
180 }
181
182 #[must_use]
184 pub fn state(&self) -> EngineState {
185 *self.state_rx.borrow()
186 }
187
188 #[must_use]
190 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
191 self.state_rx.clone()
192 }
193
194 #[must_use]
196 pub fn is_ready(&self) -> bool {
197 matches!(self.state(), EngineState::Ready | EngineState::Running)
198 }
199
200 #[must_use]
202 pub fn memory_pressure(&self) -> f64 {
203 let used = self.l1_size_bytes.load(Ordering::Acquire);
204 let max = self.config.l1_max_bytes;
205 if max == 0 {
206 0.0
207 } else {
208 used as f64 / max as f64
209 }
210 }
211
212 #[must_use]
214 pub fn pressure(&self) -> BackpressureLevel {
215 BackpressureLevel::from_pressure(self.memory_pressure())
216 }
217
218 #[must_use]
220 pub fn should_accept_writes(&self) -> bool {
221 let pressure = self.pressure();
222 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
223 }
224
    /// Tiered read path: L1 (in-memory) first, then L2, then L3 behind a
    /// cuckoo-filter check. Hits in lower tiers are promoted into L1.
    ///
    /// Returns `Ok(None)` on a miss in every tier. L2/L3 backend errors are
    /// logged and treated as misses for that tier rather than propagated,
    /// so the caller only sees `Err` from tiers below if… in fact never —
    /// this function itself always returns `Ok`.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: bump access stats in place while holding the shard guard,
        // then return a clone of the cached item.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            // Wall-clock millis since epoch; consumed by the eviction
            // policy in `maybe_evict` to estimate entry age.
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: a hit is promoted into L1 unconditionally; an error falls
        // through to the L3 path instead of failing the read.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: only consulted when the cuckoo filter says the id may exist
        // there; a filter "no" skips the backend round-trip entirely.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote only while L1 is below capacity.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // Filter said "maybe", backend said "no".
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        // Missed every tier.
        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
314
315 #[tracing::instrument(skip(self), fields(verified))]
320 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
321 let item = match self.get(id).await? {
322 Some(item) => item,
323 None => return Ok(None),
324 };
325
326 if !item.merkle_root.is_empty() {
328 use sha2::{Sha256, Digest};
329
330 let computed = Sha256::digest(&item.content);
331 let computed_hex = hex::encode(computed);
332
333 if computed_hex != item.merkle_root {
334 tracing::Span::current().record("verified", false);
335 warn!(
336 id = %id,
337 expected = %item.merkle_root,
338 actual = %computed_hex,
339 "Data corruption detected!"
340 );
341
342 crate::metrics::record_corruption(id);
344
345 return Err(StorageError::Corruption {
346 id: id.to_string(),
347 expected: item.merkle_root.clone(),
348 actual: computed_hex,
349 });
350 }
351
352 tracing::Span::current().record("verified", true);
353 debug!(id = %id, "Hash verification passed");
354 }
355
356 Ok(Some(item))
357 }
358
359 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
367 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
368 self.submit_with(item, SubmitOptions::default()).await
369 }
370
    /// Write path: rejects under severe backpressure, applies option
    /// overrides to the item, inserts it into L1, and queues it on the L2
    /// batcher for a later flush.
    ///
    /// Returns `StorageError::Backend` when the engine is shedding writes
    /// (`Emergency`/`Shutdown` pressure — see `should_accept_writes`).
    ///
    /// NOTE(review): this path grows L1 but never calls `maybe_evict`, so
    /// within this file nothing relieves pressure until writes are rejected
    /// outright. Confirm that eviction is driven from a sibling module
    /// (flush/lifecycle) — otherwise an eviction call belongs here.
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        // Shed load before doing any work.
        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Option-provided state overrides whatever the item carried.
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Options travel with the item so downstream flush code can honor them.
        item.submit_options = Some(options);

        // L1 gets a clone; the original moves into the batch queue below.
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
432
    /// Deletes an item from every tier (L1, L2, L3), removes it from the L3
    /// cuckoo filter, and records the deletion in both Merkle trees.
    ///
    /// Returns `Ok(true)` if any tier reported a deletion. Like submits,
    /// deletes are rejected under `Emergency`/`Shutdown` backpressure.
    ///
    /// NOTE(review): an L2/L3 `Ok(())` sets `found = true` even when the
    /// backend had no such key (their `delete` returns no existence info),
    /// so `true` means "delete ran somewhere", not "item existed".
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: drop the entry and give its bytes back to the size counter.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // Remove from the cuckoo filter regardless of which tiers hold it.
        self.l3_filter.remove(id);

        // L2: errors are logged but do not abort the delete of other tiers.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3: same best-effort policy as L2, but failures log at error level.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Record the deletion in both Merkle trees (built unconditionally;
        // applied only where a tree is configured).
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
528
529 fn insert_l1(&self, item: SyncItem) {
533 let new_size = Self::item_size(&item);
534 let key = item.object_id.clone();
535
536 if let Some(old_item) = self.l1_cache.insert(key, item) {
538 let old_size = Self::item_size(&old_item);
540 let current = self.l1_size_bytes.load(Ordering::Acquire);
542 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
543 self.l1_size_bytes.store(new_total, Ordering::Release);
544 } else {
545 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
547 }
548 }
549
    /// Byte size used for L1 accounting, delegated to the item itself.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }
556
557 fn maybe_evict(&self) {
558 let pressure = self.memory_pressure();
559 if pressure < self.config.backpressure_warn {
560 return;
561 }
562
563 let level = BackpressureLevel::from_pressure(pressure);
564 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
565
566 let now = std::time::Instant::now();
568 let entries: Vec<CacheEntry> = self.l1_cache.iter()
569 .map(|ref_multi| {
570 let item = ref_multi.value();
571 let id = ref_multi.key().clone();
572
573 let now_millis = std::time::SystemTime::now()
575 .duration_since(std::time::UNIX_EPOCH)
576 .unwrap_or_default()
577 .as_millis() as u64;
578 let age_secs = if item.last_accessed > 0 {
579 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
580 } else {
581 3600.0 };
583
584 CacheEntry {
585 id,
586 size_bytes: item.size_bytes(),
587 created_at: now - std::time::Duration::from_secs_f64(age_secs),
588 last_access: now - std::time::Duration::from_secs_f64(age_secs),
589 access_count: item.access_count,
590 is_dirty: false, }
592 })
593 .collect();
594
595 if entries.is_empty() {
596 return;
597 }
598
599 let evict_count = match level {
601 BackpressureLevel::Normal => 0,
602 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
608
609 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
611
612 let mut evicted_bytes = 0usize;
614 for victim_id in &victims {
615 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
616 evicted_bytes += item.size_bytes();
617 }
618 }
619
620 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
622
623 info!(
624 evicted = victims.len(),
625 evicted_bytes = evicted_bytes,
626 pressure = %pressure,
627 level = %level,
628 "Evicted entries from L1 cache"
629 );
630 }
631
632 pub fn l1_stats(&self) -> (usize, usize) {
634 (
635 self.l1_cache.len(),
636 self.l1_size_bytes.load(Ordering::Acquire),
637 )
638 }
639
    /// Current `(entries, capacity, trust)` tuple of the L3 cuckoo filter.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }
645
    /// Shared handle to the L3 cuckoo filter manager.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }
650
651 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
656 let redis_root = if let Some(ref rm) = self.redis_merkle {
657 rm.root_hash().await.ok().flatten().map(hex::encode)
658 } else {
659 None
660 };
661
662 let sql_root = if let Some(ref sm) = self.sql_merkle {
663 sm.root_hash().await.ok().flatten().map(hex::encode)
664 } else {
665 None
666 };
667
668 (redis_root, sql_root)
669 }
670
    /// Marks the L3 cuckoo filter as trusted, nominally against the SQL
    /// Merkle root. Returns `false` only when a SQL Merkle store exists but
    /// its root cannot be read.
    ///
    /// NOTE(review): no actual comparison of the filter contents against
    /// the Merkle root happens here — the root is only logged and the
    /// filter is marked trusted unconditionally. Confirm whether the real
    /// verification lives in a sibling module or is still pending.
    pub async fn verify_filter(&self) -> bool {
        // Without a SQL Merkle store there is nothing to verify against;
        // trust the filter as-is.
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }
702
703 pub fn update_gauge_metrics(&self) {
708 let (l1_count, l1_bytes) = self.l1_stats();
709 crate::metrics::set_l1_cache_items(l1_count);
710 crate::metrics::set_l1_cache_bytes(l1_bytes);
711 crate::metrics::set_memory_pressure(self.memory_pressure());
712
713 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
714 let filter_load = if filter_capacity > 0 {
715 filter_entries as f64 / filter_capacity as f64
716 } else {
717 0.0
718 };
719 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
720 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
721
722 crate::metrics::set_backpressure_level(self.pressure() as u8);
723 }
724}
725
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Engine with default config and no backend stores attached.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    // Minimal item with a small JSON payload.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    // A freshly constructed engine is `Created` and not yet ready.
    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    // Pressure is 0.0 when empty and rises after an insert.
    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    // Inserting an item accounts for exactly its reported size.
    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    // Replacing an item swaps its size in the counter (no double-count).
    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); assert_eq!(size2, size2_expected); }

    // A miss in every tier yields Ok(None).
    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    // An L1-resident item is returned on get.
    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    // Deleting drops the entry and returns the bytes to the counter.
    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    // Fresh filter starts empty with a nonzero capacity.
    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    // An empty engine is under no pressure and accepts writes.
    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}