1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45
46pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
47pub use merkle_api::MerkleDiff;
48pub use search_api::{SearchTier, SearchResult, SearchSource};
49#[allow(unused_imports)]
50use types::WriteTarget;
51
52use std::sync::Arc;
53use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
54use std::time::Instant;
55use dashmap::DashMap;
56use parking_lot::RwLock;
57use tokio::sync::{watch, Mutex};
58use tracing::{info, warn, debug, error};
59
60use crate::config::SyncEngineConfig;
61use crate::sync_item::SyncItem;
62use crate::submit_options::SubmitOptions;
63use crate::backpressure::BackpressureLevel;
64use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
65use crate::storage::sql::SqlStore;
66use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
67use crate::cuckoo::FilterPersistence;
68use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
69use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
70use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
71use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
72
73use search_api::SearchState;
74
75pub struct SyncEngine {
87 pub(super) config: RwLock<SyncEngineConfig>,
90
91 pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,
93
94 pub(super) state: watch::Sender<EngineState>,
96
97 pub(super) state_rx: watch::Receiver<EngineState>,
99
100 pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,
102
103 pub(super) l1_size_bytes: Arc<AtomicUsize>,
105
106 pub(super) l2_store: Option<Arc<dyn CacheStore>>,
108
109 pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,
111
112 pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,
114
115 pub(super) sql_store: Option<Arc<SqlStore>>,
117
118 pub(super) l3_filter: Arc<FilterManager>,
120
121 pub(super) filter_persistence: Option<FilterPersistence>,
123
124 pub(super) cf_inserts_since_snapshot: AtomicU64,
126 pub(super) cf_last_snapshot: Mutex<Instant>,
127
128 pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,
130
131 pub(super) redis_merkle: Option<RedisMerkleStore>,
133
134 pub(super) sql_merkle: Option<SqlMerkleStore>,
136
137 pub(super) l3_wal: Option<WriteAheadLog>,
139
140 pub(super) mysql_health: MysqlHealthChecker,
142
143 pub(super) eviction_policy: TanCurvePolicy,
145
146 pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
148}
149
150impl SyncEngine {
151 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
156 let (state_tx, state_rx) = watch::channel(EngineState::Created);
157
158 let batch_config = BatchConfig {
159 flush_ms: config.batch_flush_ms,
160 flush_count: config.batch_flush_count,
161 flush_bytes: config.batch_flush_bytes,
162 };
163
164 Self {
165 config: RwLock::new(config.clone()),
166 config_rx: Mutex::new(config_rx),
167 state: state_tx,
168 state_rx,
169 l1_cache: Arc::new(DashMap::new()),
170 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
171 l2_store: None,
172 redis_store: None,
173 l3_store: None,
174 sql_store: None,
175 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
176 filter_persistence: None,
177 cf_inserts_since_snapshot: AtomicU64::new(0),
178 cf_last_snapshot: Mutex::new(Instant::now()),
179 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
180 redis_merkle: None,
181 sql_merkle: None,
182 l3_wal: None,
183 mysql_health: MysqlHealthChecker::new(),
184 eviction_policy: TanCurvePolicy::default(),
185 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
186 }
187 }
188
189 #[must_use]
191 pub fn state(&self) -> EngineState {
192 *self.state_rx.borrow()
193 }
194
195 #[must_use]
197 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
198 self.state_rx.clone()
199 }
200
201 #[must_use]
203 pub fn is_ready(&self) -> bool {
204 matches!(self.state(), EngineState::Ready | EngineState::Running)
205 }
206
207 #[must_use]
209 pub fn memory_pressure(&self) -> f64 {
210 let used = self.l1_size_bytes.load(Ordering::Acquire);
211 let max = self.config.read().l1_max_bytes;
212 if max == 0 {
213 0.0
214 } else {
215 used as f64 / max as f64
216 }
217 }
218
219 #[must_use]
221 pub fn pressure(&self) -> BackpressureLevel {
222 BackpressureLevel::from_pressure(self.memory_pressure())
223 }
224
225 #[must_use]
227 pub fn should_accept_writes(&self) -> bool {
228 let pressure = self.pressure();
229 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
230 }
231
232 pub async fn health_check(&self) -> types::HealthCheck {
259 let state = self.state();
261 let ready = matches!(state, EngineState::Ready | EngineState::Running);
262 let memory_pressure = self.memory_pressure();
263 let backpressure_level = self.pressure();
264 let accepting_writes = self.should_accept_writes();
265 let (l1_items, l1_bytes) = self.l1_stats();
266 let (filter_items, _, filter_trust) = self.l3_filter_stats();
267
268 let mysql_healthy = self.mysql_health.is_healthy();
271 let wal_pending_items = if let Some(ref wal) = self.l3_wal {
272 let stats = wal.stats(mysql_healthy);
273 Some(stats.pending_items)
274 } else {
275 None
276 };
277
278 let (redis_result, sql_result) = tokio::join!(
280 self.probe_redis(),
281 self.probe_sql()
282 );
283
284 let (redis_connected, redis_latency_ms) = redis_result;
285 let (sql_connected, sql_latency_ms) = sql_result;
286
287 let healthy = matches!(state, EngineState::Running)
290 && redis_connected != Some(false)
291 && sql_connected != Some(false)
292 && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);
293
294 types::HealthCheck {
295 state,
296 ready,
297 memory_pressure,
298 backpressure_level,
299 accepting_writes,
300 l1_items,
301 l1_bytes,
302 filter_items,
303 filter_trust,
304 redis_connected,
305 redis_latency_ms,
306 sql_connected,
307 sql_latency_ms,
308 wal_pending_items,
309 healthy,
310 }
311 }
312
313 async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
315 let Some(ref redis_store) = self.redis_store else {
316 return (None, None); };
318
319 let start = std::time::Instant::now();
320 let mut conn = redis_store.connection();
321
322 let result: Result<String, _> = redis::cmd("PING")
323 .query_async(&mut conn)
324 .await;
325
326 match result {
327 Ok(_) => {
328 let latency_ms = start.elapsed().as_millis() as u64;
329 (Some(true), Some(latency_ms))
330 }
331 Err(_) => (Some(false), None),
332 }
333 }
334
335 async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
337 let Some(ref sql_store) = self.sql_store else {
338 return (None, None); };
340
341 let start = std::time::Instant::now();
342 let pool = sql_store.pool();
343
344 let result = sqlx::query("SELECT 1")
345 .fetch_one(&pool)
346 .await;
347
348 match result {
349 Ok(_) => {
350 let latency_ms = start.elapsed().as_millis() as u64;
351 (Some(true), Some(latency_ms))
352 }
353 Err(_) => (Some(false), None),
354 }
355 }
356
357 #[tracing::instrument(skip(self), fields(tier))]
364 pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
365 let start = std::time::Instant::now();
366
367 if let Some(mut item) = self.l1_cache.get_mut(id) {
369 item.access_count = item.access_count.saturating_add(1);
370 item.last_accessed = std::time::SystemTime::now()
371 .duration_since(std::time::UNIX_EPOCH)
372 .unwrap_or_default()
373 .as_millis() as u64;
374 tracing::Span::current().record("tier", "L1");
375 debug!("L1 hit");
376 crate::metrics::record_operation("L1", "get", "hit");
377 crate::metrics::record_latency("L1", "get", start.elapsed());
378 return Ok(Some(item.clone()));
379 }
380
381 if let Some(ref l2) = self.l2_store {
383 match l2.get(id).await {
384 Ok(Some(item)) => {
385 self.insert_l1(item.clone());
387 tracing::Span::current().record("tier", "L2");
388 debug!("L2 hit, promoted to L1");
389 crate::metrics::record_operation("L2", "get", "hit");
390 crate::metrics::record_latency("L2", "get", start.elapsed());
391 return Ok(Some(item));
392 }
393 Ok(None) => {
394 debug!("L2 miss");
396 crate::metrics::record_operation("L2", "get", "miss");
397 }
398 Err(e) => {
399 warn!(error = %e, "L2 lookup failed");
400 crate::metrics::record_operation("L2", "get", "error");
401 }
402 }
403 }
404
405 if self.l3_filter.should_check_l3(id) {
407 crate::metrics::record_cuckoo_check("L3", "positive");
408 if let Some(ref l3) = self.l3_store {
409 match l3.get(id).await {
410 Ok(Some(item)) => {
411 if self.memory_pressure() < 1.0 {
413 self.insert_l1(item.clone());
414 }
415 tracing::Span::current().record("tier", "L3");
416 debug!("L3 hit, promoted to L1");
417 crate::metrics::record_operation("L3", "get", "hit");
418 crate::metrics::record_latency("L3", "get", start.elapsed());
419 crate::metrics::record_bytes_read("L3", item.content.len());
420 return Ok(Some(item));
421 }
422 Ok(None) => {
423 debug!("L3 filter false positive");
425 crate::metrics::record_operation("L3", "get", "false_positive");
426 crate::metrics::record_cuckoo_false_positive("L3");
427 }
428 Err(e) => {
429 warn!(error = %e, "L3 lookup failed");
430 crate::metrics::record_operation("L3", "get", "error");
431 crate::metrics::record_error("L3", "get", "backend");
432 }
433 }
434 }
435 } else {
436 crate::metrics::record_cuckoo_check("L3", "negative");
438 }
439
440 tracing::Span::current().record("tier", "miss");
441 debug!("Cache miss");
442 crate::metrics::record_operation("all", "get", "miss");
443 crate::metrics::record_latency("all", "get", start.elapsed());
444 Ok(None)
445 }
446
447 #[tracing::instrument(skip(self), fields(verified))]
452 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
453 let item = match self.get(id).await? {
454 Some(item) => item,
455 None => return Ok(None),
456 };
457
458 if !item.content_hash.is_empty() {
460 use sha2::{Sha256, Digest};
461
462 let computed = Sha256::digest(&item.content);
463 let computed_hex = hex::encode(computed);
464
465 if computed_hex != item.content_hash {
466 tracing::Span::current().record("verified", false);
467 warn!(
468 id = %id,
469 expected = %item.content_hash,
470 actual = %computed_hex,
471 "Data corruption detected!"
472 );
473
474 crate::metrics::record_corruption(id);
476
477 return Err(StorageError::Corruption {
478 id: id.to_string(),
479 expected: item.content_hash.clone(),
480 actual: computed_hex,
481 });
482 }
483
484 tracing::Span::current().record("verified", true);
485 debug!(id = %id, "Hash verification passed");
486 }
487
488 Ok(Some(item))
489 }
490
491 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
499 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
500 self.submit_with(item, SubmitOptions::default()).await
501 }
502
503 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
524 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
525 let start = std::time::Instant::now();
526
527 if !self.should_accept_writes() {
528 crate::metrics::record_operation("engine", "submit", "rejected");
529 crate::metrics::record_error("engine", "submit", "backpressure");
530 return Err(StorageError::Backend(format!(
531 "Rejecting write: engine state={}, pressure={}",
532 self.state(),
533 self.pressure()
534 )));
535 }
536
537 let id = item.object_id.clone();
538 let item_bytes = item.content.len();
539
540 if let Some(ref state) = options.state {
542 item.state = state.clone();
543 }
544
545 item.submit_options = Some(options);
547
548 self.insert_l1(item.clone());
550 crate::metrics::record_operation("L1", "submit", "success");
551 crate::metrics::record_bytes_written("L1", item_bytes);
552
553 self.l2_batcher.lock().await.add(item);
559
560 debug!(id = %id, "Item submitted to L1 and batch queue");
561 crate::metrics::record_latency("L1", "submit", start.elapsed());
562 Ok(())
563 }
564
565 #[tracing::instrument(skip(self), fields(object_id = %id))]
574 pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
575 let start = std::time::Instant::now();
576
577 if !self.should_accept_writes() {
578 crate::metrics::record_operation("engine", "delete", "rejected");
579 crate::metrics::record_error("engine", "delete", "backpressure");
580 return Err(StorageError::Backend(format!(
581 "Rejecting delete: engine state={}, pressure={}",
582 self.state(),
583 self.pressure()
584 )));
585 }
586
587 let mut found = false;
588
589 if let Some((_, item)) = self.l1_cache.remove(id) {
591 let size = Self::item_size(&item);
592 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
593 found = true;
594 debug!("Deleted from L1");
595 crate::metrics::record_operation("L1", "delete", "success");
596 }
597
598 self.l3_filter.remove(id);
600
601 if let Some(ref l2) = self.l2_store {
603 let l2_start = std::time::Instant::now();
604 match l2.delete(id).await {
605 Ok(()) => {
606 found = true;
607 debug!("Deleted from L2 (Redis)");
608 crate::metrics::record_operation("L2", "delete", "success");
609 crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
610 }
611 Err(e) => {
612 warn!(error = %e, "Failed to delete from L2 (Redis)");
613 crate::metrics::record_operation("L2", "delete", "error");
614 crate::metrics::record_error("L2", "delete", "backend");
615 }
616 }
617 }
618
619 if let Some(ref l3) = self.l3_store {
621 let l3_start = std::time::Instant::now();
622 match l3.delete(id).await {
623 Ok(()) => {
624 found = true;
625 debug!("Deleted from L3 (MySQL)");
626 crate::metrics::record_operation("L3", "delete", "success");
627 crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
628 }
629 Err(e) => {
630 error!(error = %e, "Failed to delete from L3 (MySQL)");
631 crate::metrics::record_operation("L3", "delete", "error");
632 crate::metrics::record_error("L3", "delete", "backend");
633 }
635 }
636 }
637
638 let mut merkle_batch = MerkleBatch::new();
640 merkle_batch.delete(id.to_string());
641
642 if let Some(ref sql_merkle) = self.sql_merkle {
643 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
644 error!(error = %e, "Failed to update SQL Merkle tree for deletion");
645 crate::metrics::record_error("L3", "merkle", "batch_apply");
646 }
647 }
648
649 if let Some(ref redis_merkle) = self.redis_merkle {
650 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
651 warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
652 crate::metrics::record_error("L2", "merkle", "batch_apply");
653 }
654 }
655
656 if self.config.read().enable_cdc_stream && found {
658 self.emit_cdc_delete(id).await;
659 }
660
661 info!(found, "Delete operation completed");
662 crate::metrics::record_latency("all", "delete", start.elapsed());
663 Ok(found)
664 }
665
666 fn insert_l1(&self, item: SyncItem) {
670 let new_size = Self::item_size(&item);
671 let key = item.object_id.clone();
672
673 if let Some(old_item) = self.l1_cache.insert(key, item) {
675 let old_size = Self::item_size(&old_item);
677 let current = self.l1_size_bytes.load(Ordering::Acquire);
679 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
680 self.l1_size_bytes.store(new_total, Ordering::Release);
681 } else {
682 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
684 }
685 }
686
687 #[inline]
689 fn item_size(item: &SyncItem) -> usize {
690 item.size_bytes()
692 }
693
694 fn maybe_evict(&self) {
695 let pressure = self.memory_pressure();
696 if pressure < self.config.read().backpressure_warn {
697 return;
698 }
699
700 let level = BackpressureLevel::from_pressure(pressure);
701 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
702
703 let now = std::time::Instant::now();
705 let entries: Vec<CacheEntry> = self.l1_cache.iter()
706 .map(|ref_multi| {
707 let item = ref_multi.value();
708 let id = ref_multi.key().clone();
709
710 let now_millis = std::time::SystemTime::now()
712 .duration_since(std::time::UNIX_EPOCH)
713 .unwrap_or_default()
714 .as_millis() as u64;
715 let age_secs = if item.last_accessed > 0 {
716 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
717 } else {
718 3600.0 };
720
721 CacheEntry {
722 id,
723 size_bytes: item.size_bytes(),
724 created_at: now - std::time::Duration::from_secs_f64(age_secs),
725 last_access: now - std::time::Duration::from_secs_f64(age_secs),
726 access_count: item.access_count,
727 is_dirty: false, }
729 })
730 .collect();
731
732 if entries.is_empty() {
733 return;
734 }
735
736 let evict_count = match level {
738 BackpressureLevel::Normal => 0,
739 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
745
746 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
748
749 let mut evicted_bytes = 0usize;
751 for victim_id in &victims {
752 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
753 evicted_bytes += item.size_bytes();
754 }
755 }
756
757 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
759
760 info!(
761 evicted = victims.len(),
762 evicted_bytes = evicted_bytes,
763 pressure = %pressure,
764 level = %level,
765 "Evicted entries from L1 cache"
766 );
767 }
768
769 pub fn l1_stats(&self) -> (usize, usize) {
771 (
772 self.l1_cache.len(),
773 self.l1_size_bytes.load(Ordering::Acquire),
774 )
775 }
776
777 #[must_use]
779 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
780 self.l3_filter.stats()
781 }
782
783 pub fn l3_filter(&self) -> &Arc<FilterManager> {
785 &self.l3_filter
786 }
787
788 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
793 let redis_root = if let Some(ref rm) = self.redis_merkle {
794 rm.root_hash().await.ok().flatten().map(hex::encode)
795 } else {
796 None
797 };
798
799 let sql_root = if let Some(ref sm) = self.sql_merkle {
800 sm.root_hash().await.ok().flatten().map(hex::encode)
801 } else {
802 None
803 };
804
805 (redis_root, sql_root)
806 }
807
808 pub async fn verify_filter(&self) -> bool {
815 let sql_root = if let Some(ref sm) = self.sql_merkle {
817 match sm.root_hash().await {
818 Ok(Some(root)) => root,
819 _ => return false,
820 }
821 } else {
822 self.l3_filter.mark_trusted();
824 return true;
825 };
826
827 info!(
831 sql_root = %hex::encode(sql_root),
832 "Verifying L3 filter against SQL merkle root"
833 );
834
835 self.l3_filter.mark_trusted();
837 true
838 }
839
840 pub fn update_gauge_metrics(&self) {
845 let (l1_count, l1_bytes) = self.l1_stats();
846 crate::metrics::set_l1_cache_items(l1_count);
847 crate::metrics::set_l1_cache_bytes(l1_bytes);
848 crate::metrics::set_memory_pressure(self.memory_pressure());
849
850 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
851 let filter_load = if filter_capacity > 0 {
852 filter_entries as f64 / filter_capacity as f64
853 } else {
854 0.0
855 };
856 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
857 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
858
859 crate::metrics::set_backpressure_level(self.pressure() as u8);
860 }
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866 use crate::config::SyncEngineConfig;
867 use tokio::sync::watch;
868 use serde_json::json;
869
870 fn create_test_engine() -> SyncEngine {
871 let config = SyncEngineConfig::default();
872 let (_tx, rx) = watch::channel(config.clone());
873 SyncEngine::new(config, rx)
874 }
875
876 fn create_test_item(id: &str) -> SyncItem {
877 SyncItem::from_json(
878 id.to_string(),
879 json!({"data": "test"}),
880 )
881 }
882
883 #[test]
884 fn test_engine_created_state() {
885 let engine = create_test_engine();
886 assert_eq!(engine.state(), EngineState::Created);
887 assert!(!engine.is_ready());
888 }
889
890 #[test]
891 fn test_memory_pressure_calculation() {
892 let config = SyncEngineConfig {
893 l1_max_bytes: 1000,
894 ..Default::default()
895 };
896 let (_tx, rx) = watch::channel(config.clone());
897 let engine = SyncEngine::new(config, rx);
898
899 assert_eq!(engine.memory_pressure(), 0.0);
900
901 let item = create_test_item("test1");
903 engine.insert_l1(item);
904
905 assert!(engine.memory_pressure() > 0.0);
907 }
908
909 #[test]
910 fn test_l1_insert_and_size_tracking() {
911 let engine = create_test_engine();
912
913 let item = create_test_item("test1");
914 let expected_size = item.size_bytes();
915
916 engine.insert_l1(item);
917
918 let (count, size) = engine.l1_stats();
919 assert_eq!(count, 1);
920 assert_eq!(size, expected_size);
921 }
922
923 #[test]
924 fn test_l1_update_size_tracking() {
925 let engine = create_test_engine();
926
927 let item1 = create_test_item("test1");
928 engine.insert_l1(item1);
929 let (_, _size1) = engine.l1_stats();
930
931 let item2 = SyncItem::from_json(
933 "test1".to_string(),
934 json!({"data": "much larger content here for testing size changes"}),
935 );
936 let size2_expected = item2.size_bytes();
937 engine.insert_l1(item2);
938
939 let (count, size2) = engine.l1_stats();
940 assert_eq!(count, 1); assert_eq!(size2, size2_expected); }
943
944 #[tokio::test]
945 async fn test_get_nonexistent() {
946 let engine = create_test_engine();
947 let result = engine.get("nonexistent").await.unwrap();
948 assert!(result.is_none());
949 }
950
951 #[tokio::test]
952 async fn test_get_from_l1() {
953 let engine = create_test_engine();
954 let item = create_test_item("test1");
955 engine.insert_l1(item.clone());
956
957 let result = engine.get("test1").await.unwrap();
958 assert!(result.is_some());
959 assert_eq!(result.unwrap().object_id, "test1");
960 }
961
962 #[tokio::test]
963 async fn test_delete_from_l1() {
964 let engine = create_test_engine();
965 let item = create_test_item("test1");
966 engine.insert_l1(item);
967
968 let (count_before, _) = engine.l1_stats();
969 assert_eq!(count_before, 1);
970
971 let deleted = engine.delete("test1").await.unwrap();
972 assert!(deleted);
973
974 let (count_after, size_after) = engine.l1_stats();
975 assert_eq!(count_after, 0);
976 assert_eq!(size_after, 0);
977 }
978
979 #[test]
980 fn test_filter_stats() {
981 let engine = create_test_engine();
982
983 let (entries, capacity, _trust) = engine.l3_filter_stats();
984 assert_eq!(entries, 0);
985 assert!(capacity > 0);
986 }
987
988 #[test]
989 fn test_should_accept_writes() {
990 let engine = create_test_engine();
991 assert!(engine.should_accept_writes());
992 }
993
994 #[test]
995 fn test_pressure_level() {
996 let engine = create_test_engine();
997 assert_eq!(engine.pressure(), BackpressureLevel::Normal);
998 }
999
1000 #[tokio::test]
1001 async fn test_health_check_basic() {
1002 let engine = create_test_engine();
1003
1004 let health = engine.health_check().await;
1005
1006 assert_eq!(health.state, EngineState::Created);
1008 assert!(!health.ready); assert!(!health.healthy); assert_eq!(health.memory_pressure, 0.0);
1013 assert_eq!(health.l1_items, 0);
1014 assert_eq!(health.l1_bytes, 0);
1015
1016 assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
1018 assert!(health.accepting_writes);
1019
1020 assert!(health.redis_connected.is_none());
1022 assert!(health.sql_connected.is_none());
1023 assert!(health.wal_pending_items.is_none());
1024 }
1025}