sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem, EngineState};
//! use serde_json::json;
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;
mod schema_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{MerkleCacheStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
use crate::schema::SchemaRegistry;

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Merkle cache (SQL merkle shadow in Redis for fast reads)
    pub(super) merkle_cache: Option<MerkleCacheStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// SQL write concurrency limiter.
    ///
    /// Limits concurrent sql_put_batch and merkle_nodes updates to reduce
    /// row-level lock contention and deadlocks under high load.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Schema registry for table routing.
    ///
    /// Maps key prefixes to separate SQL tables for horizontal scaling.
    /// Shared with SqlStore for consistent routing.
    pub(super) schema_registry: Arc<SchemaRegistry>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // SQL write concurrency limiter - reduces row lock contention
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        // Schema registry for table routing (starts empty, populated via register_schema)
        let schema_registry = Arc::new(SchemaRegistry::new());

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            merkle_cache: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
            schema_registry,
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
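    ///
    /// A minimal sketch of waiting for the engine to become ready
    /// (illustrative only; assumes `engine` was created as in the module example):
    ///
    /// ```rust,ignore
    /// let mut state_rx = engine.state_receiver();
    /// // Loop until the engine reports Ready or Running.
    /// while !matches!(*state_rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     state_rx.changed().await.expect("state channel closed");
    /// }
    /// ```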
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
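    ///
    /// A sketch of gating writes on backpressure (illustrative only;
    /// `item` is a hypothetical `SyncItem`):
    ///
    /// ```rust,ignore
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // Shed load: pressure is Emergency or Shutdown.
    ///     tracing::warn!(pressure = %engine.pressure(), "dropping write");
    /// }
    /// ```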
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: Instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health
        // Healthy if: running, backends connected (or not configured), not in critical backpressure
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Get the current merkle root hash (ground truth from SQL).
    ///
    /// Returns hex-encoded 32-byte hash, or None if no merkle tree exists.
    /// Useful for debugging replication sync issues.
    pub async fn merkle_root(&self) -> Option<String> {
        if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(hash)) => Some(hex::encode(hash)),
                Ok(None) => None,
                Err(e) => {
                    warn!(error = %e, "Failed to get merkle root");
                    None
                }
            }
        } else {
            None
        }
    }

    /// Get merkle root from cache (mirrors SQL, for fast reads).
    pub async fn merkle_root_cache(&self) -> Option<String> {
        if let Some(ref merkle_cache) = self.merkle_cache {
            match merkle_cache.root_hash().await {
                Ok(Some(hash)) => Some(hex::encode(hash)),
                Ok(None) => None,
                Err(e) => {
                    debug!(error = %e, "Failed to get merkle cache root");
                    None
                }
            }
        } else {
            None
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
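    ///
    /// A usage sketch (illustrative only; the key is made up and assumes a started engine):
    ///
    /// ```rust,ignore
    /// match engine.get("user.profile.42").await? {
    ///     Some(item) => println!("found {} bytes", item.content.len()),
    ///     None => println!("not present in any tier"),
    /// }
    /// ```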
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        // Also populate L2 (read-through cache)
                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the stored content is re-hashed and compared against it.
    /// Returns `StorageError::Corruption` if the hash doesn't match.
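    ///
    /// A sketch of handling a corruption report (illustrative only; the key is made up):
    ///
    /// ```rust,ignore
    /// match engine.get_verified("user.profile.42").await {
    ///     Ok(Some(item)) => { /* hash matched, or no hash was stored */ }
    ///     Ok(None) => { /* not found */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         tracing::error!(%id, %expected, %actual, "corrupted item detected");
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```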
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
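    ///
    /// A minimal sketch (illustrative only; key and payload are made up):
    ///
    /// ```rust,ignore
    /// let item = SyncItem::from_json(
    ///     "user.profile.42".to_string(),
    ///     serde_json::json!({"name": "Ada"}),
    /// );
    /// engine.submit(item).await?;
    /// ```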
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            // Flush immediately (provides backpressure)
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - remove entry (L2 has no filter)
    /// - Merkle trees - update with deletion marker
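    ///
    /// A usage sketch (illustrative only; the key is made up):
    ///
    /// ```rust,ignore
    /// let existed = engine.delete("user.profile.42").await?;
    /// if !existed {
    ///     tracing::debug!("delete was a no-op: key not present in any tier");
    /// }
    /// ```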
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            } else {
                // Mirror to cache
                if let Some(ref merkle_cache) = self.merkle_cache {
                    if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &[id.to_string()]).await {
                        warn!(error = %e, "Failed to sync merkle cache after deletion");
                    }
                }
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    /// Delete an item that was replicated from another node.
    ///
    /// Same as `delete()` but does NOT emit CDC events to prevent
    /// replication loops (A→B→A→B...).
    ///
    /// Use this for items received via replication, not for local deletes.
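    ///
    /// A sketch contrasting the two delete paths (illustrative only;
    /// `cdc_event` is a hypothetical replication message):
    ///
    /// ```rust,ignore
    /// // Local user action: emits CDC so peers see it.
    /// engine.delete(&key).await?;
    ///
    /// // Applying a peer's CDC delete: must NOT re-emit CDC.
    /// engine.delete_replicated(&cdc_event.object_id).await?;
    /// ```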
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete_replicated(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete_replicated", "rejected");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1 (replicated)");
        }

        // 2. Remove from L3 cuckoo filter
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis)
        if let Some(ref l2) = self.l2_store {
            if l2.delete(id).await.is_ok() {
                found = true;
                debug!("Deleted from L2 (replicated)");
            }
        }

        // 4. Delete from L3 (MySQL)
        if let Some(ref l3) = self.l3_store {
            if l3.delete(id).await.is_ok() {
                found = true;
                debug!("Deleted from L3 (replicated)");
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for replicated deletion");
            } else {
                // Mirror to cache
                if let Some(ref merkle_cache) = self.merkle_cache {
                    if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &[id.to_string()]).await {
                        warn!(error = %e, "Failed to sync merkle cache after replicated deletion");
                    }
                }
            }
        }

        // NO CDC EMISSION - this is a replicated delete, not a local delete

        debug!(found, "Replicated delete completed");
        crate::metrics::record_latency("all", "delete_replicated", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // insert() returns the previous value, letting us distinguish insert from update
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Use saturating operations to avoid underflow if sizes are estimated
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
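    ///
    /// A sketch of a quick L2/L3 consistency check (illustrative only):
    ///
    /// ```rust,ignore
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root != sql_root {
    ///     tracing::warn!(?redis_root, ?sql_root, "merkle cache is out of sync with SQL");
    /// }
    /// ```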
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.merkle_cache {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter against L3's merkle root; currently it
    /// marks the filter trusted once SQL is reachable and has a merkle root
    /// (see the implementation note in the body).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
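    ///
    /// A sketch of trusting the filter after warmup (illustrative only):
    ///
    /// ```rust,ignore
    /// if engine.verify_filter().await {
    ///     // Negative lookups can now skip L3 entirely.
    ///     tracing::info!("L3 cuckoo filter marked trusted");
    /// }
    /// ```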
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter if we have a SQL root
        // A full implementation would compare CF merkle against SQL merkle
        // But since CF doesn't maintain a merkle tree, we trust after warmup
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
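    ///
    /// A sketch of refreshing gauges before scraping (illustrative only;
    /// the export step is a placeholder for whatever exporter is wired up):
    ///
    /// ```rust,ignore
    /// engine.update_gauge_metrics();
    /// // ... then snapshot/export the metrics registry ...
    /// ```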
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}