1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45
46pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
47pub use merkle_api::MerkleDiff;
48pub use search_api::{SearchTier, SearchResult, SearchSource};
49#[allow(unused_imports)]
50use types::WriteTarget;
51
52use std::sync::Arc;
53use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
54use std::time::Instant;
55use dashmap::DashMap;
56use parking_lot::RwLock;
57use tokio::sync::{watch, Mutex, Semaphore};
58use tracing::{info, warn, debug, error};
59
60use crate::config::SyncEngineConfig;
61use crate::sync_item::SyncItem;
62use crate::submit_options::SubmitOptions;
63use crate::backpressure::BackpressureLevel;
64use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
65use crate::storage::sql::SqlStore;
66use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
67use crate::cuckoo::FilterPersistence;
68use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
69use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
70use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
71use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
72
73use search_api::SearchState;
74
75pub struct SyncEngine {
87 pub(super) config: RwLock<SyncEngineConfig>,
90
91 pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,
93
94 pub(super) state: watch::Sender<EngineState>,
96
97 pub(super) state_rx: watch::Receiver<EngineState>,
99
100 pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,
102
103 pub(super) l1_size_bytes: Arc<AtomicUsize>,
105
106 pub(super) l2_store: Option<Arc<dyn CacheStore>>,
108
109 pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,
111
112 pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,
114
115 pub(super) sql_store: Option<Arc<SqlStore>>,
117
118 pub(super) l3_filter: Arc<FilterManager>,
120
121 pub(super) filter_persistence: Option<FilterPersistence>,
123
124 pub(super) cf_inserts_since_snapshot: AtomicU64,
126 pub(super) cf_last_snapshot: Mutex<Instant>,
127
128 pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,
130
131 pub(super) redis_merkle: Option<RedisMerkleStore>,
133
134 pub(super) sql_merkle: Option<SqlMerkleStore>,
136
137 pub(super) l3_wal: Option<WriteAheadLog>,
139
140 pub(super) mysql_health: MysqlHealthChecker,
142
143 pub(super) eviction_policy: TanCurvePolicy,
145
146 pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
148
149 pub(super) sql_write_semaphore: Arc<Semaphore>,
154}
155
156impl SyncEngine {
157 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
162 let (state_tx, state_rx) = watch::channel(EngineState::Created);
163
164 let batch_config = BatchConfig {
165 flush_ms: config.batch_flush_ms,
166 flush_count: config.batch_flush_count,
167 flush_bytes: config.batch_flush_bytes,
168 };
169
170 let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));
172
173 Self {
174 config: RwLock::new(config.clone()),
175 config_rx: Mutex::new(config_rx),
176 state: state_tx,
177 state_rx,
178 l1_cache: Arc::new(DashMap::new()),
179 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
180 l2_store: None,
181 redis_store: None,
182 l3_store: None,
183 sql_store: None,
184 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
185 filter_persistence: None,
186 cf_inserts_since_snapshot: AtomicU64::new(0),
187 cf_last_snapshot: Mutex::new(Instant::now()),
188 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
189 redis_merkle: None,
190 sql_merkle: None,
191 l3_wal: None,
192 mysql_health: MysqlHealthChecker::new(),
193 eviction_policy: TanCurvePolicy::default(),
194 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
195 sql_write_semaphore,
196 }
197 }
198
199 #[must_use]
201 pub fn state(&self) -> EngineState {
202 *self.state_rx.borrow()
203 }
204
205 #[must_use]
207 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
208 self.state_rx.clone()
209 }
210
211 #[must_use]
213 pub fn is_ready(&self) -> bool {
214 matches!(self.state(), EngineState::Ready | EngineState::Running)
215 }
216
217 #[must_use]
219 pub fn memory_pressure(&self) -> f64 {
220 let used = self.l1_size_bytes.load(Ordering::Acquire);
221 let max = self.config.read().l1_max_bytes;
222 if max == 0 {
223 0.0
224 } else {
225 used as f64 / max as f64
226 }
227 }
228
229 #[must_use]
231 pub fn pressure(&self) -> BackpressureLevel {
232 BackpressureLevel::from_pressure(self.memory_pressure())
233 }
234
235 #[must_use]
237 pub fn should_accept_writes(&self) -> bool {
238 let pressure = self.pressure();
239 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
240 }
241
242 pub async fn health_check(&self) -> types::HealthCheck {
269 let state = self.state();
271 let ready = matches!(state, EngineState::Ready | EngineState::Running);
272 let memory_pressure = self.memory_pressure();
273 let backpressure_level = self.pressure();
274 let accepting_writes = self.should_accept_writes();
275 let (l1_items, l1_bytes) = self.l1_stats();
276 let (filter_items, _, filter_trust) = self.l3_filter_stats();
277
278 let mysql_healthy = self.mysql_health.is_healthy();
281 let wal_pending_items = if let Some(ref wal) = self.l3_wal {
282 let stats = wal.stats(mysql_healthy);
283 Some(stats.pending_items)
284 } else {
285 None
286 };
287
288 let (redis_result, sql_result) = tokio::join!(
290 self.probe_redis(),
291 self.probe_sql()
292 );
293
294 let (redis_connected, redis_latency_ms) = redis_result;
295 let (sql_connected, sql_latency_ms) = sql_result;
296
297 let healthy = matches!(state, EngineState::Running)
300 && redis_connected != Some(false)
301 && sql_connected != Some(false)
302 && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);
303
304 types::HealthCheck {
305 state,
306 ready,
307 memory_pressure,
308 backpressure_level,
309 accepting_writes,
310 l1_items,
311 l1_bytes,
312 filter_items,
313 filter_trust,
314 redis_connected,
315 redis_latency_ms,
316 sql_connected,
317 sql_latency_ms,
318 wal_pending_items,
319 healthy,
320 }
321 }
322
323 async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
325 let Some(ref redis_store) = self.redis_store else {
326 return (None, None); };
328
329 let start = std::time::Instant::now();
330 let mut conn = redis_store.connection();
331
332 let result: Result<String, _> = redis::cmd("PING")
333 .query_async(&mut conn)
334 .await;
335
336 match result {
337 Ok(_) => {
338 let latency_ms = start.elapsed().as_millis() as u64;
339 (Some(true), Some(latency_ms))
340 }
341 Err(_) => (Some(false), None),
342 }
343 }
344
345 async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
347 let Some(ref sql_store) = self.sql_store else {
348 return (None, None); };
350
351 let start = std::time::Instant::now();
352 let pool = sql_store.pool();
353
354 let result = sqlx::query("SELECT 1")
355 .fetch_one(&pool)
356 .await;
357
358 match result {
359 Ok(_) => {
360 let latency_ms = start.elapsed().as_millis() as u64;
361 (Some(true), Some(latency_ms))
362 }
363 Err(_) => (Some(false), None),
364 }
365 }
366
367 #[tracing::instrument(skip(self), fields(tier))]
374 pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
375 let start = std::time::Instant::now();
376
377 if let Some(mut item) = self.l1_cache.get_mut(id) {
379 item.access_count = item.access_count.saturating_add(1);
380 item.last_accessed = std::time::SystemTime::now()
381 .duration_since(std::time::UNIX_EPOCH)
382 .unwrap_or_default()
383 .as_millis() as u64;
384 tracing::Span::current().record("tier", "L1");
385 debug!("L1 hit");
386 crate::metrics::record_operation("L1", "get", "hit");
387 crate::metrics::record_latency("L1", "get", start.elapsed());
388 return Ok(Some(item.clone()));
389 }
390
391 if let Some(ref l2) = self.l2_store {
393 match l2.get(id).await {
394 Ok(Some(item)) => {
395 self.insert_l1(item.clone());
397 tracing::Span::current().record("tier", "L2");
398 debug!("L2 hit, promoted to L1");
399 crate::metrics::record_operation("L2", "get", "hit");
400 crate::metrics::record_latency("L2", "get", start.elapsed());
401 return Ok(Some(item));
402 }
403 Ok(None) => {
404 debug!("L2 miss");
406 crate::metrics::record_operation("L2", "get", "miss");
407 }
408 Err(e) => {
409 warn!(error = %e, "L2 lookup failed");
410 crate::metrics::record_operation("L2", "get", "error");
411 }
412 }
413 }
414
415 if self.l3_filter.should_check_l3(id) {
417 crate::metrics::record_cuckoo_check("L3", "positive");
418 if let Some(ref l3) = self.l3_store {
419 match l3.get(id).await {
420 Ok(Some(item)) => {
421 if self.memory_pressure() < 1.0 {
423 self.insert_l1(item.clone());
424 }
425
426 if let Some(ref l2) = self.l2_store {
428 if let Err(e) = l2.put(&item).await {
429 warn!(id = %id, error = %e, "Failed to populate L2 on read");
430 } else {
431 debug!("L3 hit, promoted to L1 and L2");
432 }
433 } else {
434 debug!("L3 hit, promoted to L1");
435 }
436
437 tracing::Span::current().record("tier", "L3");
438 crate::metrics::record_operation("L3", "get", "hit");
439 crate::metrics::record_latency("L3", "get", start.elapsed());
440 crate::metrics::record_bytes_read("L3", item.content.len());
441 return Ok(Some(item));
442 }
443 Ok(None) => {
444 debug!("L3 filter false positive");
446 crate::metrics::record_operation("L3", "get", "false_positive");
447 crate::metrics::record_cuckoo_false_positive("L3");
448 }
449 Err(e) => {
450 warn!(error = %e, "L3 lookup failed");
451 crate::metrics::record_operation("L3", "get", "error");
452 crate::metrics::record_error("L3", "get", "backend");
453 }
454 }
455 }
456 } else {
457 crate::metrics::record_cuckoo_check("L3", "negative");
459 }
460
461 tracing::Span::current().record("tier", "miss");
462 debug!("Cache miss");
463 crate::metrics::record_operation("all", "get", "miss");
464 crate::metrics::record_latency("all", "get", start.elapsed());
465 Ok(None)
466 }
467
468 #[tracing::instrument(skip(self), fields(verified))]
473 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
474 let item = match self.get(id).await? {
475 Some(item) => item,
476 None => return Ok(None),
477 };
478
479 if !item.content_hash.is_empty() {
481 use sha2::{Sha256, Digest};
482
483 let computed = Sha256::digest(&item.content);
484 let computed_hex = hex::encode(computed);
485
486 if computed_hex != item.content_hash {
487 tracing::Span::current().record("verified", false);
488 warn!(
489 id = %id,
490 expected = %item.content_hash,
491 actual = %computed_hex,
492 "Data corruption detected!"
493 );
494
495 crate::metrics::record_corruption(id);
497
498 return Err(StorageError::Corruption {
499 id: id.to_string(),
500 expected: item.content_hash.clone(),
501 actual: computed_hex,
502 });
503 }
504
505 tracing::Span::current().record("verified", true);
506 debug!(id = %id, "Hash verification passed");
507 }
508
509 Ok(Some(item))
510 }
511
512 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
520 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
521 self.submit_with(item, SubmitOptions::default()).await
522 }
523
524 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
545 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
546 let start = std::time::Instant::now();
547
548 if !self.should_accept_writes() {
549 crate::metrics::record_operation("engine", "submit", "rejected");
550 crate::metrics::record_error("engine", "submit", "backpressure");
551 return Err(StorageError::Backend(format!(
552 "Rejecting write: engine state={}, pressure={}",
553 self.state(),
554 self.pressure()
555 )));
556 }
557
558 let id = item.object_id.clone();
559 let item_bytes = item.content.len();
560
561 if let Some(ref state) = options.state {
563 item.state = state.clone();
564 }
565
566 item.submit_options = Some(options);
568
569 self.insert_l1(item.clone());
571 crate::metrics::record_operation("L1", "submit", "success");
572 crate::metrics::record_bytes_written("L1", item_bytes);
573
574 let batch_to_flush = {
580 let mut batcher = self.l2_batcher.lock().await;
581 if let Some(reason) = batcher.add(item) {
582 batcher.force_flush_with_reason(reason)
583 } else {
584 None
585 }
586 };
587
588 if let Some(batch) = batch_to_flush {
589 self.flush_batch_internal(batch).await;
591 }
592
593 debug!(id = %id, "Item submitted to L1 and batch queue");
594 crate::metrics::record_latency("L1", "submit", start.elapsed());
595 Ok(())
596 }
597
598 #[tracing::instrument(skip(self), fields(object_id = %id))]
607 pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
608 let start = std::time::Instant::now();
609
610 if !self.should_accept_writes() {
611 crate::metrics::record_operation("engine", "delete", "rejected");
612 crate::metrics::record_error("engine", "delete", "backpressure");
613 return Err(StorageError::Backend(format!(
614 "Rejecting delete: engine state={}, pressure={}",
615 self.state(),
616 self.pressure()
617 )));
618 }
619
620 let mut found = false;
621
622 if let Some((_, item)) = self.l1_cache.remove(id) {
624 let size = Self::item_size(&item);
625 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
626 found = true;
627 debug!("Deleted from L1");
628 crate::metrics::record_operation("L1", "delete", "success");
629 }
630
631 self.l3_filter.remove(id);
633
634 if let Some(ref l2) = self.l2_store {
636 let l2_start = std::time::Instant::now();
637 match l2.delete(id).await {
638 Ok(()) => {
639 found = true;
640 debug!("Deleted from L2 (Redis)");
641 crate::metrics::record_operation("L2", "delete", "success");
642 crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
643 }
644 Err(e) => {
645 warn!(error = %e, "Failed to delete from L2 (Redis)");
646 crate::metrics::record_operation("L2", "delete", "error");
647 crate::metrics::record_error("L2", "delete", "backend");
648 }
649 }
650 }
651
652 if let Some(ref l3) = self.l3_store {
654 let l3_start = std::time::Instant::now();
655 match l3.delete(id).await {
656 Ok(()) => {
657 found = true;
658 debug!("Deleted from L3 (MySQL)");
659 crate::metrics::record_operation("L3", "delete", "success");
660 crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
661 }
662 Err(e) => {
663 error!(error = %e, "Failed to delete from L3 (MySQL)");
664 crate::metrics::record_operation("L3", "delete", "error");
665 crate::metrics::record_error("L3", "delete", "backend");
666 }
668 }
669 }
670
671 let mut merkle_batch = MerkleBatch::new();
673 merkle_batch.delete(id.to_string());
674
675 if let Some(ref sql_merkle) = self.sql_merkle {
676 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
677 error!(error = %e, "Failed to update SQL Merkle tree for deletion");
678 crate::metrics::record_error("L3", "merkle", "batch_apply");
679 }
680 }
681
682 if let Some(ref redis_merkle) = self.redis_merkle {
683 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
684 warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
685 crate::metrics::record_error("L2", "merkle", "batch_apply");
686 }
687 }
688
689 if self.config.read().enable_cdc_stream && found {
691 self.emit_cdc_delete(id).await;
692 }
693
694 info!(found, "Delete operation completed");
695 crate::metrics::record_latency("all", "delete", start.elapsed());
696 Ok(found)
697 }
698
699 fn insert_l1(&self, item: SyncItem) {
703 let new_size = Self::item_size(&item);
704 let key = item.object_id.clone();
705
706 if let Some(old_item) = self.l1_cache.insert(key, item) {
708 let old_size = Self::item_size(&old_item);
710 let current = self.l1_size_bytes.load(Ordering::Acquire);
712 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
713 self.l1_size_bytes.store(new_total, Ordering::Release);
714 } else {
715 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
717 }
718 }
719
720 #[inline]
722 fn item_size(item: &SyncItem) -> usize {
723 item.size_bytes()
725 }
726
727 fn maybe_evict(&self) {
728 let pressure = self.memory_pressure();
729 if pressure < self.config.read().backpressure_warn {
730 return;
731 }
732
733 let level = BackpressureLevel::from_pressure(pressure);
734 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
735
736 let now = std::time::Instant::now();
738 let entries: Vec<CacheEntry> = self.l1_cache.iter()
739 .map(|ref_multi| {
740 let item = ref_multi.value();
741 let id = ref_multi.key().clone();
742
743 let now_millis = std::time::SystemTime::now()
745 .duration_since(std::time::UNIX_EPOCH)
746 .unwrap_or_default()
747 .as_millis() as u64;
748 let age_secs = if item.last_accessed > 0 {
749 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
750 } else {
751 3600.0 };
753
754 CacheEntry {
755 id,
756 size_bytes: item.size_bytes(),
757 created_at: now - std::time::Duration::from_secs_f64(age_secs),
758 last_access: now - std::time::Duration::from_secs_f64(age_secs),
759 access_count: item.access_count,
760 is_dirty: false, }
762 })
763 .collect();
764
765 if entries.is_empty() {
766 return;
767 }
768
769 let evict_count = match level {
771 BackpressureLevel::Normal => 0,
772 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
778
779 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
781
782 let mut evicted_bytes = 0usize;
784 for victim_id in &victims {
785 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
786 evicted_bytes += item.size_bytes();
787 }
788 }
789
790 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
792
793 info!(
794 evicted = victims.len(),
795 evicted_bytes = evicted_bytes,
796 pressure = %pressure,
797 level = %level,
798 "Evicted entries from L1 cache"
799 );
800 }
801
802 pub fn l1_stats(&self) -> (usize, usize) {
804 (
805 self.l1_cache.len(),
806 self.l1_size_bytes.load(Ordering::Acquire),
807 )
808 }
809
810 #[must_use]
812 pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
813 self.l3_filter.stats()
814 }
815
816 pub fn l3_filter(&self) -> &Arc<FilterManager> {
818 &self.l3_filter
819 }
820
821 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
826 let redis_root = if let Some(ref rm) = self.redis_merkle {
827 rm.root_hash().await.ok().flatten().map(hex::encode)
828 } else {
829 None
830 };
831
832 let sql_root = if let Some(ref sm) = self.sql_merkle {
833 sm.root_hash().await.ok().flatten().map(hex::encode)
834 } else {
835 None
836 };
837
838 (redis_root, sql_root)
839 }
840
841 pub async fn verify_filter(&self) -> bool {
848 let sql_root = if let Some(ref sm) = self.sql_merkle {
850 match sm.root_hash().await {
851 Ok(Some(root)) => root,
852 _ => return false,
853 }
854 } else {
855 self.l3_filter.mark_trusted();
857 return true;
858 };
859
860 info!(
864 sql_root = %hex::encode(sql_root),
865 "Verifying L3 filter against SQL merkle root"
866 );
867
868 self.l3_filter.mark_trusted();
870 true
871 }
872
873 pub fn update_gauge_metrics(&self) {
878 let (l1_count, l1_bytes) = self.l1_stats();
879 crate::metrics::set_l1_cache_items(l1_count);
880 crate::metrics::set_l1_cache_bytes(l1_bytes);
881 crate::metrics::set_memory_pressure(self.memory_pressure());
882
883 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
884 let filter_load = if filter_capacity > 0 {
885 filter_entries as f64 / filter_capacity as f64
886 } else {
887 0.0
888 };
889 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
890 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
891
892 crate::metrics::set_backpressure_level(self.pressure() as u8);
893 }
894}
895
896#[cfg(test)]
897mod tests {
898 use super::*;
899 use crate::config::SyncEngineConfig;
900 use tokio::sync::watch;
901 use serde_json::json;
902
903 fn create_test_engine() -> SyncEngine {
904 let config = SyncEngineConfig::default();
905 let (_tx, rx) = watch::channel(config.clone());
906 SyncEngine::new(config, rx)
907 }
908
909 fn create_test_item(id: &str) -> SyncItem {
910 SyncItem::from_json(
911 id.to_string(),
912 json!({"data": "test"}),
913 )
914 }
915
916 #[test]
917 fn test_engine_created_state() {
918 let engine = create_test_engine();
919 assert_eq!(engine.state(), EngineState::Created);
920 assert!(!engine.is_ready());
921 }
922
923 #[test]
924 fn test_memory_pressure_calculation() {
925 let config = SyncEngineConfig {
926 l1_max_bytes: 1000,
927 ..Default::default()
928 };
929 let (_tx, rx) = watch::channel(config.clone());
930 let engine = SyncEngine::new(config, rx);
931
932 assert_eq!(engine.memory_pressure(), 0.0);
933
934 let item = create_test_item("test1");
936 engine.insert_l1(item);
937
938 assert!(engine.memory_pressure() > 0.0);
940 }
941
942 #[test]
943 fn test_l1_insert_and_size_tracking() {
944 let engine = create_test_engine();
945
946 let item = create_test_item("test1");
947 let expected_size = item.size_bytes();
948
949 engine.insert_l1(item);
950
951 let (count, size) = engine.l1_stats();
952 assert_eq!(count, 1);
953 assert_eq!(size, expected_size);
954 }
955
956 #[test]
957 fn test_l1_update_size_tracking() {
958 let engine = create_test_engine();
959
960 let item1 = create_test_item("test1");
961 engine.insert_l1(item1);
962 let (_, _size1) = engine.l1_stats();
963
964 let item2 = SyncItem::from_json(
966 "test1".to_string(),
967 json!({"data": "much larger content here for testing size changes"}),
968 );
969 let size2_expected = item2.size_bytes();
970 engine.insert_l1(item2);
971
972 let (count, size2) = engine.l1_stats();
973 assert_eq!(count, 1); assert_eq!(size2, size2_expected); }
976
977 #[tokio::test]
978 async fn test_get_nonexistent() {
979 let engine = create_test_engine();
980 let result = engine.get("nonexistent").await.unwrap();
981 assert!(result.is_none());
982 }
983
984 #[tokio::test]
985 async fn test_get_from_l1() {
986 let engine = create_test_engine();
987 let item = create_test_item("test1");
988 engine.insert_l1(item.clone());
989
990 let result = engine.get("test1").await.unwrap();
991 assert!(result.is_some());
992 assert_eq!(result.unwrap().object_id, "test1");
993 }
994
995 #[tokio::test]
996 async fn test_delete_from_l1() {
997 let engine = create_test_engine();
998 let item = create_test_item("test1");
999 engine.insert_l1(item);
1000
1001 let (count_before, _) = engine.l1_stats();
1002 assert_eq!(count_before, 1);
1003
1004 let deleted = engine.delete("test1").await.unwrap();
1005 assert!(deleted);
1006
1007 let (count_after, size_after) = engine.l1_stats();
1008 assert_eq!(count_after, 0);
1009 assert_eq!(size_after, 0);
1010 }
1011
1012 #[test]
1013 fn test_filter_stats() {
1014 let engine = create_test_engine();
1015
1016 let (entries, capacity, _trust) = engine.l3_filter_stats();
1017 assert_eq!(entries, 0);
1018 assert!(capacity > 0);
1019 }
1020
1021 #[test]
1022 fn test_should_accept_writes() {
1023 let engine = create_test_engine();
1024 assert!(engine.should_accept_writes());
1025 }
1026
1027 #[test]
1028 fn test_pressure_level() {
1029 let engine = create_test_engine();
1030 assert_eq!(engine.pressure(), BackpressureLevel::Normal);
1031 }
1032
1033 #[tokio::test]
1034 async fn test_health_check_basic() {
1035 let engine = create_test_engine();
1036
1037 let health = engine.health_check().await;
1038
1039 assert_eq!(health.state, EngineState::Created);
1041 assert!(!health.ready); assert!(!health.healthy); assert_eq!(health.memory_pressure, 0.0);
1046 assert_eq!(health.l1_items, 0);
1047 assert_eq!(health.l1_bytes, 0);
1048
1049 assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
1051 assert!(health.accepting_writes);
1052
1053 assert!(health.redis_connected.is_none());
1055 assert!(health.sql_connected.is_none());
1056 assert!(health.wal_pending_items.is_none());
1057 }
1058}