use tracing::{info, warn, debug, error};

use crate::storage::traits::StorageError;
use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
use crate::resilience::wal::WriteAheadLog;

use super::{SyncEngine, EngineState};
#[allow(unused_imports)]
use super::WriteTarget;

impl SyncEngine {
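    /// Trust-verified startup sequence:
    /// 1. Initialize the write-ahead log (required for durability).
    /// 2. Connect to SQL (L3, ground truth) and initialize its merkle schema.
    /// 3. Drain any pending WAL items into SQL.
    /// 4. Restore cuckoo filters from snapshots that match the SQL merkle root.
    /// 5. Connect to Redis (L2, cache) and its merkle shadow tree.
    /// 6. Branch-diff sync Redis from SQL if their merkle roots diverge.
    ///
    /// Fails fast if the WAL or SQL cannot be initialized; Redis is optional.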
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

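        // Phase 1: initialize the write-ahead log; startup aborts if this fails.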
        let phase_start = std::time::Instant::now();
        let (wal_path, wal_max_items) = {
            let cfg = self.config.read();
            (
                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
                cfg.wal_max_items.unwrap_or(1_000_000),
            )
        };

        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };

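        // Filter persistence reuses the WAL path for cuckoo-filter snapshots; failure here only disables snapshots.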
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }

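        // Remember how many WAL items are pending so they can be drained once SQL is connected.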
        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

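        // Phase 2: connect to SQL (L3, ground truth). This connection is mandatory.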
        let phase_start = std::time::Instant::now();
        let sql_url = self.config.read().sql_url.clone();
        if let Some(ref sql_url) = sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::new(sql_url).await {
                Ok(store) => {
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);

                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

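        // Phase 3: drain pending WAL items into SQL before completing startup.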
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");

            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }

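        // Read the SQL merkle root; it anchors filter-snapshot validation and the Redis sync below.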
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };

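        // Phase 4: restore cuckoo filters from snapshots, validated against the SQL merkle root.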
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

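        // Phase 5: connect to Redis (L2, cache). Failure is non-fatal; the engine runs without L2.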
        let phase_start = std::time::Instant::now();
        let (redis_url, redis_prefix) = {
            let cfg = self.config.read();
            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
        };
        if let Some(ref redis_url) = redis_url {
            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
                Ok(store) => {
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    let store = std::sync::Arc::new(store);
                    self.redis_store = Some(store.clone());
                    self.l2_store = Some(store);
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

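        // Phase 6: if the Redis merkle root diverges from SQL, repair it via branch-diff sync.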
        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
            (&self.sql_merkle, &self.redis_merkle, &sql_root)
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);

            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );

                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }

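    /// Restore the L3 cuckoo filter from its persisted snapshot, but only when the
    /// snapshot's merkle root matches the current SQL root; otherwise the filter is
    /// left untrusted and rebuilt during warmup.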
    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
        let persistence = match &self.filter_persistence {
            Some(p) => p,
            None => return,
        };

        let sql_root = match sql_root {
            Some(r) => r,
            None => return,
        };

        match persistence.load(L3_FILTER_ID).await {
            Ok(Some(state)) if &state.merkle_root == sql_root => {
                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
                    warn!(error = %e, "Failed to import L3 filter from snapshot");
                } else {
                    self.l3_filter.mark_trusted();
                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
                }
            }
            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
        }
    }

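    /// Walk the stale branches reported by the merkle diff and copy the affected
    /// items from SQL (L3) into Redis (L2), updating the Redis shadow tree as it goes.
    /// Returns the number of items written to Redis.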
    async fn sync_redis_from_sql_diff(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
    ) -> Result<usize, StorageError> {
        let mut total_synced = 0;
        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;

        for prefix in stale_prefixes {
            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");

            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;

            if leaf_paths.is_empty() {
                continue;
            }

            let mut merkle_batch = MerkleBatch::new();

            if let Some(ref l3_store) = self.l3_store {
                for object_id in &leaf_paths {
                    if let Ok(Some(item)) = l3_store.get(object_id).await {
                        let payload_hash = PathMerkle::payload_hash(&item.content);
                        let leaf_hash = PathMerkle::leaf_hash(
                            &item.object_id,
                            item.version,
                            item.updated_at,
                            &payload_hash,
                        );
                        merkle_batch.insert(object_id.clone(), leaf_hash);

                        if let Some(ref l2_store) = self.l2_store {
                            if let Err(e) = l2_store.put(&item).await {
                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
                            } else {
                                total_synced += 1;
                            }
                        }
                    }
                }

                if !merkle_batch.is_empty() {
                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
                    }
                }
            }
        }

        Ok(total_synced)
    }

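    /// Recursively compare SQL and Redis merkle nodes under `prefix` and return the
    /// paths whose hashes differ or that Redis is missing entirely. Divergent leaf
    /// paths are returned directly; divergent interior nodes are descended into.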
    async fn find_stale_branches(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
        prefix: &str,
    ) -> Result<Vec<String>, StorageError> {
        let mut stale = Vec::new();

        let sql_children = sql_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
        let redis_children = redis_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;

        if sql_children.is_empty() {
            return Ok(stale);
        }

        for (child_path, sql_hash) in sql_children {
            match redis_children.get(&child_path) {
                Some(redis_hash) if redis_hash == &sql_hash => continue,
                Some(_) => {
                    if child_path.contains('.') && !child_path.ends_with('.') {
                        stale.push(child_path);
                    } else {
                        let sub_stale = Box::pin(
                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
                        ).await?;
                        stale.extend(sub_stale);
                    }
                }
                None => stale.push(child_path),
            }
        }

        Ok(stale)
    }

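    /// Warm the L3 cuckoo filter by scanning keys from the ground-truth store in
    /// batches, then mark it trusted. Only the L3 filter is warmed; Redis membership
    /// checks rely on EXISTS.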
    #[tracing::instrument(skip(self))]
    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
        let _ = self.state.send(EngineState::WarmingUp);
        info!("Warming up cuckoo filter and L1 cache...");

        if let Some(l3) = &self.l3_store {
            let batch_size = self.config.read().cuckoo_warmup_batch_size;
            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");

            let total_count = l3.count_all().await.unwrap_or(0);
            if total_count > 0 {
                let mut offset = 0u64;
                let mut loaded = 0usize;

                loop {
                    let keys = l3.scan_keys(offset, batch_size).await?;
                    if keys.is_empty() {
                        break;
                    }

                    for key in &keys {
                        self.l3_filter.insert(key);
                    }

                    loaded += keys.len();
                    offset += keys.len() as u64;

                    if loaded % 10_000 == 0 || loaded == total_count as usize {
                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
                    }
                }

                self.l3_filter.mark_trusted();
                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
            } else {
                info!("L3 store is empty, skipping filter warmup");
                self.l3_filter.mark_trusted();
            }
        }

        info!(
            l3_trust = ?self.l3_filter.trust_state(),
            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
        );

        let _ = self.state.send(EngineState::Ready);
        info!("Warm-up complete, engine ready");
        Ok(())
    }

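    /// One maintenance pass: run eviction and flush the L2 batch if one is due.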
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }

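    /// Flush the currently buffered L2 batch immediately, if any.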
    pub async fn force_flush(&self) {
        let batch = self.l2_batcher.lock().await.force_flush();
        if let Some(batch) = batch {
            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
            self.flush_batch_internal(batch).await;
        }
    }

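    /// Main engine loop: every 100ms it runs eviction, L2 flushing, and threshold-based
    /// CF snapshots; on longer intervals it checks MySQL health (30s), drains the WAL (5s),
    /// and snapshots cuckoo filters (configurable interval). Config updates are picked up
    /// at the top of each iteration. Runs until the surrounding task is cancelled.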
    #[tracing::instrument(skip(self))]
    pub async fn run(&self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(cf_snapshot_secs)
        );

        loop {
            let config_changed = {
                let rx = self.config_rx.lock().await;
                rx.has_changed().unwrap_or(false)
            };

            if config_changed {
                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
                info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
                *self.config.write() = new_config;
            }

            tokio::select! {
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }

                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }

                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }

                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }

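    /// Graceful shutdown: flush the final L2 batch, snapshot the cuckoo filters, and
    /// record shutdown timing.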
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&self) {
        use crate::FlushReason;

        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);

        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            // Flush the final batch directly (same path as force_flush) so it cannot be
            // dropped by re-queueing it behind the batcher's normal thresholds.
            self.flush_batch_internal(batch).await;
            crate::metrics::record_items_written("L2", batch_size);
        }

        self.snapshot_cuckoo_filters("shutdown").await;

        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
}