sync_engine/coordinator/
mod.rs

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
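///
/// A concurrency sketch (hedged: the `Arc` wrapping and key name are
/// illustrative; all engine methods take `&self`):
///
/// ```rust,no_run
/// # use sync_engine::SyncEngine;
/// # use std::sync::Arc;
/// # async fn example(engine: SyncEngine) {
/// let engine = Arc::new(engine);
/// let reader = Arc::clone(&engine);
///
/// // Clones of the Arc can issue reads and writes from separate tasks.
/// let handle = tokio::spawn(async move {
///     reader.get("some.key").await
/// });
/// # let _ = handle.await;
/// # }
/// ```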
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
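    ///
    /// A readiness-waiting sketch (assumes the engine is started elsewhere;
    /// `example` is a hypothetical caller):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, EngineState};
    /// # async fn example(engine: &SyncEngine) {
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     // changed() resolves each time the engine broadcasts a new state.
    ///     if rx.changed().await.is_err() {
    ///         break; // sender dropped: the engine is gone
    ///     }
    /// }
    /// # }
    /// ```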
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
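    ///
    /// A gating sketch (`my_item` is a stand-in for a real `SyncItem`):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine, my_item: SyncItem) {
    /// if engine.should_accept_writes() {
    ///     let _ = engine.submit(my_item).await;
    /// } else {
    ///     // Pressure is Emergency (or the engine is shutting down): back off.
    ///     eprintln!("deferring write, pressure = {}", engine.pressure());
    /// }
    /// # }
    /// ```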
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
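    ///
    /// A lookup sketch (the key is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// match engine.get("user.profile.42").await? {
    ///     Some(item) => println!("hit: {} bytes", item.content.len()),
    ///     None => println!("miss in all tiers"),
    /// }
    /// # Ok(())
    /// # }
    /// ```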
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1 (skip when over memory budget)
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, its content is re-hashed
    /// (SHA-256) and compared against that hash. Returns
    /// `StorageError::Corruption` if the hashes don't match.
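    ///
    /// A handling sketch (the key is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.get_verified("invoice.2024.001").await {
    ///     Ok(Some(_item)) => { /* hash matched, or the item carries no hash */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         eprintln!("corrupt {id}: expected {expected}, got {actual}");
    ///     }
    ///     Err(e) => eprintln!("lookup failed: {e}"),
    /// }
    /// # }
    /// ```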
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in `flush_batch_internal()`.
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
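    ///
    /// A minimal sketch (key and payload are illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let item = SyncItem::new("sensor.reading.1".into(), b"payload".to_vec());
    /// // Visible in L1 immediately; L2/L3 persistence happens on the next flush.
    /// engine.submit(item).await?;
    /// # Ok(())
    /// # }
    /// ```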
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into the L3 filter here!
        // The filter is updated only on SUCCESSFUL writes in flush_batch_internal().
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - Cuckoo filter (L3 only; L2 has no filter) - remove entry
    /// - Merkle trees - update with deletion marker
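    ///
    /// A deletion sketch (the key is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let found = engine.delete("stale.key").await?;
    /// if !found {
    ///     // Filter and merkle updates are applied even when no tier held the key.
    ///     println!("nothing to delete in any tier");
    /// }
    /// # Ok(())
    /// # }
    /// ```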
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which tells us whether
        // this was an insert or an update.
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size and add new size in one atomic step
            // (a plain load + store could lose a concurrent update).
            // Saturating math avoids underflow if sizes are estimated.
            let old_size = Self::item_size(&old_item);
            let _ = self.l1_size_bytes.fetch_update(Ordering::Release, Ordering::Acquire, |cur| {
                Some(cur.saturating_sub(old_size).saturating_add(new_size))
            });
        } else {
            // Insert: just add the new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    /// Evict L1 entries when memory pressure exceeds the configured warn threshold.
    ///
    /// Victim count scales with the backpressure level; victims are chosen by
    /// the tan-curve eviction policy.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    // Creation time isn't tracked, so both timestamps are
                    // approximated from the last access.
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats as `(item_count, total_bytes)`.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats as `(entries, capacity, trust_state)`.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification).
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
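    ///
    /// A comparison sketch (a quick L2/L3 divergence probe):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.merkle_roots().await {
    ///     (Some(r), Some(s)) if r == s => println!("L2 and L3 agree"),
    ///     (Some(_), Some(_)) => println!("roots diverge; a sync pass is needed"),
    ///     _ => println!("a backend is disconnected or its tree is empty"),
    /// }
    /// # }
    /// ```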
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Currently this only checks that L3 is connected and has a merkle root;
    /// a full implementation would compare the filter contents against that
    /// root (see the notes in the body).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
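    ///
    /// A warmup sketch (typically run after the filter has been rebuilt):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.verify_filter().await {
    ///     // Negative lookups can now skip L3 entirely.
    ///     println!("L3 filter trusted");
    /// }
    /// # }
    /// ```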
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter if we have a SQL root.
        // A full implementation would compare CF merkle against SQL merkle,
        // but since the CF doesn't maintain a merkle tree, we trust after warmup.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}