sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore, mpsc};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
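///
/// # Example
///
/// A sketch of sharing one engine across tasks behind an `Arc` (the key and
/// task body are illustrative, not part of the API):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// let engine = Arc::new(engine);
/// let reader = Arc::clone(&engine);
/// tokio::spawn(async move {
///     // Concurrent reads are safe; internal state is atomic/lock-guarded.
///     let _item = reader.get("some.key").await;
/// });
/// ```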
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// SQL write concurrency limiter.
    ///
    /// Limits concurrent sql_put_batch and merkle_nodes updates to reduce
    /// row-level lock contention and deadlocks under high load.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// View submission queue sender.
    ///
    /// Dedicated channel for [`submit_view_batch()`](Self::submit_view_batch) to avoid
    /// contention with the main batcher. Views (compacted CRDT state) are typically
    /// larger and less frequent than deltas.
    pub(super) view_queue_tx: mpsc::Sender<Vec<SyncItem>>,

    /// View submission queue receiver.
    ///
    /// Drained by [`maybe_flush_views()`] in the main loop.
    pub(super) view_queue_rx: Mutex<mpsc::Receiver<Vec<SyncItem>>>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // SQL write concurrency limiter - reduces row lock contention
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        // View submission queue - dedicated channel for view materialization.
        // Buffer size allows for burst without blocking callers.
        const VIEW_QUEUE_BUFFER_SIZE: usize = 100;
        let (view_queue_tx, view_queue_rx) = mpsc::channel(VIEW_QUEUE_BUFFER_SIZE);

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
            view_queue_tx,
            view_queue_rx: Mutex::new(view_queue_rx),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
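    ///
    /// # Example
    ///
    /// A sketch of waiting for readiness (assumes `start()` is driven
    /// elsewhere; `EngineState` is `Copy`, so the borrow can be dereferenced):
    ///
    /// ```rust,ignore
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     // Wakes whenever the engine broadcasts a state transition.
    ///     rx.changed().await.expect("state channel closed");
    /// }
    /// ```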
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
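    ///
    /// A sketch of gating writes on backpressure (hypothetical caller code;
    /// `item` is assumed to be a prepared `SyncItem`):
    ///
    /// ```rust,ignore
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // Shed load: drop, buffer, or return 503 upstream.
    ///     tracing::warn!(pressure = %engine.pressure(), "load shedding");
    /// }
    /// ```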
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: Instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health
        // Healthy if: running, backends connected (or not configured), not in critical backpressure
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
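    ///
    /// # Example
    ///
    /// A sketch of a tiered read (assumes a started engine; the key is
    /// illustrative):
    ///
    /// ```rust,ignore
    /// match engine.get("user.profile.42").await? {
    ///     Some(item) => println!("found {} bytes", item.content.len()),
    ///     None => println!("not in L1, L2, or L3"),
    /// }
    /// ```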
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is re-hashed and
    /// compared against it. Returns `StorageError::Corruption` on mismatch.
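    ///
    /// # Example
    ///
    /// A sketch of handling a corruption report (assumes a started engine;
    /// the key and recovery action are illustrative):
    ///
    /// ```rust,ignore
    /// match engine.get_verified("user.profile.42").await {
    ///     Ok(Some(item)) => { /* hash matched, or item had no hash set */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         // Quarantine and re-sync from L3 ground truth.
    ///         tracing::error!(%id, %expected, %actual, "corrupt item detected");
    ///     }
    ///     Err(e) => { /* backend error */ }
    /// }
    /// ```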
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
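    ///
    /// # Example
    ///
    /// A sketch of a basic submit (assumes a started engine; the key and
    /// payload are illustrative):
    ///
    /// ```rust,ignore
    /// let item = SyncItem::new("user.profile.42".into(), b"payload".to_vec());
    /// // Rejected with StorageError::Backend under Emergency/Shutdown pressure.
    /// engine.submit(item).await?;
    /// ```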
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            // Flush immediately (provides backpressure)
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removed (L2 keeps no filter)
    /// - Merkle trees - update with deletion marker
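    ///
    /// # Example
    ///
    /// A sketch of a delete (assumes a started engine; the key is
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let found = engine.delete("user.profile.42").await?;
    /// if !found {
    ///     // The id was not present in any tier.
    /// }
    /// ```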
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // `insert` returns the previous value, which tells us whether this
        // was an update (adjust by the size delta) or a fresh insert
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Saturating arithmetic avoids underflow when sizes are estimated;
            // the load/store pair is a best-effort (non-atomic) update
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                // checked_sub avoids a panic if the age exceeds what the
                // monotonic clock can represent (e.g. shortly after boot)
                let age = std::time::Duration::from_secs_f64(age_secs);
                let then = now.checked_sub(age).unwrap_or(now);

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: then,
                    last_access: then,
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state).
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification).
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
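    ///
    /// # Example
    ///
    /// A sketch of comparing tier roots to detect drift (assumes both
    /// backends are connected):
    ///
    /// ```rust,ignore
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root != sql_root {
    ///     // Trees diverged; a sync/repair pass is needed.
    /// }
    /// ```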
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root.
    /// Currently the filter is marked trusted whenever SQL is connected and
    /// reports a root (the filter itself keeps no merkle tree to compare).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
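    ///
    /// # Example
    ///
    /// A sketch of verifying after warmup (assumes a started engine):
    ///
    /// ```rust,ignore
    /// if engine.verify_filter().await {
    ///     // Negative lookups can now skip L3 queries entirely.
    /// }
    /// ```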
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter if we have a SQL root.
        // A full implementation would compare a CF merkle root against the SQL
        // root, but since the CF doesn't maintain one, we trust after warmup.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
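    ///
    /// # Example
    ///
    /// A sketch of refreshing gauges on an export tick (hypothetical exporter
    /// loop; the interval is illustrative):
    ///
    /// ```rust,ignore
    /// let mut tick = tokio::time::interval(std::time::Duration::from_secs(15));
    /// loop {
    ///     tick.tick().await;
    ///     engine.update_gauge_metrics();
    ///     // ...snapshot/export metrics here...
    /// }
    /// ```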
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}