1mod types;
40mod api;
41mod lifecycle;
42mod flush;
43mod merkle_api;
44mod search_api;
45
46pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
47pub use merkle_api::MerkleDiff;
48pub use search_api::{SearchTier, SearchResult, SearchSource};
49#[allow(unused_imports)]
50use types::WriteTarget;
51
52use std::sync::Arc;
53use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
54use std::time::Instant;
55use dashmap::DashMap;
56use parking_lot::RwLock;
57use tokio::sync::{watch, Mutex, Semaphore, mpsc};
58use tracing::{info, warn, debug, error};
59
60use crate::config::SyncEngineConfig;
61use crate::sync_item::SyncItem;
62use crate::submit_options::SubmitOptions;
63use crate::backpressure::BackpressureLevel;
64use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
65use crate::storage::sql::SqlStore;
66use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
67use crate::cuckoo::FilterPersistence;
68use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
69use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
70use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
71use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
72
73use search_api::SearchState;
74
/// Multi-tier sync engine: an in-process L1 map in front of an optional L2
/// cache store and an optional L3 archive store, with cuckoo-filter-gated
/// L3 reads, Merkle trees for tier comparison, and a WAL for resilience.
pub struct SyncEngine {
    /// Current configuration snapshot; presumably refreshed when
    /// `config_rx` delivers a new value (reload loop not visible here).
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Watch receiver delivering configuration updates.
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Sender half of the lifecycle-state watch channel.
    pub(super) state: watch::Sender<EngineState>,

    /// Receiver half kept locally for cheap `state()` reads and cloning.
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: in-process concurrent map, object id -> item.
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// Running byte total for L1, maintained alongside `l1_cache`.
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2 cache store, if attached during lifecycle wiring.
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// Concrete Redis handle; used directly by the health probe.
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3 archive store, if attached.
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// Concrete SQL handle; used directly by the health probe.
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// Cuckoo filter that gates L3 lookups in `get`.
    pub(super) l3_filter: Arc<FilterManager>,

    /// Optional persistence for filter snapshots.
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// Filter inserts since the last snapshot.
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    /// When the last filter snapshot was taken.
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Accumulates submitted items until a size/count/time flush triggers.
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle tree mirrored in Redis, if configured.
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// Merkle tree mirrored in SQL, if configured.
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log protecting L3 writes, if configured.
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// Tracks MySQL availability; consulted by WAL stats and health checks.
    pub(super) mysql_health: MysqlHealthChecker,

    /// Victim-selection policy used by `maybe_evict`.
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search subsystem state; `Some` by construction in `new`.
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// Caps concurrent SQL writes.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Producer side of the view-materialization queue.
    pub(super) view_queue_tx: mpsc::Sender<Vec<SyncItem>>,

    /// Consumer side; locked so a single task drains it.
    pub(super) view_queue_rx: Mutex<mpsc::Receiver<Vec<SyncItem>>>,
}
167
168impl SyncEngine {
169 pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
174 let (state_tx, state_rx) = watch::channel(EngineState::Created);
175
176 let batch_config = BatchConfig {
177 flush_ms: config.batch_flush_ms,
178 flush_count: config.batch_flush_count,
179 flush_bytes: config.batch_flush_bytes,
180 };
181
182 let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));
184
185 const VIEW_QUEUE_BUFFER_SIZE: usize = 100;
188 let (view_queue_tx, view_queue_rx) = mpsc::channel(VIEW_QUEUE_BUFFER_SIZE);
189
190 Self {
191 config: RwLock::new(config.clone()),
192 config_rx: Mutex::new(config_rx),
193 state: state_tx,
194 state_rx,
195 l1_cache: Arc::new(DashMap::new()),
196 l1_size_bytes: Arc::new(AtomicUsize::new(0)),
197 l2_store: None,
198 redis_store: None,
199 l3_store: None,
200 sql_store: None,
201 l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
202 filter_persistence: None,
203 cf_inserts_since_snapshot: AtomicU64::new(0),
204 cf_last_snapshot: Mutex::new(Instant::now()),
205 l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
206 redis_merkle: None,
207 sql_merkle: None,
208 l3_wal: None,
209 mysql_health: MysqlHealthChecker::new(),
210 eviction_policy: TanCurvePolicy::default(),
211 search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
212 sql_write_semaphore,
213 view_queue_tx,
214 view_queue_rx: Mutex::new(view_queue_rx),
215 }
216 }
217
218 #[must_use]
220 pub fn state(&self) -> EngineState {
221 *self.state_rx.borrow()
222 }
223
224 #[must_use]
226 pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
227 self.state_rx.clone()
228 }
229
230 #[must_use]
232 pub fn is_ready(&self) -> bool {
233 matches!(self.state(), EngineState::Ready | EngineState::Running)
234 }
235
236 #[must_use]
238 pub fn memory_pressure(&self) -> f64 {
239 let used = self.l1_size_bytes.load(Ordering::Acquire);
240 let max = self.config.read().l1_max_bytes;
241 if max == 0 {
242 0.0
243 } else {
244 used as f64 / max as f64
245 }
246 }
247
248 #[must_use]
250 pub fn pressure(&self) -> BackpressureLevel {
251 BackpressureLevel::from_pressure(self.memory_pressure())
252 }
253
254 #[must_use]
256 pub fn should_accept_writes(&self) -> bool {
257 let pressure = self.pressure();
258 !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
259 }
260
261 pub async fn health_check(&self) -> types::HealthCheck {
288 let state = self.state();
290 let ready = matches!(state, EngineState::Ready | EngineState::Running);
291 let memory_pressure = self.memory_pressure();
292 let backpressure_level = self.pressure();
293 let accepting_writes = self.should_accept_writes();
294 let (l1_items, l1_bytes) = self.l1_stats();
295 let (filter_items, _, filter_trust) = self.l3_filter_stats();
296
297 let mysql_healthy = self.mysql_health.is_healthy();
300 let wal_pending_items = if let Some(ref wal) = self.l3_wal {
301 let stats = wal.stats(mysql_healthy);
302 Some(stats.pending_items)
303 } else {
304 None
305 };
306
307 let (redis_result, sql_result) = tokio::join!(
309 self.probe_redis(),
310 self.probe_sql()
311 );
312
313 let (redis_connected, redis_latency_ms) = redis_result;
314 let (sql_connected, sql_latency_ms) = sql_result;
315
316 let healthy = matches!(state, EngineState::Running)
319 && redis_connected != Some(false)
320 && sql_connected != Some(false)
321 && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);
322
323 types::HealthCheck {
324 state,
325 ready,
326 memory_pressure,
327 backpressure_level,
328 accepting_writes,
329 l1_items,
330 l1_bytes,
331 filter_items,
332 filter_trust,
333 redis_connected,
334 redis_latency_ms,
335 sql_connected,
336 sql_latency_ms,
337 wal_pending_items,
338 healthy,
339 }
340 }
341
342 async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
344 let Some(ref redis_store) = self.redis_store else {
345 return (None, None); };
347
348 let start = std::time::Instant::now();
349 let mut conn = redis_store.connection();
350
351 let result: Result<String, _> = redis::cmd("PING")
352 .query_async(&mut conn)
353 .await;
354
355 match result {
356 Ok(_) => {
357 let latency_ms = start.elapsed().as_millis() as u64;
358 (Some(true), Some(latency_ms))
359 }
360 Err(_) => (Some(false), None),
361 }
362 }
363
364 async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
366 let Some(ref sql_store) = self.sql_store else {
367 return (None, None); };
369
370 let start = std::time::Instant::now();
371 let pool = sql_store.pool();
372
373 let result = sqlx::query("SELECT 1")
374 .fetch_one(&pool)
375 .await;
376
377 match result {
378 Ok(_) => {
379 let latency_ms = start.elapsed().as_millis() as u64;
380 (Some(true), Some(latency_ms))
381 }
382 Err(_) => (Some(false), None),
383 }
384 }
385
    /// Tiered read path: L1 -> L2 -> L3, promoting hits toward L1.
    ///
    /// Records a per-tier hit/miss/error metric and the winning tier on the
    /// tracing span. Returns `Ok(None)` on a clean miss across all tiers;
    /// backend errors in L2/L3 are logged and treated as misses.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // L1: touch access metadata in place while holding the shard
        // guard, then return a clone of the entry.
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // L2: errors are downgraded to warnings so a flaky cache does not
        // fail the read; we fall through to L3 in every non-hit case.
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // L3: queried only when the cuckoo filter says the id may exist,
        // saving a backend round-trip on definite misses.
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote into L1 only while below the byte cap.
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // Filter said "maybe", the store said "no".
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        // Missed every tier.
        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
475
476 #[tracing::instrument(skip(self), fields(verified))]
481 pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
482 let item = match self.get(id).await? {
483 Some(item) => item,
484 None => return Ok(None),
485 };
486
487 if !item.content_hash.is_empty() {
489 use sha2::{Sha256, Digest};
490
491 let computed = Sha256::digest(&item.content);
492 let computed_hex = hex::encode(computed);
493
494 if computed_hex != item.content_hash {
495 tracing::Span::current().record("verified", false);
496 warn!(
497 id = %id,
498 expected = %item.content_hash,
499 actual = %computed_hex,
500 "Data corruption detected!"
501 );
502
503 crate::metrics::record_corruption(id);
505
506 return Err(StorageError::Corruption {
507 id: id.to_string(),
508 expected: item.content_hash.clone(),
509 actual: computed_hex,
510 });
511 }
512
513 tracing::Span::current().record("verified", true);
514 debug!(id = %id, "Hash verification passed");
515 }
516
517 Ok(Some(item))
518 }
519
520 #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
528 pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
529 self.submit_with(item, SubmitOptions::default()).await
530 }
531
532 #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
553 pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
554 let start = std::time::Instant::now();
555
556 if !self.should_accept_writes() {
557 crate::metrics::record_operation("engine", "submit", "rejected");
558 crate::metrics::record_error("engine", "submit", "backpressure");
559 return Err(StorageError::Backend(format!(
560 "Rejecting write: engine state={}, pressure={}",
561 self.state(),
562 self.pressure()
563 )));
564 }
565
566 let id = item.object_id.clone();
567 let item_bytes = item.content.len();
568
569 if let Some(ref state) = options.state {
571 item.state = state.clone();
572 }
573
574 item.submit_options = Some(options);
576
577 self.insert_l1(item.clone());
579 crate::metrics::record_operation("L1", "submit", "success");
580 crate::metrics::record_bytes_written("L1", item_bytes);
581
582 let batch_to_flush = {
588 let mut batcher = self.l2_batcher.lock().await;
589 if let Some(reason) = batcher.add(item) {
590 batcher.force_flush_with_reason(reason)
591 } else {
592 None
593 }
594 };
595
596 if let Some(batch) = batch_to_flush {
597 self.flush_batch_internal(batch).await;
599 }
600
601 debug!(id = %id, "Item submitted to L1 and batch queue");
602 crate::metrics::record_latency("L1", "submit", start.elapsed());
603 Ok(())
604 }
605
    /// Deletes `id` from every tier (L1 map, L2 cache, L3 archive), removes
    /// it from the cuckoo filter, and records the deletion in both Merkle
    /// trees; a CDC delete event is emitted only when something was found.
    ///
    /// Returns `Ok(true)` when at least one tier reported success.
    /// NOTE(review): an `Ok(())` from `l2.delete`/`l3.delete` sets
    /// `found = true` even if the backend had no such row — confirm the
    /// store traits' delete contract before relying on the return value.
    ///
    /// # Errors
    /// `StorageError::Backend` while backpressure is at
    /// `Emergency`/`Shutdown`.
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // L1: remove and return the bytes to the accounting counter.
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // Drop from the L3 existence filter regardless of tier outcomes.
        self.l3_filter.remove(id);

        // L2 (Redis): failures are logged but do not abort the delete.
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // L3 (MySQL): failures are logged at error level but still do not
        // abort — the Merkle/CDC bookkeeping below always runs.
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                }
            }
        }

        // Record the deletion in both Merkle trees; failures are logged
        // but do not fail the delete.
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // CDC stream only sees deletes that actually removed something.
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }
706
707 fn insert_l1(&self, item: SyncItem) {
711 let new_size = Self::item_size(&item);
712 let key = item.object_id.clone();
713
714 if let Some(old_item) = self.l1_cache.insert(key, item) {
716 let old_size = Self::item_size(&old_item);
718 let current = self.l1_size_bytes.load(Ordering::Acquire);
720 let new_total = current.saturating_sub(old_size).saturating_add(new_size);
721 self.l1_size_bytes.store(new_total, Ordering::Release);
722 } else {
723 self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
725 }
726 }
727
    /// Size measure used for L1 byte accounting, delegated to the item so
    /// every call site agrees on the same definition of "size".
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        item.size_bytes()
    }
734
735 fn maybe_evict(&self) {
736 let pressure = self.memory_pressure();
737 if pressure < self.config.read().backpressure_warn {
738 return;
739 }
740
741 let level = BackpressureLevel::from_pressure(pressure);
742 debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");
743
744 let now = std::time::Instant::now();
746 let entries: Vec<CacheEntry> = self.l1_cache.iter()
747 .map(|ref_multi| {
748 let item = ref_multi.value();
749 let id = ref_multi.key().clone();
750
751 let now_millis = std::time::SystemTime::now()
753 .duration_since(std::time::UNIX_EPOCH)
754 .unwrap_or_default()
755 .as_millis() as u64;
756 let age_secs = if item.last_accessed > 0 {
757 (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
758 } else {
759 3600.0 };
761
762 CacheEntry {
763 id,
764 size_bytes: item.size_bytes(),
765 created_at: now - std::time::Duration::from_secs_f64(age_secs),
766 last_access: now - std::time::Duration::from_secs_f64(age_secs),
767 access_count: item.access_count,
768 is_dirty: false, }
770 })
771 .collect();
772
773 if entries.is_empty() {
774 return;
775 }
776
777 let evict_count = match level {
779 BackpressureLevel::Normal => 0,
780 BackpressureLevel::Warn => entries.len() / 20, BackpressureLevel::Throttle => entries.len() / 10, BackpressureLevel::Critical => entries.len() / 5, BackpressureLevel::Emergency => entries.len() / 3, BackpressureLevel::Shutdown => entries.len() / 2, }.max(1);
786
787 let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);
789
790 let mut evicted_bytes = 0usize;
792 for victim_id in &victims {
793 if let Some((_, item)) = self.l1_cache.remove(victim_id) {
794 evicted_bytes += item.size_bytes();
795 }
796 }
797
798 self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);
800
801 info!(
802 evicted = victims.len(),
803 evicted_bytes = evicted_bytes,
804 pressure = %pressure,
805 level = %level,
806 "Evicted entries from L1 cache"
807 );
808 }
809
810 pub fn l1_stats(&self) -> (usize, usize) {
812 (
813 self.l1_cache.len(),
814 self.l1_size_bytes.load(Ordering::Acquire),
815 )
816 }
817
    /// L3 filter statistics, delegated to the filter manager:
    /// `(entries, capacity, trust_state)`.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }
823
    /// Shared handle to the L3 cuckoo filter manager.
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }
828
829 pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
834 let redis_root = if let Some(ref rm) = self.redis_merkle {
835 rm.root_hash().await.ok().flatten().map(hex::encode)
836 } else {
837 None
838 };
839
840 let sql_root = if let Some(ref sm) = self.sql_merkle {
841 sm.root_hash().await.ok().flatten().map(hex::encode)
842 } else {
843 None
844 };
845
846 (redis_root, sql_root)
847 }
848
849 pub async fn verify_filter(&self) -> bool {
856 let sql_root = if let Some(ref sm) = self.sql_merkle {
858 match sm.root_hash().await {
859 Ok(Some(root)) => root,
860 _ => return false,
861 }
862 } else {
863 self.l3_filter.mark_trusted();
865 return true;
866 };
867
868 info!(
872 sql_root = %hex::encode(sql_root),
873 "Verifying L3 filter against SQL merkle root"
874 );
875
876 self.l3_filter.mark_trusted();
878 true
879 }
880
881 pub fn update_gauge_metrics(&self) {
886 let (l1_count, l1_bytes) = self.l1_stats();
887 crate::metrics::set_l1_cache_items(l1_count);
888 crate::metrics::set_l1_cache_bytes(l1_bytes);
889 crate::metrics::set_memory_pressure(self.memory_pressure());
890
891 let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
892 let filter_load = if filter_capacity > 0 {
893 filter_entries as f64 / filter_capacity as f64
894 } else {
895 0.0
896 };
897 crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
898 crate::metrics::set_cuckoo_filter_load("L3", filter_load);
899
900 crate::metrics::set_backpressure_level(self.pressure() as u8);
901 }
902}
903
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    // Engine with default config and no storage tiers attached; exercises
    // only the in-process (L1) paths.
    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    // Minimal item with a small JSON payload.
    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        // Empty cache -> zero pressure.
        assert_eq!(engine.memory_pressure(), 0.0);

        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Any insert must register as non-zero pressure.
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Re-insert under the same id with a larger payload.
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        // Still one entry; accounting reflects the replacement's size.
        assert_eq!(count, 1);
        assert_eq!(size2, size2_expected);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        // Both the entry and its byte accounting must be gone.
        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Freshly-created engine: not ready, not healthy, no load.
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready);
        assert!(!health.healthy);
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured -> probes report "not configured".
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}