//! Core `SyncEngine`: a tiered cache and sync engine with an in-memory L1,
//! an optional Redis-backed L2, and an optional MySQL-backed L3, fronted by
//! a cuckoo filter and verified with Merkle trees.

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;
mod schema_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{MerkleCacheStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
use crate::schema::SchemaRegistry;

use search_api::SearchState;
/// Tiered sync engine coordinating the in-memory L1, the optional Redis L2,
/// and the optional MySQL L3, plus Merkle verification, batching, eviction,
/// and backpressure.
pub struct SyncEngine {
    /// Current engine configuration; updated via `config_rx`.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Receiver for configuration updates.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Publishes engine state transitions.
    pub(super) state: watch::Sender<EngineState>,

    /// Local handle for reading the current engine state.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-memory cache keyed by object ID.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate total size of items held in L1, in bytes.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: hot cache store (Redis), if configured.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Direct Redis handle used for health probes.
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: durable archive store (MySQL), if configured.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Direct SQL handle used for health probes.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter that gates L3 lookups.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for cuckoo filter snapshots.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts recorded since the last snapshot.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    /// When the last filter snapshot was taken.
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batches L2 writes by time, count, and byte thresholds.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree mirrored in the cache tier, if configured.
    pub(super) merkle_cache: Option<MerkleCacheStore>,

    /// Authoritative Merkle tree backed by SQL, if configured.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log protecting L3 writes during MySQL outages.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Tracks MySQL availability.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy applied to L1 under memory pressure.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search subsystem state, when enabled.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// Limits concurrent SQL writes.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Registry of known item schemas.
    pub(super) schema_registry: Arc<SchemaRegistry>,
}

impl SyncEngine {
    /// Creates a new engine in the `Created` state with no storage tiers
    /// attached.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        let schema_registry = Arc::new(SchemaRegistry::new());

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            merkle_cache: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
            schema_registry,
        }
    }

    /// Returns the current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Returns a watch receiver for observing engine state transitions.
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// True when the engine is `Ready` or `Running`.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Ratio of L1 bytes used to the configured maximum; `0.0` when no
    /// limit is set.
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Backpressure level derived from current memory pressure.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// False once backpressure reaches `Emergency` or `Shutdown`.
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Aggregates a point-in-time health snapshot across all tiers, probing
    /// Redis and SQL concurrently.
    pub async fn health_check(&self) -> types::HealthCheck {
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Healthy requires Running, no probed backend reporting down, and
        // pressure below Emergency; unconfigured backends (None) don't count
        // against health.
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Root hash of the SQL-backed Merkle tree, hex-encoded, if available.
    pub async fn merkle_root(&self) -> Option<String> {
        if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(hash)) => Some(hex::encode(hash)),
                Ok(None) => None,
                Err(e) => {
                    warn!(error = %e, "Failed to get merkle root");
                    None
                }
            }
        } else {
            None
        }
    }

    /// Root hash of the cache-tier Merkle tree, hex-encoded, if available.
    pub async fn merkle_root_cache(&self) -> Option<String> {
        if let Some(ref merkle_cache) = self.merkle_cache {
            match merkle_cache.root_hash().await {
                Ok(Some(hash)) => Some(hex::encode(hash)),
                Ok(None) => None,
                Err(e) => {
                    debug!(error = %e, "Failed to get merkle cache root");
                    None
                }
            }
        } else {
            None
        }
    }

    /// Pings Redis; returns `(connected, latency_ms)`, or `(None, None)`
    /// when no Redis store is configured.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None);
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Runs `SELECT 1` against SQL; returns `(connected, latency_ms)`, or
    /// `(None, None)` when no SQL store is configured.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None);
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Reads an item, checking L1, then L2, then (cuckoo filter permitting)
    /// L3. Hits in lower tiers are promoted upward.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: update access stats in place and return a clone.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: on hit, promote to L1.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: only consulted when the cuckoo filter says the key may exist.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1 unless memory is already at its limit.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Like [`get`](Self::get), but re-hashes the content and returns
    /// `StorageError::Corruption` if it does not match the stored hash.
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // An empty hash means there is nothing to verify against.
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submits an item with default options.
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submits an item: writes to L1 immediately and queues it for batched
    /// L2 flushing. Rejects the write under heavy backpressure.
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        item.submit_options = Some(options);

        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // Add to the batcher while holding its lock, but flush outside it.
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Deletes an item from every tier, updates the Merkle trees, and emits
    /// a CDC event when enabled; returns whether the item was found in any
    /// tier.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        self.l3_filter.remove(id);

        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            } else if let Some(ref merkle_cache) = self.merkle_cache {
                if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &[id.to_string()]).await {
                    warn!(error = %e, "Failed to sync merkle cache after deletion");
                }
            }
        }

        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    /// Applies a delete that arrived via replication: removes the item from
    /// all tiers and updates the Merkle trees. Unlike [`delete`](Self::delete),
    /// it does not emit a CDC event, so replicated deletes do not echo.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete_replicated(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete_replicated", "rejected");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1 (replicated)");
        }

        self.l3_filter.remove(id);

        if let Some(ref l2) = self.l2_store {
            if l2.delete(id).await.is_ok() {
                found = true;
                debug!("Deleted from L2 (replicated)");
            }
        }

        if let Some(ref l3) = self.l3_store {
            if l3.delete(id).await.is_ok() {
                found = true;
                debug!("Deleted from L3 (replicated)");
            }
        }

        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for replicated deletion");
            } else if let Some(ref merkle_cache) = self.merkle_cache {
                if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &[id.to_string()]).await {
                    warn!(error = %e, "Failed to sync merkle cache after replicated deletion");
                }
            }
        }

        debug!(found, "Replicated delete completed");
        crate::metrics::record_latency("all", "delete_replicated", start.elapsed());
        Ok(found)
    }

    /// Inserts into L1 and keeps `l1_size_bytes` in sync, accounting for any
    /// item replaced under the same key.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Replacement: swap the old item's size out for the new one.
            let old_size = Self::item_size(&old_item);
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }

    /// Evicts L1 entries once memory pressure crosses the warn threshold,
    /// evicting a larger fraction at higher backpressure levels.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Reconstruct an approximate age from the last-accessed
                // timestamp; items never accessed default to one hour old.
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false,
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Evict a fraction of entries that scales with backpressure severity.
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,
            BackpressureLevel::Throttle => entries.len() / 10,
            BackpressureLevel::Critical => entries.len() / 5,
            BackpressureLevel::Emergency => entries.len() / 3,
            BackpressureLevel::Shutdown => entries.len() / 2,
        }.max(1);

        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Returns `(item count, total bytes)` for the L1 cache.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Returns `(entries, capacity, trust level)` for the L3 cuckoo filter.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Shared handle to the L3 cuckoo filter manager.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Returns the cache-tier and SQL Merkle roots as hex strings.
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.merkle_cache {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Checks the preconditions for trusting the L3 filter and marks it
    /// trusted. When no SQL Merkle store is configured there is nothing to
    /// verify against, so the filter is trusted as-is; otherwise the SQL
    /// root is logged and the filter marked trusted.
    pub async fn verify_filter(&self) -> bool {
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }

    /// Pushes current gauge metrics (L1 size, filter load, backpressure) to
    /// the metrics backend.
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }
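
    // `state_receiver` hands out an independent watch receiver; it should
    // already observe the `Created` state set in `new`.
    #[test]
    fn test_state_receiver_initial_value() {
        let engine = create_test_engine();
        let rx = engine.state_receiver();
        assert_eq!(*rx.borrow(), EngineState::Created);
    }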

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }
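
    // `memory_pressure` is documented above to return 0.0 when no limit is
    // configured; pin that division-by-zero guard here.
    #[test]
    fn test_memory_pressure_zero_max() {
        let config = SyncEngineConfig {
            l1_max_bytes: 0,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.insert_l1(create_test_item("test1"));
        assert_eq!(engine.memory_pressure(), 0.0);
    }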

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size2, size2_expected);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }
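
    // Sketch of a corruption check for `get_verified`. Assumes
    // `SyncItem::content_hash` is assignable from crate-internal tests and
    // that the stored content's SHA-256 differs from the tampered hash below.
    #[tokio::test]
    async fn test_get_verified_detects_corruption() {
        let engine = create_test_engine();
        let mut item = create_test_item("test1");
        // Deliberately wrong, non-empty hash: verification should fail.
        item.content_hash = "0000".to_string();
        engine.insert_l1(item);

        assert!(engine.get_verified("test1").await.is_err());
    }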

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
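
    // With no SQL Merkle store configured, `verify_filter` has nothing to
    // check against and should trust the filter immediately.
    #[tokio::test]
    async fn test_verify_filter_trusted_without_sql() {
        let engine = create_test_engine();
        assert!(engine.verify_filter().await);
    }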

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready);
        assert!(!health.healthy);
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
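
    // A fresh engine has neither Merkle store attached, so both roots from
    // `merkle_roots` should be absent.
    #[tokio::test]
    async fn test_merkle_roots_none_without_stores() {
        let engine = create_test_engine();
        let (cache_root, sql_root) = engine.merkle_roots().await;
        assert!(cache_root.is_none());
        assert!(sql_root.is_none());
    }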
}