1use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
17 #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
33 pub async fn start(&mut self) -> Result<(), StorageError> {
34 let startup_start = std::time::Instant::now();
35 info!("Starting sync engine with trust-verified startup...");
36 let _ = self.state.send(EngineState::Connecting);
37
38 let phase_start = std::time::Instant::now();
40 let (wal_path, wal_max_items) = {
41 let cfg = self.config.read();
42 (
43 cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
44 cfg.wal_max_items.unwrap_or(1_000_000),
45 )
46 };
47
48 let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
49 Ok(wal) => {
50 info!(path = %wal_path, "Write-ahead log initialized");
51 crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
52 wal
53 }
54 Err(e) => {
55 crate::metrics::record_error("WAL", "init", "sqlite");
56 return Err(StorageError::Backend(format!(
57 "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
58 wal_path, e
59 )));
60 }
61 };
62
63 match FilterPersistence::new(&wal_path).await {
65 Ok(fp) => {
66 self.filter_persistence = Some(fp);
67 }
68 Err(e) => {
69 warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
70 crate::metrics::record_error("filter", "init", "persistence");
71 }
72 }
73
74 let pending_count = if wal.has_pending() {
75 wal.stats(false).pending_items
76 } else {
77 0
78 };
79 self.l3_wal = Some(wal);
80
81 let phase_start = std::time::Instant::now();
83 let sql_url = self.config.read().sql_url.clone();
84 if let Some(ref sql_url) = sql_url {
85 info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
86 match crate::storage::sql::SqlStore::new(sql_url).await {
87 Ok(store) => {
88 let is_sqlite = sql_url.starts_with("sqlite:");
90 let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
91 if let Err(e) = sql_merkle.init_schema().await {
92 error!(error = %e, "Failed to initialize SQL merkle schema");
93 crate::metrics::record_error("L3", "init", "merkle_schema");
94 return Err(StorageError::Backend(format!(
95 "Failed to initialize SQL merkle schema: {}", e
96 )));
97 }
98 self.sql_merkle = Some(sql_merkle);
99
100 let store = std::sync::Arc::new(store);
102 self.sql_store = Some(store.clone());
103 self.l3_store = Some(store);
104 tracing::Span::current().record("has_sql", true);
105 self.mysql_health.record_success();
106 crate::metrics::set_backend_healthy("mysql", true);
107 crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
108 info!("SQL (L3) connected with merkle store (ground truth)");
109 }
110 Err(e) => {
111 tracing::Span::current().record("has_sql", false);
112 error!(error = %e, "Failed to connect to SQL - this is required for startup");
113 self.mysql_health.record_failure();
114 crate::metrics::set_backend_healthy("mysql", false);
115 crate::metrics::record_connection_error("mysql");
116 return Err(StorageError::Backend(format!(
117 "SQL connection required for startup: {}", e
118 )));
119 }
120 }
121 } else {
122 warn!("No SQL URL configured - operating without ground truth storage!");
123 tracing::Span::current().record("has_sql", false);
124 }
125
126 if pending_count > 0 {
128 let phase_start = std::time::Instant::now();
129 let _ = self.state.send(EngineState::DrainingWal);
130 info!(pending = pending_count, "Draining WAL to SQL before startup...");
131
132 if let Some(ref l3) = self.l3_store {
133 if let Some(ref wal) = self.l3_wal {
134 match wal.drain_to(l3.as_ref(), pending_count as usize).await {
135 Ok(drained) => {
136 info!(drained = drained.len(), "WAL drained to SQL");
137 crate::metrics::record_items_written("L3", drained.len());
138 }
139 Err(e) => {
140 warn!(error = %e, "WAL drain had errors - some items may retry later");
141 crate::metrics::record_error("WAL", "drain", "partial");
142 }
143 }
144 }
145 }
146 crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
147 }
148
149 let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
151 match sql_merkle.root_hash().await {
152 Ok(Some(root)) => {
153 info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
154 Some(root)
155 }
156 Ok(None) => {
157 info!("SQL merkle tree is empty (no data yet)");
158 None
159 }
160 Err(e) => {
161 warn!(error = %e, "Failed to get SQL merkle root");
162 None
163 }
164 }
165 } else {
166 None
167 };
168
169 let phase_start = std::time::Instant::now();
171 self.restore_cuckoo_filters(&sql_root).await;
172 crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
173
174 let phase_start = std::time::Instant::now();
176 let (redis_url, redis_prefix) = {
177 let cfg = self.config.read();
178 (cfg.redis_url.clone(), cfg.redis_prefix.clone())
179 };
180 if let Some(ref redis_url) = redis_url {
181 info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
182 match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
183 Ok(store) => {
184 let redis_merkle = RedisMerkleStore::with_prefix(
185 store.connection(),
186 redis_prefix.as_deref(),
187 );
188 self.redis_merkle = Some(redis_merkle);
189 let store = std::sync::Arc::new(store);
190 self.redis_store = Some(store.clone()); self.l2_store = Some(store);
192 tracing::Span::current().record("has_redis", true);
193 crate::metrics::set_backend_healthy("redis", true);
194 crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
195 info!("Redis (L2) connected with merkle shadow tree");
196 }
197 Err(e) => {
198 tracing::Span::current().record("has_redis", false);
199 warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
200 crate::metrics::set_backend_healthy("redis", false);
201 crate::metrics::record_connection_error("redis");
202 }
203 }
204 } else {
205 tracing::Span::current().record("has_redis", false);
206 }
207
208 if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
210 (&self.sql_merkle, &self.redis_merkle, &sql_root)
211 {
212 let phase_start = std::time::Instant::now();
213 let _ = self.state.send(EngineState::SyncingRedis);
214
215 match redis_merkle.root_hash().await {
216 Ok(Some(redis_root)) if &redis_root == sql_root => {
217 info!("Redis merkle root matches SQL - Redis is in sync");
218 }
219 Ok(Some(redis_root)) => {
220 info!(
221 sql_root = %hex::encode(sql_root),
222 redis_root = %hex::encode(redis_root),
223 "Redis merkle root mismatch - initiating branch diff sync"
224 );
225
226 match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
227 Ok(synced) => {
228 info!(items_synced = synced, "Redis sync complete via branch diff");
229 crate::metrics::record_items_written("L2", synced);
230 }
231 Err(e) => {
232 warn!(error = %e, "Branch diff sync failed - Redis may be stale");
233 crate::metrics::record_error("L2", "sync", "branch_diff");
234 }
235 }
236 }
237 Ok(None) => {
238 info!("Redis merkle tree is empty - will be populated on writes");
239 }
240 Err(e) => {
241 warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
242 crate::metrics::record_error("L2", "merkle", "root_hash");
243 }
244 }
245 crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
246 }
247
248 let _ = self.state.send(EngineState::Ready);
249 crate::metrics::record_startup_total(startup_start.elapsed());
250 info!("Sync engine ready (trust-verified startup complete)");
251 Ok(())
252 }
253
254 async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
256 let persistence = match &self.filter_persistence {
257 Some(p) => p,
258 None => return,
259 };
260
261 let sql_root = match sql_root {
262 Some(r) => r,
263 None => return,
264 };
265
266 match persistence.load(L3_FILTER_ID).await {
270 Ok(Some(state)) if &state.merkle_root == sql_root => {
271 if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
272 warn!(error = %e, "Failed to import L3 filter from snapshot");
273 } else {
274 self.l3_filter.mark_trusted();
275 info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
276 }
277 }
278 Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
279 Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
280 Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
281 }
282 }
283
284 async fn sync_redis_from_sql_diff(
286 &self,
287 sql_merkle: &SqlMerkleStore,
288 redis_merkle: &RedisMerkleStore,
289 ) -> Result<usize, StorageError> {
290 let mut total_synced = 0;
291 let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
292
293 for prefix in stale_prefixes {
294 info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
295
296 let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
297 .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
298
299 if leaf_paths.is_empty() {
300 continue;
301 }
302
303 let mut merkle_batch = MerkleBatch::new();
304
305 if let Some(ref l3_store) = self.l3_store {
306 for object_id in &leaf_paths {
307 if let Ok(Some(item)) = l3_store.get(object_id).await {
308 let payload_hash = PathMerkle::payload_hash(&item.content);
309 let leaf_hash = PathMerkle::leaf_hash(
310 &item.object_id,
311 item.version,
312 item.updated_at,
313 &payload_hash,
314 );
315 merkle_batch.insert(object_id.clone(), leaf_hash);
316
317 if let Some(ref l2_store) = self.l2_store {
318 if let Err(e) = l2_store.put(&item).await {
319 warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
320 } else {
321 total_synced += 1;
322 }
323 }
324 }
325 }
326
327 if !merkle_batch.is_empty() {
328 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
329 warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
330 }
331 }
332 }
333 }
334
335 Ok(total_synced)
336 }
337
338 async fn find_stale_branches(
340 &self,
341 sql_merkle: &SqlMerkleStore,
342 redis_merkle: &RedisMerkleStore,
343 prefix: &str,
344 ) -> Result<Vec<String>, StorageError> {
345 let mut stale = Vec::new();
346
347 let sql_children = sql_merkle.get_children(prefix).await
348 .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
349 let redis_children = redis_merkle.get_children(prefix).await
350 .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
351
352 if sql_children.is_empty() {
353 return Ok(stale);
354 }
355
356 for (child_path, sql_hash) in sql_children {
357 match redis_children.get(&child_path) {
358 Some(redis_hash) if redis_hash == &sql_hash => continue,
359 Some(_) => {
360 if child_path.contains('.') && !child_path.ends_with('.') {
361 stale.push(child_path);
362 } else {
363 let sub_stale = Box::pin(
364 self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
365 ).await?;
366 stale.extend(sub_stale);
367 }
368 }
369 None => stale.push(child_path),
370 }
371 }
372
373 Ok(stale)
374 }
375
376 #[tracing::instrument(skip(self))]
378 pub async fn warm_up(&mut self) -> Result<(), StorageError> {
379 let _ = self.state.send(EngineState::WarmingUp);
380 info!("Warming up cuckoo filter and L1 cache...");
381
382 if let Some(l3) = &self.l3_store {
383 let batch_size = self.config.read().cuckoo_warmup_batch_size;
384 info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
385
386 let total_count = l3.count_all().await.unwrap_or(0);
387 if total_count > 0 {
388 let mut offset = 0u64;
389 let mut loaded = 0usize;
390
391 loop {
392 let keys = l3.scan_keys(offset, batch_size).await?;
393 if keys.is_empty() {
394 break;
395 }
396
397 for key in &keys {
398 self.l3_filter.insert(key);
399 }
400
401 loaded += keys.len();
402 offset += keys.len() as u64;
403
404 if loaded % 10_000 == 0 || loaded == total_count as usize {
405 debug!(loaded, total = %total_count, "L3 filter warmup progress");
406 }
407 }
408
409 self.l3_filter.mark_trusted();
410 info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
411 } else {
412 info!("L3 store is empty, skipping filter warmup");
413 self.l3_filter.mark_trusted();
414 }
415 }
416
417 info!(
418 l3_trust = ?self.l3_filter.trust_state(),
419 "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
420 );
421
422 let _ = self.state.send(EngineState::Ready);
423 info!("Warm-up complete, engine ready");
424 Ok(())
425 }
426
427 pub async fn tick(&self) {
429 self.maybe_evict();
430 self.maybe_flush_l2().await;
431 }
432
433 pub async fn force_flush(&self) {
435 let batch = self.l2_batcher.lock().await.force_flush();
436 if let Some(batch) = batch {
437 debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
438 self.flush_batch_internal(batch).await;
439 }
440 }
441
442 #[tracing::instrument(skip(self))]
457 pub async fn run(&self) {
458 let _ = self.state.send(EngineState::Running);
459 info!("Sync engine running");
460
461 let mut health_check_interval = tokio::time::interval(
462 tokio::time::Duration::from_secs(30)
463 );
464 let mut wal_drain_interval = tokio::time::interval(
465 tokio::time::Duration::from_secs(5)
466 );
467 let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
468 let mut cf_snapshot_interval = tokio::time::interval(
469 tokio::time::Duration::from_secs(cf_snapshot_secs)
470 );
471
472 loop {
473 let config_changed = {
475 let rx = self.config_rx.lock().await;
476 rx.has_changed().unwrap_or(false)
478 };
479
480 if config_changed {
481 let new_config = self.config_rx.lock().await.borrow_and_update().clone();
482 info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
483 *self.config.write() = new_config;
484 }
485
486 tokio::select! {
487 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
488 self.maybe_evict();
489 self.maybe_flush_l2().await;
490 self.maybe_snapshot_cf_by_threshold().await;
491 }
492
493 _ = health_check_interval.tick() => {
494 self.check_mysql_health().await;
495 }
496
497 _ = wal_drain_interval.tick() => {
498 self.maybe_drain_wal().await;
499 }
500
501 _ = cf_snapshot_interval.tick() => {
502 self.maybe_snapshot_cf_by_time().await;
503 }
504 }
505 }
506 }
507
508 #[tracing::instrument(skip(self))]
510 pub async fn shutdown(&self) {
511 use crate::FlushReason;
512
513 let shutdown_start = std::time::Instant::now();
514 info!("Initiating sync engine shutdown...");
515 let _ = self.state.send(EngineState::ShuttingDown);
516
517 let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
518 if let Some(batch) = batch {
519 let batch_size = batch.items.len();
520 info!(batch_size, "Flushing final L2 batch on shutdown");
521 {
522 let mut batcher = self.l2_batcher.lock().await;
523 batcher.add_batch(batch.items);
524 }
525 self.maybe_flush_l2().await;
526 crate::metrics::record_items_written("L2", batch_size);
527 }
528
529 self.snapshot_cuckoo_filters("shutdown").await;
530
531 crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
532 info!("Sync engine shutdown complete");
533 }
534}