use tracing::{info, warn, debug, error};

use crate::storage::traits::StorageError;
use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
use crate::resilience::wal::WriteAheadLog;

use super::{SyncEngine, EngineState};
#[allow(unused_imports)]
use super::WriteTarget;

impl SyncEngine {
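    /// Trust-verified startup: initializes the write-ahead log and filter
    /// persistence, drains any pending WAL entries into SQL (L3, the ground
    /// truth), restores cuckoo-filter snapshots keyed to the SQL merkle root,
    /// connects Redis (L2) if configured, and reconciles the Redis merkle
    /// shadow tree against SQL via a branch diff before reporting ready.
    ///
    /// A minimal usage sketch; the constructor `SyncEngine::new` shown here is
    /// hypothetical and not defined in this file:
    ///
    /// ```ignore
    /// let mut engine = SyncEngine::new(config);
    /// engine.start().await?;   // fails if SQL is configured but unreachable
    /// engine.warm_up().await?; // optional: pre-populate the L3 cuckoo filter
    /// engine.run().await;      // main maintenance loop
    /// ```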
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

        let phase_start = std::time::Instant::now();
        let (wal_path, wal_max_items) = {
            let cfg = self.config.read();
            (
                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
                cfg.wal_max_items.unwrap_or(1_000_000),
            )
        };

        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };

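        // Filter persistence is optional: if it cannot be initialized, startup
        // continues but cuckoo-filter snapshots are disabled for this run.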
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }

        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

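        // Connect to SQL (L3). This backend is the ground truth: if a URL is
        // configured and the connection or merkle schema init fails, startup aborts.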
        let phase_start = std::time::Instant::now();
        let sql_url = self.config.read().sql_url.clone();
        if let Some(ref sql_url) = sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::with_registry(sql_url, self.schema_registry.clone()).await {
                Ok(store) => {
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);

                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

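        // Drain WAL entries that were accepted before a previous shutdown or
        // crash but never reached SQL; a partial drain is logged and retried later.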
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");

            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }

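        // The SQL merkle root anchors the trust checks below: cuckoo-filter
        // snapshots and the Redis shadow tree are only trusted if they match it.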
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };

        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

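        // Connect to Redis (L2). Unlike SQL, Redis is optional: a connection
        // failure degrades the engine to L3-only operation instead of aborting.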
        let phase_start = std::time::Instant::now();
        let (redis_url, redis_prefix) = {
            let cfg = self.config.read();
            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
        };
        if let Some(ref redis_url) = redis_url {
            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
                Ok(store) => {
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    let store = std::sync::Arc::new(store);
                    self.redis_store = Some(store.clone());
                    self.l2_store = Some(store);
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

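        // Reconcile the Redis merkle shadow tree against SQL. Matching roots mean
        // Redis is in sync; a mismatch triggers a branch diff that re-syncs only
        // the stale subtrees rather than the whole keyspace.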
        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
            (&self.sql_merkle, &self.redis_merkle, &sql_root)
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);

            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );

                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }

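    /// Restores the L3 cuckoo filter from a persisted snapshot, but only when the
    /// snapshot was taken at the current SQL merkle root; otherwise the filter is
    /// left untrusted and rebuilt during warmup.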
    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
        let persistence = match &self.filter_persistence {
            Some(p) => p,
            None => return,
        };

        let sql_root = match sql_root {
            Some(r) => r,
            None => return,
        };

        match persistence.load(L3_FILTER_ID).await {
            Ok(Some(state)) if &state.merkle_root == sql_root => {
                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
                    warn!(error = %e, "Failed to import L3 filter from snapshot");
                } else {
                    self.l3_filter.mark_trusted();
                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
                }
            }
            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
        }
    }

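    /// Re-syncs stale Redis (L2) branches from SQL (L3): finds diverging merkle
    /// branches, re-writes their items into Redis, and applies the corresponding
    /// leaf hashes to the Redis merkle tree in a batch. Returns the number of
    /// items written to Redis.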
    async fn sync_redis_from_sql_diff(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
    ) -> Result<usize, StorageError> {
        let mut total_synced = 0;
        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;

        for prefix in stale_prefixes {
            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");

            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;

            if leaf_paths.is_empty() {
                continue;
            }

            let mut merkle_batch = MerkleBatch::new();

            if let Some(ref l3_store) = self.l3_store {
                for object_id in &leaf_paths {
                    if let Ok(Some(item)) = l3_store.get(object_id).await {
                        let payload_hash = PathMerkle::payload_hash(&item.content);
                        let leaf_hash = PathMerkle::leaf_hash(
                            &item.object_id,
                            item.version,
                            &payload_hash,
                        );
                        merkle_batch.insert(object_id.clone(), leaf_hash);

                        if let Some(ref l2_store) = self.l2_store {
                            if let Err(e) = l2_store.put(&item).await {
                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
                            } else {
                                total_synced += 1;
                            }
                        }
                    }
                }

                if !merkle_batch.is_empty() {
                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
                    }
                }
            }
        }

        Ok(total_synced)
    }

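    /// Recursively walks the SQL and Redis merkle trees, comparing child hashes
    /// under `prefix`. Children missing from Redis are reported as stale; children
    /// with mismatched hashes are either reported directly (leaf-like paths that
    /// contain a '.') or descended into for a finer-grained diff.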
    async fn find_stale_branches(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
        prefix: &str,
    ) -> Result<Vec<String>, StorageError> {
        let mut stale = Vec::new();

        let sql_children = sql_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
        let redis_children = redis_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;

        if sql_children.is_empty() {
            return Ok(stale);
        }

        for (child_path, sql_hash) in sql_children {
            match redis_children.get(&child_path) {
                Some(redis_hash) if redis_hash == &sql_hash => continue,
                Some(_) => {
                    if child_path.contains('.') && !child_path.ends_with('.') {
                        stale.push(child_path);
                    } else {
                        let sub_stale = Box::pin(
                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
                        ).await?;
                        stale.extend(sub_stale);
                    }
                }
                None => stale.push(child_path),
            }
        }

        Ok(stale)
    }

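    /// Warms the L3 cuckoo filter by scanning all keys from the L3 store in
    /// batches (`cuckoo_warmup_batch_size`) and inserting them into the filter,
    /// then marks the filter trusted. Redis membership checks use EXISTS instead
    /// of a filter, so only L3 is warmed here.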
    #[tracing::instrument(skip(self))]
    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
        let _ = self.state.send(EngineState::WarmingUp);
        info!("Warming up cuckoo filter and L1 cache...");

        if let Some(l3) = &self.l3_store {
            let batch_size = self.config.read().cuckoo_warmup_batch_size;
            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");

            let total_count = l3.count_all().await.unwrap_or(0);
            if total_count > 0 {
                let mut offset = 0u64;
                let mut loaded = 0usize;

                loop {
                    let keys = l3.scan_keys(offset, batch_size).await?;
                    if keys.is_empty() {
                        break;
                    }

                    for key in &keys {
                        self.l3_filter.insert(key);
                    }

                    loaded += keys.len();
                    offset += keys.len() as u64;

                    if loaded % 10_000 == 0 || loaded == total_count as usize {
                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
                    }
                }

                self.l3_filter.mark_trusted();
                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
            } else {
                info!("L3 store is empty, skipping filter warmup");
                self.l3_filter.mark_trusted();
            }
        }

        info!(
            l3_trust = ?self.l3_filter.trust_state(),
            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
        );

        let _ = self.state.send(EngineState::Ready);
        info!("Warm-up complete, engine ready");
        Ok(())
    }

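    /// Runs one maintenance step (L1 eviction and, if due, an L2 batch flush),
    /// the same work the `run` loop performs on its 100 ms timer.
    ///
    /// A sketch of driving maintenance manually instead of calling `run`
    /// (the `engine` binding is hypothetical):
    ///
    /// ```ignore
    /// let mut every = tokio::time::interval(std::time::Duration::from_millis(100));
    /// loop {
    ///     every.tick().await;
    ///     engine.tick().await;
    /// }
    /// ```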
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }

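    /// Flushes the currently buffered L2 batch immediately, if any.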
    pub async fn force_flush(&self) {
        let batch = self.l2_batcher.lock().await.force_flush();
        if let Some(batch) = batch {
            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
            self.flush_batch_internal(batch).await;
        }
    }

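    /// Main maintenance loop: every 100 ms it evicts L1, flushes L2 batches, and
    /// checks the threshold-based CF snapshot trigger; on longer timers it checks
    /// MySQL health (30 s), drains the WAL (5 s), and snapshots cuckoo filters
    /// (`cf_snapshot_interval_secs`). Config changes published on the watch
    /// channel are hot-reloaded at the top of each iteration.
    ///
    /// A sketch of typical wiring (the `engine` binding is hypothetical):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     _ = engine.run() => {}
    ///     _ = tokio::signal::ctrl_c() => {}
    /// }
    /// engine.shutdown().await;
    /// ```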
    #[tracing::instrument(skip(self))]
    pub async fn run(&self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(cf_snapshot_secs)
        );

        loop {
            let config_changed = {
                let rx = self.config_rx.lock().await;
                rx.has_changed().unwrap_or(false)
            };

            if config_changed {
                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
                info!("Config hot-reloaded: l1_max_bytes={}, redis_url={:?}",
                    new_config.l1_max_bytes,
                    new_config.redis_url.as_ref().map(|u| u.rsplit('@').next().unwrap_or(u)));
                *self.config.write() = new_config;
            }

            tokio::select! {
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }

                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }

                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }

                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }

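    /// Graceful shutdown: flushes the final L2 batch (tagged `FlushReason::Shutdown`),
    /// snapshots the cuckoo filters, and records the shutdown duration before returning.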
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&self) {
        use crate::FlushReason;

        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);

        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            {
                let mut batcher = self.l2_batcher.lock().await;
                batcher.add_batch(batch.items);
            }
            self.maybe_flush_l2().await;
            crate::metrics::record_items_written("L2", batch_size);
        }

        self.snapshot_cuckoo_filters("shutdown").await;

        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
}