sync_engine/coordinator/mod.rs

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;

pub use types::{EngineState, ItemStatus, BatchResult};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    pub(super) config: SyncEngineConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: config.clone(),
            config_rx,
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            l3_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
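    ///
    /// A minimal sketch of waiting for readiness via the watch channel
    /// (hypothetical usage, assuming a tokio runtime):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, EngineState};
    /// # async fn example(engine: &SyncEngine) {
    /// let mut rx = engine.state_receiver();
    /// // `borrow_and_update` marks the current value as seen before awaiting
    /// while !matches!(*rx.borrow_and_update(), EngineState::Ready | EngineState::Running) {
    ///     if rx.changed().await.is_err() {
    ///         break; // sender dropped; the engine is gone
    ///     }
    /// }
    /// # }
    /// ```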
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
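    ///
    /// A small usage sketch; pressure is the ratio `used_bytes / l1_max_bytes`
    /// from the config:
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncEngineConfig};
    /// # use tokio::sync::watch;
    /// let config = SyncEngineConfig::default();
    /// let (_tx, rx) = watch::channel(config.clone());
    /// let engine = SyncEngine::new(config, rx);
    /// // An empty L1 reads 0.0; values above 1.0 mean the byte budget is exceeded
    /// assert_eq!(engine.memory_pressure(), 0.0);
    /// ```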
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// An L1 hit updates the item's access metadata in place; an L2 or L3 hit
    /// promotes the item into L1.
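    ///
    /// A minimal lookup sketch (hypothetical key, assuming a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// match engine.get("user.42").await? {
    ///     Some(_item) => println!("hit (served from L1, L2, or L3)"),
    ///     None => println!("miss in every tier"),
    /// }
    /// # Ok(())
    /// # }
    /// ```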
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `merkle_root`, the content hash is verified.
    /// Returns `StorageError::Corruption` if the hash doesn't match.
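    ///
    /// A minimal sketch of handling the corruption case (hypothetical key):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.get_verified("user.42").await {
    ///     Ok(Some(_item)) => println!("hash verified"),
    ///     Ok(None) => println!("not found"),
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         eprintln!("corrupt {id}: expected {expected}, got {actual}");
    ///     }
    ///     Err(e) => eprintln!("lookup failed: {e}"),
    /// }
    /// # }
    /// ```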
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has merkle_root set
        if !item.merkle_root.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.merkle_root {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.merkle_root,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.merkle_root.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in `flush_batch_internal()`.
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
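    ///
    /// A minimal sketch (hypothetical key, assuming a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let item = SyncItem::new("user.42".into(), b"payload".to_vec());
    /// // Visible in L1 immediately; L2/L3 catch up on the next batch flush
    /// engine.submit(item).await?;
    /// # Ok(())
    /// # }
    /// ```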
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into the L3 filter here!
        // The filter is updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removed (there is no L2 filter)
    /// - Merkle trees - updated with a deletion marker
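    ///
    /// A minimal sketch (hypothetical key, assuming a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Returns true if the item was found in any tier
    /// let was_present = engine.delete("user.42").await?;
    /// println!("deleted: {was_present}");
    /// # Ok(())
    /// # }
    /// ```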
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Best effort: log the failure but don't fail the whole delete
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // `insert` returns the previous value, letting us compute the size delta
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: atomically swap old size for new size, saturating to
            // avoid underflow since sizes are estimates
            let old_size = Self::item_size(&old_item);
            let _ = self.l1_size_bytes.fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                Some(current.saturating_sub(old_size).saturating_add(new_size))
            });
        } else {
            // Insert: just add the new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats as `(item_count, total_bytes)`.
    #[must_use]
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
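    ///
    /// A quick divergence check, as a sketch (assumes both backends are connected):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root.is_some() && redis_root == sql_root {
    ///     println!("L2 and L3 merkle trees agree");
    /// }
    /// # }
    /// ```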
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root.
    /// In the current implementation the filter is marked trusted whenever a
    /// SQL merkle root is available (or no SQL backend is configured); see the
    /// note in the body.
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
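    ///
    /// A startup-time sketch (assumes warmup has already populated the filter):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.verify_filter().await {
    ///     println!("L3 filter trusted; confident negatives skip the database");
    /// }
    /// # }
    /// ```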
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter if we have a SQL root
        // A full implementation would compare CF merkle against SQL merkle
        // But since CF doesn't maintain a merkle tree, we trust after warmup
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
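    ///
    /// A periodic-export sketch (hypothetical loop, assuming a tokio runtime
    /// and a shared `Arc<SyncEngine>`):
    ///
    /// ```rust,no_run
    /// # use std::sync::Arc;
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: Arc<SyncEngine>) {
    /// let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
    /// loop {
    ///     ticker.tick().await;
    ///     engine.update_gauge_metrics(); // refresh gauges right before scraping
    /// }
    /// # }
    /// ```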
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}