mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

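/// Tiered sync engine.
///
/// Reads fall through a hot in-process L1 (`DashMap`), an optional L2 cache
/// (Redis), and an optional L3 archive (MySQL); a cuckoo filter gates L3
/// lookups so most true misses never touch the database. Writes land in L1
/// immediately and reach the lower tiers through a hybrid batcher. Merkle
/// trees over L2/L3 support consistency verification, and a write-ahead log
/// covers L3 writes while MySQL is unhealthy.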
pub struct SyncEngine {
    /// Current engine configuration; refreshed when the watch channel fires.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Receiver for live configuration updates.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Sender side of the engine state channel.
    pub(super) state: watch::Sender<EngineState>,

    /// Cached receiver used for synchronous state reads.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process hot cache keyed by object id.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Running byte total of everything held in L1.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: optional cache store (Redis).
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Direct Redis handle, used e.g. for the liveness probe.
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: optional archive store (MySQL).
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Direct SQL handle, used e.g. for the liveness probe.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter that gates L3 lookups.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional snapshot persistence for the cuckoo filter.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Cuckoo-filter inserts since the last persisted snapshot.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    /// When the cuckoo filter was last snapshotted.
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Time/count/size-triggered batcher for writes to the lower tiers.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree over the L2 (Redis) tier.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Merkle tree over the L3 (SQL) tier.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log covering L3 writes while MySQL is unhealthy.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Tracks MySQL availability.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Tan-curve policy used to pick L1 eviction victims.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Shared state for the search tier.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// Bounds the number of concurrent SQL writes.
    pub(super) sql_write_semaphore: Arc<Semaphore>,
}

impl SyncEngine {
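    /// Create an engine in the `Created` state. Optional tiers (L2, L3,
    /// merkle stores, WAL, filter persistence) start as `None`; they are
    /// presumably wired up later during startup (see the `lifecycle`
    /// module).
    ///
    /// A minimal construction sketch, mirroring the tests at the bottom of
    /// this file:
    ///
    /// ```ignore
    /// let config = SyncEngineConfig::default();
    /// let (_tx, rx) = tokio::sync::watch::channel(config.clone());
    /// let engine = SyncEngine::new(config, rx);
    /// assert_eq!(engine.state(), EngineState::Created);
    /// ```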
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
        }
    }

    /// Current engine state (synchronous snapshot).
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Subscribe to engine state transitions.
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// True once the engine has finished startup (`Ready` or `Running`).
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

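    /// Fraction of the configured L1 byte budget currently in use,
    /// `l1_size_bytes / l1_max_bytes`; 0.0 when no budget is configured.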
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Backpressure level derived from the current memory pressure.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Writes are accepted until pressure reaches `Emergency`.
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

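    /// Build a point-in-time health snapshot: engine state, memory pressure,
    /// L1 and filter statistics, WAL backlog, plus concurrent liveness probes
    /// of Redis and SQL. Unconfigured tiers report `None` rather than
    /// counting as unhealthy.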
    pub async fn health_check(&self) -> types::HealthCheck {
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Probe both backends concurrently to keep the check cheap.
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // `None` means "not configured" and does not count against health;
        // only an explicit failed probe (`Some(false)`) does.
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Liveness probe: `PING` Redis, returning `(connected, latency_ms)`;
    /// `(None, None)` when no Redis store is configured.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None);
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Liveness probe: `SELECT 1` against MySQL, returning
    /// `(connected, latency_ms)`; `(None, None)` when no SQL store is configured.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None);
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

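    /// Tiered read: L1 first, then L2, then L3, where the L3 lookup is
    /// attempted only if the cuckoo filter says the id may exist there.
    /// Lower-tier hits are promoted into L1 (skipped for L3 when L1 is
    /// already over budget). `Ok(None)` means a miss at every tier.
    ///
    /// A minimal sketch (hypothetical id):
    ///
    /// ```ignore
    /// if let Some(item) = engine.get("object-123").await? {
    ///     println!("found {} bytes", item.content.len());
    /// }
    /// ```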
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

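    /// Like [`Self::get`], but when the item carries a `content_hash`,
    /// recompute the SHA-256 of `content` and return
    /// `StorageError::Corruption` on mismatch instead of serving corrupt
    /// data.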
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit with default [`SubmitOptions`]; see [`Self::submit_with`].
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

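    /// Write path: reject under Emergency/Shutdown backpressure, apply any
    /// state override from `options`, insert into L1 immediately, then hand
    /// the item to the hybrid batcher; if that trips a flush threshold, the
    /// drained batch is written through to the lower tiers inline.
    ///
    /// A minimal sketch (hypothetical id, mirroring the test helpers below):
    ///
    /// ```ignore
    /// let item = SyncItem::from_json("object-123".to_string(), json!({"data": "v"}));
    /// engine.submit_with(item, SubmitOptions::default()).await?;
    /// ```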
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        item.submit_options = Some(options);

        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // Take the batch out while holding the lock, but flush it after the
        // lock is released so slow lower-tier writes don't block other writers.
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

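    /// Delete an id from every tier: L1 (with size accounting), the cuckoo
    /// filter, L2, and L3, then record the deletion in both merkle trees and
    /// emit a CDC event when enabled. Returns `Ok(true)` if any tier held the
    /// item; lower-tier failures are logged and metered but do not fail the
    /// call.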
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        self.l3_filter.remove(id);

        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

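    /// Insert (or replace) an item in L1, keeping `l1_size_bytes` in step:
    /// a replacement adjusts the total by the size delta, a fresh insert
    /// adds the full size.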
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Replacing an existing entry: adjust the running total by the
            // size delta. A compare-and-swap loop avoids the lost updates
            // that a separate load/store pair would allow under concurrency.
            let old_size = Self::item_size(&old_item);
            let _ = self.l1_size_bytes.fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                Some(current.saturating_sub(old_size).saturating_add(new_size))
            });
        } else {
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }

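    /// Once memory pressure crosses the configured warn threshold, snapshot
    /// L1 into `CacheEntry` records and let the tan-curve policy pick
    /// victims. The evicted fraction scales with the backpressure level,
    /// from 1/20 of entries at Warn up to 1/2 at Shutdown (minimum one).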
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Reconstruct approximate Instant-based timestamps from the
                // wall-clock `last_accessed` millis stored on the item.
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    // Never accessed: treat as an hour old so it evicts early.
                    3600.0
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    // Items are handed to the L2 batcher at submit time, so
                    // L1 is not the only copy.
                    is_dirty: false,
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Evict a fraction of L1 that scales with the backpressure level.
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,
            BackpressureLevel::Throttle => entries.len() / 10,
            BackpressureLevel::Critical => entries.len() / 5,
            BackpressureLevel::Emergency => entries.len() / 3,
            BackpressureLevel::Shutdown => entries.len() / 2,
        }.max(1);

        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// `(item count, total bytes)` currently held in L1.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// `(entries, capacity, trust)` of the L3 cuckoo filter.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Shared handle to the L3 cuckoo filter.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

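    /// Current Redis and SQL merkle root hashes as hex strings; `None` for a
    /// tier that is unconfigured or has no root yet.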
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

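    /// Verify the L3 cuckoo filter against the SQL merkle root. Without a
    /// SQL merkle store there is nothing to compare, so the filter is
    /// trusted as-is; if the root cannot be read, the filter stays
    /// untrusted and `false` is returned.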
    pub async fn verify_filter(&self) -> bool {
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL merkle tree to compare against: trust the filter as-is.
            self.l3_filter.mark_trusted();
            return true;
        };

        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        self.l3_filter.mark_trusted();
        true
    }

    /// Push current cache, filter, and pressure gauges to the metrics registry.
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // replaced in place, not duplicated
        assert_eq!(size2, size2_expected); // total reflects only the new size
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Created is neither Ready nor Running
        assert!(!health.healthy); // healthy requires the Running state
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No tiers configured: probes report None, not unhealthy.
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}