1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45mod schema_api;
46
47pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
48pub use merkle_api::MerkleDiff;
49pub use search_api::{SearchTier, SearchResult, SearchSource};
50#[allow(unused_imports)]
51use types::WriteTarget;
52
53use std::sync::Arc;
54use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
55use std::time::Instant;
56use dashmap::DashMap;
57use parking_lot::RwLock;
58use tokio::sync::{watch, Mutex, Semaphore};
59use tracing::{info, warn, debug, error};
60
61use crate::config::SyncEngineConfig;
62use crate::sync_item::SyncItem;
63use crate::submit_options::SubmitOptions;
64use crate::backpressure::BackpressureLevel;
65use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
66use crate::storage::sql::SqlStore;
67use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
68use crate::cuckoo::FilterPersistence;
69use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
70use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
71use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
72use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
73use crate::schema::SchemaRegistry;
74
75use search_api::SearchState;
76
/// Multi-tier sync engine: L1 in-process cache, optional L2 cache store,
/// optional L3 archive store, with a cuckoo filter guarding L3 reads and
/// Merkle trees for cross-tier integrity.
pub struct SyncEngine {
    /// Current configuration; hot-swappable, updates arrive via `config_rx`.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Watch receiver delivering configuration updates.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Broadcast side of the engine lifecycle state channel.
    pub(super) state: watch::Sender<EngineState>,

    /// Local receiver used to read the current lifecycle state.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Approximate total byte size of items held in L1.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: optional cache tier (Redis-backed, per the get/delete log messages).
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Concrete Redis handle, used by the health probe (`probe_redis`).
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: optional archive tier (MySQL-backed, per the delete log messages).
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Concrete SQL handle, used by the health probe (`probe_sql`).
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter consulted before hitting L3 on reads.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for cuckoo-filter snapshots.
    pub(super) filter_persistence: Option<FilterPersistence>,

    // Counters driving cuckoo-filter snapshotting (inserts since last
    // snapshot, and when the last snapshot happened).
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Batches submitted items until a time/count/bytes threshold trips.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree over the Redis tier, for integrity comparison.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Merkle tree over the SQL tier, for integrity comparison.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log protecting L3 writes while MySQL is unhealthy.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Tracks MySQL backend health (consulted by WAL stats).
    pub(super) mysql_health: MysqlHealthChecker,

    /// Policy used by `maybe_evict` to pick L1 victims under pressure.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search subsystem state; see `search_api`.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// Caps concurrent SQL writes.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Registry of known item schemas.
    pub(super) schema_registry: Arc<SchemaRegistry>,
}
163
164impl SyncEngine {
    /// Builds an engine in the `Created` state.
    ///
    /// All optional backends (L2/L3 stores, Merkle trees, WAL, filter
    /// persistence) start as `None`; they are wired up elsewhere during
    /// initialization. `config_rx` receives configuration updates, and a
    /// clone of `config` seeds the hot-swappable `RwLock` copy.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        // Batch thresholds come straight from the initial configuration.
        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // Caps concurrent SQL writes.
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        let schema_registry = Arc::new(SchemaRegistry::new());

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            // Filter sized at 100_000 (presumably capacity — confirm in
            // `FilterManager::new`).
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
            schema_registry,
        }
    }
210
211 #[must_use]
213 pub fn state(&self) -> EngineState {
214 *self.state_rx.borrow()
215 }
216
217 #[must_use]
219 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
220 self.state_rx.clone()
221 }
222
223 #[must_use]
225 pub fn is_ready(&self) -> bool {
226 matches!(self.state(), EngineState::Ready | EngineState::Running)
227 }
228
229 #[must_use]
231 pub fn memory_pressure(&self) -> f64 {
232 let used = self.l1_size_bytes.load(Ordering::Acquire);
233 let max = self.config.read().l1_max_bytes;
234 if max == 0 {
235 0.0
236 } else {
237 used as f64 / max as f64
238 }
239 }
240
241 #[must_use]
243 pub fn pressure(&self) -> BackpressureLevel {
244 BackpressureLevel::from_pressure(self.memory_pressure())
245 }
246
247 #[must_use]
249 pub fn should_accept_writes(&self) -> bool {
250 let pressure = self.pressure();
251 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
252 }
253
    /// Builds a point-in-time health snapshot of the engine.
    ///
    /// Redis and SQL are probed concurrently. `healthy` requires the
    /// `Running` state, neither configured backend reporting a failed
    /// probe, and backpressure below `Emergency`; an unconfigured
    /// backend (`None`) does not count against health.
    pub async fn health_check(&self) -> types::HealthCheck {
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        let mysql_healthy = self.mysql_health.is_healthy();
        // WAL pending count only exists when a WAL is configured.
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Probe both backends in parallel to keep the check fast.
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // `!= Some(false)`: only an explicit failed probe is unhealthy.
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }
334
335 async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
337 let Some(ref redis_store) = self.redis_store else {
338 return (None, None); };
340
341 let start = std::time::Instant::now();
342 let mut conn = redis_store.connection();
343
344 let result: Result<String, _> = redis::cmd("PING")
345 .query_async(&mut conn)
346 .await;
347
348 match result {
349 Ok(_) => {
350 let latency_ms = start.elapsed().as_millis() as u64;
351 (Some(true), Some(latency_ms))
352 }
353 Err(_) => (Some(false), None),
354 }
355 }
356
357 async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
359 let Some(ref sql_store) = self.sql_store else {
360 return (None, None); };
362
363 let start = std::time::Instant::now();
364 let pool = sql_store.pool();
365
366 let result = sqlx::query("SELECT 1")
367 .fetch_one(&pool)
368 .await;
369
370 match result {
371 Ok(_) => {
372 let latency_ms = start.elapsed().as_millis() as u64;
373 (Some(true), Some(latency_ms))
374 }
375 Err(_) => (Some(false), None),
376 }
377 }
378
    /// Tiered read: L1 (in-process) → L2 → filter-gated L3.
    ///
    /// Hits are promoted toward faster tiers: an L2 hit is copied into
    /// L1; an L3 hit is copied into L1 (unless memory pressure is at or
    /// above 1.0) and written back to L2. Tier errors are logged and
    /// treated as misses for that tier, so `Ok(None)` means "not found
    /// in any reachable tier", not "no errors occurred".
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: bump access stats in place, then return a clone.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            // Millisecond wall-clock timestamp, used by eviction scoring.
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: on hit, promote into L1.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    // Treated as a miss for this tier; fall through to L3.
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: the cuckoo filter screens out ids that definitely are not
        // present, avoiding a backend round-trip on true misses.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Skip L1 promotion when the cache is already full.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        // Best-effort write-back into L2.
                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // The filter said "maybe", L3 said "no".
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
479
480 #[tracing::instrument(skip(self), fields(verified))]
485 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
486 let item = match self.get(id).await? {
487 Some(item) => item,
488 None => return Ok(None),
489 };
490
491 if !item.content_hash.is_empty() {
493 use sha2::{Sha256, Digest};
494
495 let computed = Sha256::digest(&item.content);
496 let computed_hex = hex::encode(computed);
497
498 if computed_hex != item.content_hash {
499 tracing::Span::current().record("verified", false);
500 warn!(
501 id = %id,
502 expected = %item.content_hash,
503 actual = %computed_hex,
504 "Data corruption detected!"
505 );
506
507 crate::metrics::record_corruption(id);
509
510 return Err(StorageError::Corruption {
511 id: id.to_string(),
512 expected: item.content_hash.clone(),
513 actual: computed_hex,
514 });
515 }
516
517 tracing::Span::current().record("verified", true);
518 debug!(id = %id, "Hash verification passed");
519 }
520
521 Ok(Some(item))
522 }
523
524 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
532 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
533 self.submit_with(item, SubmitOptions::default()).await
534 }
535
    /// Submits an item with explicit routing/state options.
    ///
    /// Rejects the write outright under `Emergency`/`Shutdown`
    /// backpressure. Otherwise the item is placed in L1 immediately and
    /// queued in the hybrid batcher; when this item trips a batch
    /// threshold, the resulting batch is flushed inline before returning.
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Optional state override supplied by the caller.
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Carry the options with the item so downstream flushes can route it.
        item.submit_options = Some(options);

        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // Hold the batcher lock only long enough to enqueue; the flush
        // itself runs after the lock is released.
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
609
    /// Best-effort delete across all tiers (L1, cuckoo filter, L2, L3)
    /// plus both Merkle trees; emits a CDC delete event when enabled.
    ///
    /// Returns `Ok(true)` when any tier acknowledged the delete. Tier
    /// errors are logged and skipped rather than propagated.
    ///
    /// NOTE(review): `Ok(())` from the L2/L3 `delete` sets `found = true`
    /// even if the backend had no such row — confirm the store traits'
    /// contract before relying on the return value as an existence check.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: remove and give the freed bytes back to the size counter.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        self.l3_filter.remove(id);

        // L2: best-effort; failure is logged, not propagated.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3: best-effort; failure is logged at error level.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Record the deletion in both Merkle trees so the tiers stay
        // comparable.
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // CDC events are only emitted for deletes that actually hit data.
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
710
711 fn insert_l1(&self, item: SyncItem) {
715 let new_size = Self::item_size(&item);
716 let key = item.object_id.clone();
717
718 if let Some(old_item) = self.l1_cache.insert(key, item) {
720 let old_size = Self::item_size(&old_item);
722 let current = self.l1_size_bytes.load(Ordering::Acquire);
724 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
725 self.l1_size_bytes.store(new_total, Ordering::Release);
726 } else {
727 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
729 }
730 }
731
732 #[inline]
734 fn item_size(item: &SyncItem) -> usize {
735 item.size_bytes()
737 }
738
    /// Evicts L1 entries when memory pressure reaches the configured
    /// warn threshold; the fraction evicted scales with the level.
    ///
    /// NOTE(review): entries are reported to the policy with
    /// `is_dirty: false` unconditionally — confirm that unflushed items
    /// are persisted elsewhere (the batcher/flush path) before eviction
    /// can drop them.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Snapshot the cache into policy-friendly entries. `last_accessed`
        // is a millisecond wall-clock stamp; it is converted back into an
        // `Instant` by subtracting the age from `now` (approximate —
        // `created_at` is set to the same reconstructed instant since the
        // true creation time is not stored on the item).
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                // Never-accessed items are treated as an hour old so they
                // rank as cold.
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Eviction fraction escalates with backpressure; always evict at
        // least one entry once we are past the warn threshold.
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);

        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }
813
814 pub fn l1_stats(&self) -> (usize, usize) {
816 (
817 self.l1_cache.len(),
818 self.l1_size_bytes.load(Ordering::Acquire),
819 )
820 }
821
    /// Returns `(entries, capacity, trust)` for the L3 cuckoo filter.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }
827
    /// Shared handle to the L3 cuckoo filter manager.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }
832
833 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
838 let redis_root = if let Some(ref rm) = self.redis_merkle {
839 rm.root_hash().await.ok().flatten().map(hex::encode)
840 } else {
841 None
842 };
843
844 let sql_root = if let Some(ref sm) = self.sql_merkle {
845 sm.root_hash().await.ok().flatten().map(hex::encode)
846 } else {
847 None
848 };
849
850 (redis_root, sql_root)
851 }
852
    /// Verifies the L3 cuckoo filter against the SQL Merkle root.
    ///
    /// NOTE(review): no actual comparison is performed here — once the
    /// SQL root is fetched (or when no SQL Merkle store is configured)
    /// the filter is unconditionally marked trusted and `true` is
    /// returned. `false` only signals a failed root lookup. Confirm
    /// whether the real verification lives elsewhere or is still TODO.
    pub async fn verify_filter(&self) -> bool {
        // Without a SQL Merkle store there is nothing to verify against;
        // trust the filter as-is.
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }
884
885 pub fn update_gauge_metrics(&self) {
890 let (l1_count, l1_bytes) = self.l1_stats();
891 crate::metrics::set_l1_cache_items(l1_count);
892 crate::metrics::set_l1_cache_bytes(l1_bytes);
893 crate::metrics::set_memory_pressure(self.memory_pressure());
894
895 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
896 let filter_load = if filter_capacity > 0 {
897 filter_entries as f64 / filter_capacity as f64
898 } else {
899 0.0
900 };
901 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
902 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
903
904 crate::metrics::set_backpressure_level(self.pressure() as u8);
905 }
906}
907
#[cfg(test)]
mod tests {
    //! Unit tests for the in-process (L1-only) behavior of the engine.
    //! No L2/L3 backends are configured, so probes report `None` and
    //! gets/deletes only touch L1 and the cuckoo filter.
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Engine with default config; the watch sender is dropped, which is
    // fine because these tests never push config updates.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    // Minimal JSON-backed item with the given object id.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        // Empty cache: zero pressure.
        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Any insert against a finite limit yields positive pressure.
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Re-insert under the same id with a larger payload; the size
        // counter must track the replacement, not the sum.
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); assert_eq!(size2, size2_expected); }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        // Both the entry and its tracked bytes must be gone.
        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Freshly created engine: not ready, not healthy, no load.
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); assert!(!health.healthy); assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured, so probes and WAL stats report None.
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}