1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45mod schema_api;
46
47pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
48pub use merkle_api::MerkleDiff;
49pub use search_api::{SearchTier, SearchResult, SearchSource};
50#[allow(unused_imports)]
51use types::WriteTarget;
52
53use std::sync::Arc;
54use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
55use std::time::Instant;
56use dashmap::DashMap;
57use parking_lot::RwLock;
58use tokio::sync::{watch, Mutex, Semaphore};
59use tracing::{info, warn, debug, error};
60
61use crate::config::SyncEngineConfig;
62use crate::sync_item::SyncItem;
63use crate::submit_options::SubmitOptions;
64use crate::backpressure::BackpressureLevel;
65use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
66use crate::storage::sql::SqlStore;
67use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
68use crate::cuckoo::FilterPersistence;
69use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
70use crate::merkle::{MerkleCacheStore, SqlMerkleStore, MerkleBatch};
71use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
72use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
73use crate::schema::SchemaRegistry;
74
75use search_api::SearchState;
76
/// Tiered sync engine coordinating an in-process L1 cache, an optional L2
/// cache store (Redis), and an optional L3 archive store (SQL), plus the
/// supporting machinery: cuckoo filter, Merkle trees, write-ahead log,
/// write batching, eviction, search state, and schema registry.
pub struct SyncEngine {
    /// Live configuration; a clone of the value received on `config_rx`.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Receiver for configuration updates (consumed elsewhere in this module).
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Broadcasts lifecycle state transitions to observers.
    pub(super) state: watch::Sender<EngineState>,

    /// Local receiver used to read the current state (see `Self::state`).
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate bytes held in `l1_cache`; maintained by insert/delete/evict.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2 cache store, when configured.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Direct Redis handle, used for health probes (`probe_redis`).
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3 archive store, when configured.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Direct SQL handle, used for health probes (`probe_sql`).
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter gating L3 lookups (see `get`'s `should_check_l3` call).
    pub(super) l3_filter: Arc<FilterManager>,

    /// Persistence backend for the cuckoo filter, when configured.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts since the last snapshot was taken.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    /// When the last cuckoo-filter snapshot was taken.
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batches L2 writes by time/count/byte thresholds (see `submit_with`).
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Cache-side Merkle tree, when configured.
    pub(super) merkle_cache: Option<MerkleCacheStore>,

    /// SQL-backed Merkle tree, when configured.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 writes, when configured.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Tracks MySQL availability for WAL stats and health reporting.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Policy used by `maybe_evict` to pick L1 victims.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Shared search state (see `search_api`).
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// Bounds concurrent SQL writes.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Registry of known item schemas.
    pub(super) schema_registry: Arc<SchemaRegistry>,
}
163
164impl SyncEngine {
165 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
170 let (state_tx, state_rx) = watch::channel(EngineState::Created);
171
172 let batch_config = BatchConfig {
173 flush_ms: config.batch_flush_ms,
174 flush_count: config.batch_flush_count,
175 flush_bytes: config.batch_flush_bytes,
176 };
177
178 let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));
180
181 let schema_registry = Arc::new(SchemaRegistry::new());
183
184 Self {
185 config: RwLock::new(config.clone()),
186 config_rx: Mutex::new(config_rx),
187 state: state_tx,
188 state_rx,
189 l1_cache: Arc::new(DashMap::new()),
190 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
191 l2_store: None,
192 redis_store: None,
193 l3_store: None,
194 sql_store: None,
195 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
196 filter_persistence: None,
197 cf_inserts_since_snapshot: AtomicU64::new(0),
198 cf_last_snapshot: Mutex::new(Instant::now()),
199 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
200 merkle_cache: None,
201 sql_merkle: None,
202 l3_wal: None,
203 mysql_health: MysqlHealthChecker::new(),
204 eviction_policy: TanCurvePolicy::default(),
205 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
206 sql_write_semaphore,
207 schema_registry,
208 }
209 }
210
211 #[must_use]
213 pub fn state(&self) -> EngineState {
214 *self.state_rx.borrow()
215 }
216
217 #[must_use]
219 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
220 self.state_rx.clone()
221 }
222
223 #[must_use]
225 pub fn is_ready(&self) -> bool {
226 matches!(self.state(), EngineState::Ready | EngineState::Running)
227 }
228
229 #[must_use]
231 pub fn memory_pressure(&self) -> f64 {
232 let used = self.l1_size_bytes.load(Ordering::Acquire);
233 let max = self.config.read().l1_max_bytes;
234 if max == 0 {
235 0.0
236 } else {
237 used as f64 / max as f64
238 }
239 }
240
241 #[must_use]
243 pub fn pressure(&self) -> BackpressureLevel {
244 BackpressureLevel::from_pressure(self.memory_pressure())
245 }
246
247 #[must_use]
249 pub fn should_accept_writes(&self) -> bool {
250 let pressure = self.pressure();
251 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
252 }
253
    /// Build a point-in-time health snapshot covering lifecycle state, memory
    /// pressure, L1/filter stats, WAL backlog, and backend connectivity.
    ///
    /// Redis and SQL are probed concurrently; each probe reports `None` when
    /// the corresponding store is not configured.
    pub async fn health_check(&self) -> types::HealthCheck {
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL backlog is only reported when a write-ahead log is attached.
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Probe both backends in parallel to keep the health check fast.
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // `healthy` requires `Running` specifically, while `ready` above also
        // accepts `Ready` — NOTE(review): confirm that asymmetry is intended.
        // A backend is only counted against health when it explicitly reported
        // down (`Some(false)`); unconfigured backends (`None`) don't hurt.
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }
334
335 pub async fn merkle_root(&self) -> Option<String> {
340 if let Some(ref sql_merkle) = self.sql_merkle {
341 match sql_merkle.root_hash().await {
342 Ok(Some(hash)) => Some(hex::encode(hash)),
343 Ok(None) => None,
344 Err(e) => {
345 warn!(error = %e, "Failed to get merkle root");
346 None
347 }
348 }
349 } else {
350 None
351 }
352 }
353
354 pub async fn merkle_root_cache(&self) -> Option<String> {
356 if let Some(ref merkle_cache) = self.merkle_cache {
357 match merkle_cache.root_hash().await {
358 Ok(Some(hash)) => Some(hex::encode(hash)),
359 Ok(None) => None,
360 Err(e) => {
361 debug!(error = %e, "Failed to get merkle cache root");
362 None
363 }
364 }
365 } else {
366 None
367 }
368 }
369
370 async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
372 let Some(ref redis_store) = self.redis_store else {
373 return (None, None); };
375
376 let start = std::time::Instant::now();
377 let mut conn = redis_store.connection();
378
379 let result: Result<String, _> = redis::cmd("PING")
380 .query_async(&mut conn)
381 .await;
382
383 match result {
384 Ok(_) => {
385 let latency_ms = start.elapsed().as_millis() as u64;
386 (Some(true), Some(latency_ms))
387 }
388 Err(_) => (Some(false), None),
389 }
390 }
391
392 async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
394 let Some(ref sql_store) = self.sql_store else {
395 return (None, None); };
397
398 let start = std::time::Instant::now();
399 let pool = sql_store.pool();
400
401 let result = sqlx::query("SELECT 1")
402 .fetch_one(&pool)
403 .await;
404
405 match result {
406 Ok(_) => {
407 let latency_ms = start.elapsed().as_millis() as u64;
408 (Some(true), Some(latency_ms))
409 }
410 Err(_) => (Some(false), None),
411 }
412 }
413
    /// Fetch an item by id, checking tiers in order: L1 (memory), L2 (cache
    /// store), then L3 (archive) — L3 only when the cuckoo filter says the id
    /// may exist there.
    ///
    /// Hits in lower tiers are promoted upward: L2 hits populate L1; L3 hits
    /// populate L1 (unless memory pressure >= 1.0) and L2. Returns `Ok(None)`
    /// on a full miss; `Err` is only produced by lower layers via `?`-free
    /// paths (L2/L3 errors here are logged and treated as misses).
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: update access stats in place while holding the entry ref.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: on hit, promote to L1. Errors fall through to L3.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    // Degrade gracefully: an L2 outage must not fail the read.
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: gated by the cuckoo filter to avoid pointless archive queries.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Skip L1 promotion when the cache is already full.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        // Best-effort L2 backfill; a failure only loses the
                        // promotion, not the read.
                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // Filter said "maybe" but L3 had nothing.
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        // Full miss across all tiers.
        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
514
515 #[tracing::instrument(skip(self), fields(verified))]
520 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
521 let item = match self.get(id).await? {
522 Some(item) => item,
523 None => return Ok(None),
524 };
525
526 if !item.content_hash.is_empty() {
528 use sha2::{Sha256, Digest};
529
530 let computed = Sha256::digest(&item.content);
531 let computed_hex = hex::encode(computed);
532
533 if computed_hex != item.content_hash {
534 tracing::Span::current().record("verified", false);
535 warn!(
536 id = %id,
537 expected = %item.content_hash,
538 actual = %computed_hex,
539 "Data corruption detected!"
540 );
541
542 crate::metrics::record_corruption(id);
544
545 return Err(StorageError::Corruption {
546 id: id.to_string(),
547 expected: item.content_hash.clone(),
548 actual: computed_hex,
549 });
550 }
551
552 tracing::Span::current().record("verified", true);
553 debug!(id = %id, "Hash verification passed");
554 }
555
556 Ok(Some(item))
557 }
558
559 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
567 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
568 self.submit_with(item, SubmitOptions::default()).await
569 }
570
    /// Submit an item: write it to L1 immediately and enqueue it for batched
    /// L2 flushing.
    ///
    /// Rejects with `StorageError::Backend` under `Emergency`/`Shutdown`
    /// backpressure. If adding the item trips a batch threshold, the resulting
    /// batch is flushed before returning.
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        // Refuse new writes once pressure reaches Emergency/Shutdown.
        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // A per-call state override, when provided, wins over the item's own.
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Carry the options on the item so downstream flush stages can route it.
        item.submit_options = Some(options);

        // L1 write is synchronous and immediate.
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // The inner block scopes the batcher lock so it is released BEFORE we
        // await the flush below — do not hold this lock across an await.
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
644
    /// Delete an item from every tier (L1, L2, L3), remove it from the cuckoo
    /// filter, update the Merkle trees, and emit a CDC event when enabled.
    ///
    /// Returns `Ok(true)` when the item was found in at least one tier.
    /// Rejects with `StorageError::Backend` under `Emergency`/`Shutdown`
    /// backpressure. Tier failures are logged but do not abort the delete.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        // Deletes honor backpressure the same way writes do.
        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: remove and release the accounted bytes.
        // NOTE(review): `fetch_sub` wraps on underflow if byte accounting ever
        // drifted; a saturating update would be safer — confirm.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // Drop the id from the L3 membership filter regardless of tier hits.
        self.l3_filter.remove(id);

        // L2 (Redis): best-effort; failures are logged, not propagated.
        // NOTE(review): `Ok(())` is treated as "found" even if the key did not
        // exist — verify the `CacheStore::delete` contract matches.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3 (MySQL): same best-effort pattern; failures logged at `error`.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Record the deletion in the SQL Merkle tree, then mirror the change
        // into the cache-side tree only after SQL succeeded.
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            } else {
                if let Some(ref merkle_cache) = self.merkle_cache {
                    if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &[id.to_string()]).await {
                        warn!(error = %e, "Failed to sync merkle cache after deletion");
                    }
                }
            }
        }

        // Emit a change-data-capture event only for items that actually existed.
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
745
746 fn insert_l1(&self, item: SyncItem) {
750 let new_size = Self::item_size(&item);
751 let key = item.object_id.clone();
752
753 if let Some(old_item) = self.l1_cache.insert(key, item) {
755 let old_size = Self::item_size(&old_item);
757 let current = self.l1_size_bytes.load(Ordering::Acquire);
759 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
760 self.l1_size_bytes.store(new_total, Ordering::Release);
761 } else {
762 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
764 }
765 }
766
    /// Size of an item as accounted against the L1 byte budget.
    /// Delegates to the item's own `size_bytes` so accounting stays consistent
    /// everywhere this helper is used.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }
773
774 fn maybe_evict(&self) {
775 let pressure = self.memory_pressure();
776 if pressure < self.config.read().backpressure_warn {
777 return;
778 }
779
780 let level = BackpressureLevel::from_pressure(pressure);
781 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
782
783 let now = std::time::Instant::now();
785 let entries: Vec<CacheEntry> = self.l1_cache.iter()
786 .map(|ref_multi| {
787 let item = ref_multi.value();
788 let id = ref_multi.key().clone();
789
790 let now_millis = std::time::SystemTime::now()
792 .duration_since(std::time::UNIX_EPOCH)
793 .unwrap_or_default()
794 .as_millis() as u64;
795 let age_secs = if item.last_accessed > 0 {
796 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
797 } else {
798 3600.0 };
800
801 CacheEntry {
802 id,
803 size_bytes: item.size_bytes(),
804 created_at: now - std::time::Duration::from_secs_f64(age_secs),
805 last_access: now - std::time::Duration::from_secs_f64(age_secs),
806 access_count: item.access_count,
807 is_dirty: false, }
809 })
810 .collect();
811
812 if entries.is_empty() {
813 return;
814 }
815
816 let evict_count = match level {
818 BackpressureLevel::Normal => 0,
819 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
825
826 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
828
829 let mut evicted_bytes = 0usize;
831 for victim_id in &victims {
832 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
833 evicted_bytes += item.size_bytes();
834 }
835 }
836
837 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
839
840 info!(
841 evicted = victims.len(),
842 evicted_bytes = evicted_bytes,
843 pressure = %pressure,
844 level = %level,
845 "Evicted entries from L1 cache"
846 );
847 }
848
849 pub fn l1_stats(&self) -> (usize, usize) {
851 (
852 self.l1_cache.len(),
853 self.l1_size_bytes.load(Ordering::Acquire),
854 )
855 }
856
857 #[must_use]
859 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
860 self.l3_filter.stats()
861 }
862
863 pub fn l3_filter(&self) -> &Arc<FilterManager> {
865 &self.l3_filter
866 }
867
868 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
873 let redis_root = if let Some(ref rm) = self.merkle_cache {
874 rm.root_hash().await.ok().flatten().map(hex::encode)
875 } else {
876 None
877 };
878
879 let sql_root = if let Some(ref sm) = self.sql_merkle {
880 sm.root_hash().await.ok().flatten().map(hex::encode)
881 } else {
882 None
883 };
884
885 (redis_root, sql_root)
886 }
887
    /// Verify the L3 cuckoo filter against the SQL Merkle root.
    ///
    /// NOTE(review): the comparison itself is not implemented here — after
    /// successfully fetching the SQL root (or finding no SQL Merkle store at
    /// all) this unconditionally marks the filter trusted and returns `true`.
    /// Only a missing root or a root-lookup error yields `false`. Confirm
    /// whether a real root comparison is still pending.
    pub async fn verify_filter(&self) -> bool {
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                // No root, or the lookup failed: cannot verify.
                _ => return false,
            }
        } else {
            // No SQL Merkle store: nothing to verify against; trust as-is.
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }
919
920 pub fn update_gauge_metrics(&self) {
925 let (l1_count, l1_bytes) = self.l1_stats();
926 crate::metrics::set_l1_cache_items(l1_count);
927 crate::metrics::set_l1_cache_bytes(l1_bytes);
928 crate::metrics::set_memory_pressure(self.memory_pressure());
929
930 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
931 let filter_load = if filter_capacity > 0 {
932 filter_entries as f64 / filter_capacity as f64
933 } else {
934 0.0
935 };
936 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
937 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
938
939 crate::metrics::set_backpressure_level(self.pressure() as u8);
940 }
941}
942
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    /// Engine with default config and no storage tiers attached.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    /// Minimal JSON-backed item with the given object id.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    // A new engine starts in `Created` and is not yet ready.
    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    // Pressure is 0 when L1 is empty and rises once an item is inserted.
    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    // Inserting tracks the item's size in the L1 byte counter.
    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    // Replacing an entry swaps its accounted size rather than adding to it.
    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size2, size2_expected);
    }

    // A miss across all tiers yields Ok(None).
    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    // An L1-resident item is returned by `get`.
    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    // Deleting removes the item and zeroes the byte accounting.
    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    // The cuckoo filter starts empty but with nonzero capacity.
    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    // With no pressure, writes are accepted.
    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    // With no pressure, the level reports Normal.
    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    // A bare engine reports not-ready/not-healthy with unconfigured backends
    // showing as None rather than down.
    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready);
        assert!(!health.healthy);
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}