use tracing::{info, warn, debug, error};

use crate::storage::traits::StorageError;
use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
use crate::merkle::{MerkleCacheStore, SqlMerkleStore};
use crate::resilience::wal::WriteAheadLog;

use super::{SyncEngine, EngineState};
#[allow(unused_imports)]
use super::WriteTarget;

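// Engine lifecycle: trust-verified startup, cuckoo-filter warm-up, the
// maintenance run loop, and graceful shutdown.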
impl SyncEngine {
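    /// Trust-verified startup: initialize the WAL, connect to SQL (L3, ground
    /// truth) and its merkle store, drain any pending WAL entries, restore the
    /// cuckoo filters from snapshot, connect to Redis (L2, cache), and sync a
    /// stale Redis cache from SQL via merkle branch diffing.
    ///
    /// A minimal sketch of the intended call order (illustrative only; the
    /// engine is assumed to be constructed and configured elsewhere):
    ///
    /// ```ignore
    /// engine.start().await?;   // connect backends, drain WAL, restore filters
    /// engine.warm_up().await?; // build the L3 cuckoo filter from SQL keys
    /// engine.run().await;      // maintenance loop until shutdown
    /// ```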
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

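        // Phase: initialize the write-ahead log (durability buffer for L3 writes).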
        let phase_start = std::time::Instant::now();
        let (wal_path, wal_max_items) = {
            let cfg = self.config.read();
            (
                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
                cfg.wal_max_items.unwrap_or(1_000_000),
            )
        };

        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };

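        // Filter persistence shares the WAL path; a failure here only disables
        // cuckoo-filter snapshots, so it is non-fatal.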
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }

        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

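        // Phase: connect to SQL (L3, ground truth) and initialize its merkle
        // schema. If a URL is configured, a failed connection aborts startup.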
        let phase_start = std::time::Instant::now();
        let sql_url = self.config.read().sql_url.clone();
        if let Some(ref sql_url) = sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::with_registry(sql_url, self.schema_registry.clone()).await {
                Ok(store) => {
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);

                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

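        // Drain WAL entries left over from a previous run into SQL before
        // accepting new work; partial failures are retried later.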
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");

            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }

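        // The SQL merkle root is the reference hash: it validates cuckoo-filter
        // snapshots and detects a stale Redis cache.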
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (for snapshot validation)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };

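        // Restore cuckoo filters from the last snapshot if it still matches SQL.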
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

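        // Phase: connect to Redis (L2). It is a cache, so failure is non-fatal.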
        let phase_start = std::time::Instant::now();
        let (redis_url, redis_prefix) = {
            let cfg = self.config.read();
            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
        };
        if let Some(ref redis_url) = redis_url {
            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
                Ok(store) => {
                    let merkle_cache = MerkleCacheStore::with_prefix(
                        store.connection(),
                        redis_prefix.as_deref(),
                    );
                    self.merkle_cache = Some(merkle_cache);
                    let store = std::sync::Arc::new(store);
                    self.redis_store = Some(store.clone());
                    self.l2_store = Some(store);
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected as merkle cache");
                }
                Err(e) => {
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

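        // With both merkle stores and a SQL root available, verify the Redis
        // cache against SQL and branch-diff sync it when the roots differ.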
        if let (Some(sql_merkle), Some(merkle_cache), Some(sql_root)) =
            (&self.sql_merkle, &self.merkle_cache, &sql_root)
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);

            match merkle_cache.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis cache in sync with SQL");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis cache stale - syncing from SQL"
                    );

                    match self.sync_redis_from_sql_diff(sql_merkle, merkle_cache).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis cache synced from SQL");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis cache empty - will populate on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }

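    /// Restore the L3 cuckoo filter from its persisted snapshot, but only when
    /// the snapshot was taken at the current SQL merkle root.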
    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
        let persistence = match &self.filter_persistence {
            Some(p) => p,
            None => return,
        };

        let sql_root = match sql_root {
            Some(r) => r,
            None => return,
        };

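        // A snapshot is trusted only if its merkle root matches SQL; otherwise
        // the filter is rebuilt during warm-up.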
        match persistence.load(L3_FILTER_ID).await {
            Ok(Some(state)) if &state.merkle_root == sql_root => {
                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
                    warn!(error = %e, "Failed to import L3 filter from snapshot");
                } else {
                    self.l3_filter.mark_trusted();
                    info!(entries = state.entry_count, "L3 cuckoo filter valid (snapshot merkle matches SQL)");
                }
            }
            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
        }
    }

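    /// Bring a stale Redis (L2) cache back in sync with SQL (L3): find the
    /// merkle branches that differ, copy the affected items, then update the
    /// cached merkle nodes. Returns the number of items synced.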
    async fn sync_redis_from_sql_diff(
        &self,
        sql_merkle: &SqlMerkleStore,
        merkle_cache: &MerkleCacheStore,
    ) -> Result<usize, StorageError> {
        let mut total_synced = 0;
        let stale_prefixes = self.find_stale_branches(sql_merkle, merkle_cache, "").await?;

        for prefix in stale_prefixes {
            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");

            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;

            if leaf_paths.is_empty() {
                continue;
            }

            if let Some(ref l3_store) = self.l3_store {
                for object_id in &leaf_paths {
                    if let Ok(Some(item)) = l3_store.get(object_id).await {
                        if let Some(ref l2_store) = self.l2_store {
                            if let Err(e) = l2_store.put(&item).await {
                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
                            } else {
                                total_synced += 1;
                            }
                        }
                    }
                }
            }

            if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &leaf_paths).await {
                warn!(prefix = %prefix, error = %e, "Failed to sync merkle cache");
            }
        }

        Ok(total_synced)
    }

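    /// Recursively compare SQL and Redis merkle children under `prefix`,
    /// returning the branch paths whose hashes are missing or differ in Redis.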
    async fn find_stale_branches(
        &self,
        sql_merkle: &SqlMerkleStore,
        merkle_cache: &MerkleCacheStore,
        prefix: &str,
    ) -> Result<Vec<String>, StorageError> {
        let mut stale = Vec::new();

        let sql_children = sql_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
        let redis_children = merkle_cache.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;

        if sql_children.is_empty() {
            return Ok(stale);
        }

        for (child_path, sql_hash) in sql_children {
            match redis_children.get(&child_path) {
                Some(redis_hash) if redis_hash == &sql_hash => continue,
                Some(_) => {
                    if child_path.contains('.') && !child_path.ends_with('.') {
                        stale.push(child_path);
                    } else {
                        let sub_stale = Box::pin(
                            self.find_stale_branches(sql_merkle, merkle_cache, &child_path)
                        ).await?;
                        stale.extend(sub_stale);
                    }
                }
                None => stale.push(child_path),
            }
        }

        Ok(stale)
    }

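    /// Warm the L3 cuckoo filter by scanning all keys from the L3 store in
    /// batches, then mark the filter trusted and the engine ready.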
    #[tracing::instrument(skip(self))]
    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
        let _ = self.state.send(EngineState::WarmingUp);
        info!("Warming up cuckoo filter and L1 cache...");

        if let Some(l3) = &self.l3_store {
            let batch_size = self.config.read().cuckoo_warmup_batch_size;
            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");

            let total_count = l3.count_all().await.unwrap_or(0);
            if total_count > 0 {
                let mut offset = 0u64;
                let mut loaded = 0usize;

                loop {
                    let keys = l3.scan_keys(offset, batch_size).await?;
                    if keys.is_empty() {
                        break;
                    }

                    for key in &keys {
                        self.l3_filter.insert(key);
                    }

                    loaded += keys.len();
                    offset += keys.len() as u64;

                    if loaded % 10_000 == 0 || loaded == total_count as usize {
                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
                    }
                }

                self.l3_filter.mark_trusted();
                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
            } else {
                info!("L3 store is empty, skipping filter warmup");
                self.l3_filter.mark_trusted();
            }
        }

        info!(
            l3_trust = ?self.l3_filter.trust_state(),
            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
        );

        let _ = self.state.send(EngineState::Ready);
        info!("Warm-up complete, engine ready");
        Ok(())
    }

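    /// Periodic maintenance tick: evict from L1 and flush the L2 batch if due.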
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }

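    /// Flush any pending L2 batch immediately, regardless of batch thresholds.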
    pub async fn force_flush(&self) {
        let batch = self.l2_batcher.lock().await.force_flush();
        if let Some(batch) = batch {
            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
            self.flush_batch_internal(batch).await;
        }
    }

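    /// Log a debug snapshot of the merkle root, L3 filter size, and L1 stats.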
    pub async fn log_merkle_diagnostic(&self) {
        let merkle_root = self.merkle_root().await;
        let l3_count = self.l3_filter.len();
        let (l1_items, l1_bytes) = self.l1_stats();

        debug!(
            merkle_root = merkle_root.as_deref().unwrap_or("none"),
            l3_filter_count = l3_count,
            l1_items = l1_items,
            l1_bytes = l1_bytes,
            "sync_engine_state"
        );
    }

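    /// Main maintenance loop. Every 100ms it evicts L1, flushes L2, and
    /// snapshots the cuckoo filter when write thresholds are hit; on longer
    /// intervals it checks MySQL health, drains the WAL, and snapshots the
    /// filter by time. Config changes from the watch channel are applied
    /// between iterations (hot reload).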
    #[tracing::instrument(skip(self))]
    pub async fn run(&self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(cf_snapshot_secs)
        );

        loop {
            let config_changed = {
                let rx = self.config_rx.lock().await;
                rx.has_changed().unwrap_or(false)
            };

            if config_changed {
                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
                info!("Config hot-reloaded: l1_max_bytes={}, redis_url={:?}",
                    new_config.l1_max_bytes,
                    new_config.redis_url.as_ref().map(|u| u.rsplit('@').next().unwrap_or(u)));
                *self.config.write() = new_config;
            }

            tokio::select! {
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }

                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }

                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }

                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }

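    /// Graceful shutdown: flush the final L2 batch and snapshot the cuckoo
    /// filters before exit.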
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&self) {
        use crate::FlushReason;

        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);

        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            {
                let mut batcher = self.l2_batcher.lock().await;
                batcher.add_batch(batch.items);
            }
            self.maybe_flush_l2().await;
            crate::metrics::record_items_written("L2", batch_size);
        }

        self.snapshot_cuckoo_filters("shutdown").await;

        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
}