sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
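//!
//! State transitions can be observed via [`SyncEngine::state_receiver`];
//! a minimal watcher sketch (assuming a running Tokio runtime):
//!
//! ```rust,ignore
//! let mut state_rx = engine.state_receiver();
//! tokio::spawn(async move {
//!     while state_rx.changed().await.is_ok() {
//!         let state = *state_rx.borrow();
//!         tracing::info!(%state, "engine state changed");
//!     }
//! });
//! ```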
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
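///
/// A common pattern is to share the engine across tasks behind an `Arc`
/// (a sketch, not prescriptive; `"some.key"` is a placeholder):
///
/// ```rust,ignore
/// let engine = std::sync::Arc::new(SyncEngine::new(config, config_rx));
/// let handle = engine.clone();
/// tokio::spawn(async move {
///     let _ = handle.get("some.key").await;
/// });
/// ```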
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// SQL write concurrency limiter
    ///
    /// Limits concurrent sql_put_batch and merkle_nodes updates to reduce
    /// row-level lock contention and deadlocks under high load.
    pub(super) sql_write_semaphore: Arc<Semaphore>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // SQL write concurrency limiter - reduces row lock contention
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
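    ///
    /// A minimal caller-side gate (a sketch; the engine also rejects the
    /// write itself when pressure reaches Emergency or Shutdown):
    ///
    /// ```rust,ignore
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // Shed load or retry with backoff; a direct submit would
    ///     // return StorageError::Backend describing the pressure.
    /// }
    /// ```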
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: Instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health
        // Healthy if: running, backends connected (or not configured), not in critical backpressure
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
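    ///
    /// A minimal read sketch (`"user.42"` is a placeholder key):
    ///
    /// ```rust,ignore
    /// if let Some(item) = engine.get("user.42").await? {
    ///     // Served from whichever tier hit first; hot items end up in L1.
    ///     println!("{} is {} bytes", item.object_id, item.content.len());
    /// }
    /// ```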
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is re-hashed
    /// and compared against it. Returns `StorageError::Corruption` on mismatch.
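    ///
    /// A sketch of distinguishing corruption from other failures:
    ///
    /// ```rust,ignore
    /// match engine.get_verified(id).await {
    ///     Ok(Some(item)) => { /* hash matched, or item had no hash set */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         tracing::error!(%id, %expected, %actual, "corrupt content");
    ///     }
    ///     Err(e) => { /* backend error */ }
    /// }
    /// ```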
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            // Flush immediately (provides backpressure)
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - entry removed (L2 has no filter)
    /// - Merkle trees - update with deletion marker
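    ///
    /// A minimal sketch; the returned `bool` reports whether the item was
    /// found in at least one tier:
    ///
    /// ```rust,ignore
    /// let found = engine.delete("stale.key").await?;
    /// if !found {
    ///     tracing::debug!("delete was a no-op; item was not present");
    /// }
    /// ```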
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which lets us adjust
        // the size counter for updates as well as fresh inserts.
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Saturating arithmetic avoids underflow when sizes are estimated;
            // the separate load/store means the counter is approximate under
            // concurrent updates to the same key.
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
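    ///
    /// A sketch of a divergence check (assuming both backends are configured):
    ///
    /// ```rust,ignore
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if let (Some(r), Some(s)) = (&redis_root, &sql_root) {
    ///     if r != s {
    ///         tracing::warn!(%r, %s, "L2/L3 merkle roots diverge; resync needed");
    ///     }
    /// }
    /// ```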
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root;
    /// currently the filter is marked trusted once a SQL root is available
    /// (see the inline note below).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
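    ///
    /// A minimal sketch, typically run once after warmup:
    ///
    /// ```rust,ignore
    /// if !engine.verify_filter().await {
    ///     tracing::warn!("L3 filter not trusted; lookups stay conservative");
    /// }
    /// ```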
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, trust the filter whenever a SQL root exists. A full
        // implementation would compare a filter-side merkle root against the
        // SQL one, but the cuckoo filter does not maintain a merkle tree,
        // so we trust it after warmup instead.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
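    ///
    /// A sketch of a periodic refresh task (assuming a Tokio runtime and a
    /// 10-second scrape interval):
    ///
    /// ```rust,ignore
    /// let mut tick = tokio::time::interval(std::time::Duration::from_secs(10));
    /// loop {
    ///     tick.tick().await;
    ///     engine.update_gauge_metrics();
    ///     // snapshot/export metrics here
    /// }
    /// ```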
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}