1use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
17 #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
33 pub async fn start(&mut self) -> Result<(), StorageError> {
34 let startup_start = std::time::Instant::now();
35 info!("Starting sync engine with trust-verified startup...");
36 let _ = self.state.send(EngineState::Connecting);
37
38 let phase_start = std::time::Instant::now();
40 let wal_path = self.config.wal_path.clone()
41 .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
42 let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);
43
44 let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
45 Ok(wal) => {
46 info!(path = %wal_path, "Write-ahead log initialized");
47 crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
48 wal
49 }
50 Err(e) => {
51 crate::metrics::record_error("WAL", "init", "sqlite");
52 return Err(StorageError::Backend(format!(
53 "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
54 wal_path, e
55 )));
56 }
57 };
58
59 match FilterPersistence::new(&wal_path).await {
61 Ok(fp) => {
62 self.filter_persistence = Some(fp);
63 }
64 Err(e) => {
65 warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
66 crate::metrics::record_error("filter", "init", "persistence");
67 }
68 }
69
70 let pending_count = if wal.has_pending() {
71 wal.stats(false).pending_items
72 } else {
73 0
74 };
75 self.l3_wal = Some(wal);
76
77 let phase_start = std::time::Instant::now();
79 if let Some(ref sql_url) = self.config.sql_url {
80 info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
81 match crate::storage::sql::SqlStore::new(sql_url).await {
82 Ok(store) => {
83 let is_sqlite = sql_url.starts_with("sqlite:");
85 let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
86 if let Err(e) = sql_merkle.init_schema().await {
87 error!(error = %e, "Failed to initialize SQL merkle schema");
88 crate::metrics::record_error("L3", "init", "merkle_schema");
89 return Err(StorageError::Backend(format!(
90 "Failed to initialize SQL merkle schema: {}", e
91 )));
92 }
93 self.sql_merkle = Some(sql_merkle);
94
95 let store = std::sync::Arc::new(store);
97 self.sql_store = Some(store.clone());
98 self.l3_store = Some(store);
99 tracing::Span::current().record("has_sql", true);
100 self.mysql_health.record_success();
101 crate::metrics::set_backend_healthy("mysql", true);
102 crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
103 info!("SQL (L3) connected with merkle store (ground truth)");
104 }
105 Err(e) => {
106 tracing::Span::current().record("has_sql", false);
107 error!(error = %e, "Failed to connect to SQL - this is required for startup");
108 self.mysql_health.record_failure();
109 crate::metrics::set_backend_healthy("mysql", false);
110 crate::metrics::record_connection_error("mysql");
111 return Err(StorageError::Backend(format!(
112 "SQL connection required for startup: {}", e
113 )));
114 }
115 }
116 } else {
117 warn!("No SQL URL configured - operating without ground truth storage!");
118 tracing::Span::current().record("has_sql", false);
119 }
120
121 if pending_count > 0 {
123 let phase_start = std::time::Instant::now();
124 let _ = self.state.send(EngineState::DrainingWal);
125 info!(pending = pending_count, "Draining WAL to SQL before startup...");
126
127 if let Some(ref l3) = self.l3_store {
128 if let Some(ref wal) = self.l3_wal {
129 match wal.drain_to(l3.as_ref(), pending_count as usize).await {
130 Ok(drained) => {
131 info!(drained = drained.len(), "WAL drained to SQL");
132 crate::metrics::record_items_written("L3", drained.len());
133 }
134 Err(e) => {
135 warn!(error = %e, "WAL drain had errors - some items may retry later");
136 crate::metrics::record_error("WAL", "drain", "partial");
137 }
138 }
139 }
140 }
141 crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
142 }
143
144 let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
146 match sql_merkle.root_hash().await {
147 Ok(Some(root)) => {
148 info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
149 Some(root)
150 }
151 Ok(None) => {
152 info!("SQL merkle tree is empty (no data yet)");
153 None
154 }
155 Err(e) => {
156 warn!(error = %e, "Failed to get SQL merkle root");
157 None
158 }
159 }
160 } else {
161 None
162 };
163
164 let phase_start = std::time::Instant::now();
166 self.restore_cuckoo_filters(&sql_root).await;
167 crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
168
169 let phase_start = std::time::Instant::now();
171 if let Some(ref redis_url) = self.config.redis_url {
172 info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
173 match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
174 Ok(store) => {
175 let redis_merkle = RedisMerkleStore::with_prefix(
176 store.connection(),
177 self.config.redis_prefix.as_deref(),
178 );
179 self.redis_merkle = Some(redis_merkle);
180 let store = std::sync::Arc::new(store);
181 self.redis_store = Some(store.clone()); self.l2_store = Some(store);
183 tracing::Span::current().record("has_redis", true);
184 crate::metrics::set_backend_healthy("redis", true);
185 crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
186 info!("Redis (L2) connected with merkle shadow tree");
187 }
188 Err(e) => {
189 tracing::Span::current().record("has_redis", false);
190 warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
191 crate::metrics::set_backend_healthy("redis", false);
192 crate::metrics::record_connection_error("redis");
193 }
194 }
195 } else {
196 tracing::Span::current().record("has_redis", false);
197 }
198
199 if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
201 (&self.sql_merkle, &self.redis_merkle, &sql_root)
202 {
203 let phase_start = std::time::Instant::now();
204 let _ = self.state.send(EngineState::SyncingRedis);
205
206 match redis_merkle.root_hash().await {
207 Ok(Some(redis_root)) if &redis_root == sql_root => {
208 info!("Redis merkle root matches SQL - Redis is in sync");
209 }
210 Ok(Some(redis_root)) => {
211 info!(
212 sql_root = %hex::encode(sql_root),
213 redis_root = %hex::encode(redis_root),
214 "Redis merkle root mismatch - initiating branch diff sync"
215 );
216
217 match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
218 Ok(synced) => {
219 info!(items_synced = synced, "Redis sync complete via branch diff");
220 crate::metrics::record_items_written("L2", synced);
221 }
222 Err(e) => {
223 warn!(error = %e, "Branch diff sync failed - Redis may be stale");
224 crate::metrics::record_error("L2", "sync", "branch_diff");
225 }
226 }
227 }
228 Ok(None) => {
229 info!("Redis merkle tree is empty - will be populated on writes");
230 }
231 Err(e) => {
232 warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
233 crate::metrics::record_error("L2", "merkle", "root_hash");
234 }
235 }
236 crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
237 }
238
239 let _ = self.state.send(EngineState::Ready);
240 crate::metrics::record_startup_total(startup_start.elapsed());
241 info!("Sync engine ready (trust-verified startup complete)");
242 Ok(())
243 }
244
245 async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
247 let persistence = match &self.filter_persistence {
248 Some(p) => p,
249 None => return,
250 };
251
252 let sql_root = match sql_root {
253 Some(r) => r,
254 None => return,
255 };
256
257 match persistence.load(L3_FILTER_ID).await {
261 Ok(Some(state)) if &state.merkle_root == sql_root => {
262 if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
263 warn!(error = %e, "Failed to import L3 filter from snapshot");
264 } else {
265 self.l3_filter.mark_trusted();
266 info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
267 }
268 }
269 Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
270 Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
271 Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
272 }
273 }
274
275 async fn sync_redis_from_sql_diff(
277 &self,
278 sql_merkle: &SqlMerkleStore,
279 redis_merkle: &RedisMerkleStore,
280 ) -> Result<usize, StorageError> {
281 let mut total_synced = 0;
282 let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
283
284 for prefix in stale_prefixes {
285 info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
286
287 let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
288 .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
289
290 if leaf_paths.is_empty() {
291 continue;
292 }
293
294 let mut merkle_batch = MerkleBatch::new();
295
296 if let Some(ref l3_store) = self.l3_store {
297 for object_id in &leaf_paths {
298 if let Ok(Some(item)) = l3_store.get(object_id).await {
299 let payload_hash = PathMerkle::payload_hash(&item.content);
300 let leaf_hash = PathMerkle::leaf_hash(
301 &item.object_id,
302 item.version,
303 item.updated_at,
304 &payload_hash,
305 );
306 merkle_batch.insert(object_id.clone(), leaf_hash);
307
308 if let Some(ref l2_store) = self.l2_store {
309 if let Err(e) = l2_store.put(&item).await {
310 warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
311 } else {
312 total_synced += 1;
313 }
314 }
315 }
316 }
317
318 if !merkle_batch.is_empty() {
319 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
320 warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
321 }
322 }
323 }
324 }
325
326 Ok(total_synced)
327 }
328
329 async fn find_stale_branches(
331 &self,
332 sql_merkle: &SqlMerkleStore,
333 redis_merkle: &RedisMerkleStore,
334 prefix: &str,
335 ) -> Result<Vec<String>, StorageError> {
336 let mut stale = Vec::new();
337
338 let sql_children = sql_merkle.get_children(prefix).await
339 .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
340 let redis_children = redis_merkle.get_children(prefix).await
341 .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
342
343 if sql_children.is_empty() {
344 return Ok(stale);
345 }
346
347 for (child_path, sql_hash) in sql_children {
348 match redis_children.get(&child_path) {
349 Some(redis_hash) if redis_hash == &sql_hash => continue,
350 Some(_) => {
351 if child_path.contains('.') && !child_path.ends_with('.') {
352 stale.push(child_path);
353 } else {
354 let sub_stale = Box::pin(
355 self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
356 ).await?;
357 stale.extend(sub_stale);
358 }
359 }
360 None => stale.push(child_path),
361 }
362 }
363
364 Ok(stale)
365 }
366
367 #[tracing::instrument(skip(self))]
369 pub async fn warm_up(&mut self) -> Result<(), StorageError> {
370 let _ = self.state.send(EngineState::WarmingUp);
371 info!("Warming up cuckoo filter and L1 cache...");
372
373 if let Some(l3) = &self.l3_store {
374 let batch_size = self.config.cuckoo_warmup_batch_size;
375 info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
376
377 let total_count = l3.count_all().await.unwrap_or(0);
378 if total_count > 0 {
379 let mut offset = 0u64;
380 let mut loaded = 0usize;
381
382 loop {
383 let keys = l3.scan_keys(offset, batch_size).await?;
384 if keys.is_empty() {
385 break;
386 }
387
388 for key in &keys {
389 self.l3_filter.insert(key);
390 }
391
392 loaded += keys.len();
393 offset += keys.len() as u64;
394
395 if loaded % 10_000 == 0 || loaded == total_count as usize {
396 debug!(loaded, total = %total_count, "L3 filter warmup progress");
397 }
398 }
399
400 self.l3_filter.mark_trusted();
401 info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
402 } else {
403 info!("L3 store is empty, skipping filter warmup");
404 self.l3_filter.mark_trusted();
405 }
406 }
407
408 info!(
409 l3_trust = ?self.l3_filter.trust_state(),
410 "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
411 );
412
413 let _ = self.state.send(EngineState::Ready);
414 info!("Warm-up complete, engine ready");
415 Ok(())
416 }
417
418 pub async fn tick(&self) {
420 self.maybe_evict();
421 self.maybe_flush_l2().await;
422 }
423
424 pub async fn force_flush(&self) {
426 let batch = self.l2_batcher.lock().await.force_flush();
427 if let Some(batch) = batch {
428 debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
429 self.flush_batch_internal(batch).await;
430 }
431 }
432
433 #[tracing::instrument(skip(self))]
435 pub async fn run(&mut self) {
436 let _ = self.state.send(EngineState::Running);
437 info!("Sync engine running");
438
439 let mut health_check_interval = tokio::time::interval(
440 tokio::time::Duration::from_secs(30)
441 );
442 let mut wal_drain_interval = tokio::time::interval(
443 tokio::time::Duration::from_secs(5)
444 );
445 let mut cf_snapshot_interval = tokio::time::interval(
446 tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
447 );
448
449 loop {
450 tokio::select! {
451 Ok(()) = self.config_rx.changed() => {
452 let new_config = self.config_rx.borrow().clone();
453 info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
454 self.config = new_config;
455 }
456
457 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
458 self.maybe_evict();
459 self.maybe_flush_l2().await;
460 self.maybe_snapshot_cf_by_threshold().await;
461 }
462
463 _ = health_check_interval.tick() => {
464 self.check_mysql_health().await;
465 }
466
467 _ = wal_drain_interval.tick() => {
468 self.maybe_drain_wal().await;
469 }
470
471 _ = cf_snapshot_interval.tick() => {
472 self.maybe_snapshot_cf_by_time().await;
473 }
474 }
475 }
476 }
477
478 #[tracing::instrument(skip(self))]
480 pub async fn shutdown(&self) {
481 use crate::FlushReason;
482
483 let shutdown_start = std::time::Instant::now();
484 info!("Initiating sync engine shutdown...");
485 let _ = self.state.send(EngineState::ShuttingDown);
486
487 let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
488 if let Some(batch) = batch {
489 let batch_size = batch.items.len();
490 info!(batch_size, "Flushing final L2 batch on shutdown");
491 {
492 let mut batcher = self.l2_batcher.lock().await;
493 batcher.add_batch(batch.items);
494 }
495 self.maybe_flush_l2().await;
496 crate::metrics::record_items_written("L2", batch_size);
497 }
498
499 self.snapshot_cuckoo_filters("shutdown").await;
500
501 crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
502 info!("Sync engine shutdown complete");
503 }
504}