1use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
17 #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
33 pub async fn start(&mut self) -> Result<(), StorageError> {
34 let startup_start = std::time::Instant::now();
35 info!("Starting sync engine with trust-verified startup...");
36 let _ = self.state.send(EngineState::Connecting);
37
38 let phase_start = std::time::Instant::now();
40 let wal_path = self.config.wal_path.clone()
41 .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
42 let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);
43
44 let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
45 Ok(wal) => {
46 info!(path = %wal_path, "Write-ahead log initialized");
47 crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
48 wal
49 }
50 Err(e) => {
51 crate::metrics::record_error("WAL", "init", "sqlite");
52 return Err(StorageError::Backend(format!(
53 "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
54 wal_path, e
55 )));
56 }
57 };
58
59 match FilterPersistence::new(&wal_path).await {
61 Ok(fp) => {
62 self.filter_persistence = Some(fp);
63 }
64 Err(e) => {
65 warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
66 crate::metrics::record_error("filter", "init", "persistence");
67 }
68 }
69
70 let pending_count = if wal.has_pending() {
71 wal.stats(false).pending_items
72 } else {
73 0
74 };
75 self.l3_wal = Some(wal);
76
77 let phase_start = std::time::Instant::now();
79 if let Some(ref sql_url) = self.config.sql_url {
80 info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
81 match crate::storage::sql::SqlStore::new(sql_url).await {
82 Ok(store) => {
83 let is_sqlite = sql_url.starts_with("sqlite:");
85 let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
86 if let Err(e) = sql_merkle.init_schema().await {
87 error!(error = %e, "Failed to initialize SQL merkle schema");
88 crate::metrics::record_error("L3", "init", "merkle_schema");
89 return Err(StorageError::Backend(format!(
90 "Failed to initialize SQL merkle schema: {}", e
91 )));
92 }
93 self.sql_merkle = Some(sql_merkle);
94
95 let store = std::sync::Arc::new(store);
97 self.sql_store = Some(store.clone());
98 self.l3_store = Some(store);
99 tracing::Span::current().record("has_sql", true);
100 self.mysql_health.record_success();
101 crate::metrics::set_backend_healthy("mysql", true);
102 crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
103 info!("SQL (L3) connected with merkle store (ground truth)");
104 }
105 Err(e) => {
106 tracing::Span::current().record("has_sql", false);
107 error!(error = %e, "Failed to connect to SQL - this is required for startup");
108 self.mysql_health.record_failure();
109 crate::metrics::set_backend_healthy("mysql", false);
110 crate::metrics::record_connection_error("mysql");
111 return Err(StorageError::Backend(format!(
112 "SQL connection required for startup: {}", e
113 )));
114 }
115 }
116 } else {
117 warn!("No SQL URL configured - operating without ground truth storage!");
118 tracing::Span::current().record("has_sql", false);
119 }
120
121 if pending_count > 0 {
123 let phase_start = std::time::Instant::now();
124 let _ = self.state.send(EngineState::DrainingWal);
125 info!(pending = pending_count, "Draining WAL to SQL before startup...");
126
127 if let Some(ref l3) = self.l3_store {
128 if let Some(ref wal) = self.l3_wal {
129 match wal.drain_to(l3.as_ref(), pending_count as usize).await {
130 Ok(drained) => {
131 info!(drained = drained.len(), "WAL drained to SQL");
132 crate::metrics::record_items_written("L3", drained.len());
133 }
134 Err(e) => {
135 warn!(error = %e, "WAL drain had errors - some items may retry later");
136 crate::metrics::record_error("WAL", "drain", "partial");
137 }
138 }
139 }
140 }
141 crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
142 }
143
144 let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
146 match sql_merkle.root_hash().await {
147 Ok(Some(root)) => {
148 info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
149 Some(root)
150 }
151 Ok(None) => {
152 info!("SQL merkle tree is empty (no data yet)");
153 None
154 }
155 Err(e) => {
156 warn!(error = %e, "Failed to get SQL merkle root");
157 None
158 }
159 }
160 } else {
161 None
162 };
163
164 let phase_start = std::time::Instant::now();
166 self.restore_cuckoo_filters(&sql_root).await;
167 crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
168
169 let phase_start = std::time::Instant::now();
171 if let Some(ref redis_url) = self.config.redis_url {
172 info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
173 match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
174 Ok(store) => {
175 let redis_merkle = RedisMerkleStore::with_prefix(
176 store.connection(),
177 self.config.redis_prefix.as_deref(),
178 );
179 self.redis_merkle = Some(redis_merkle);
180 self.l2_store = Some(std::sync::Arc::new(store));
181 tracing::Span::current().record("has_redis", true);
182 crate::metrics::set_backend_healthy("redis", true);
183 crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
184 info!("Redis (L2) connected with merkle shadow tree");
185 }
186 Err(e) => {
187 tracing::Span::current().record("has_redis", false);
188 warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
189 crate::metrics::set_backend_healthy("redis", false);
190 crate::metrics::record_connection_error("redis");
191 }
192 }
193 } else {
194 tracing::Span::current().record("has_redis", false);
195 }
196
197 if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
199 (&self.sql_merkle, &self.redis_merkle, &sql_root)
200 {
201 let phase_start = std::time::Instant::now();
202 let _ = self.state.send(EngineState::SyncingRedis);
203
204 match redis_merkle.root_hash().await {
205 Ok(Some(redis_root)) if &redis_root == sql_root => {
206 info!("Redis merkle root matches SQL - Redis is in sync");
207 }
208 Ok(Some(redis_root)) => {
209 info!(
210 sql_root = %hex::encode(sql_root),
211 redis_root = %hex::encode(redis_root),
212 "Redis merkle root mismatch - initiating branch diff sync"
213 );
214
215 match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
216 Ok(synced) => {
217 info!(items_synced = synced, "Redis sync complete via branch diff");
218 crate::metrics::record_items_written("L2", synced);
219 }
220 Err(e) => {
221 warn!(error = %e, "Branch diff sync failed - Redis may be stale");
222 crate::metrics::record_error("L2", "sync", "branch_diff");
223 }
224 }
225 }
226 Ok(None) => {
227 info!("Redis merkle tree is empty - will be populated on writes");
228 }
229 Err(e) => {
230 warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
231 crate::metrics::record_error("L2", "merkle", "root_hash");
232 }
233 }
234 crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
235 }
236
237 let _ = self.state.send(EngineState::Ready);
238 crate::metrics::record_startup_total(startup_start.elapsed());
239 info!("Sync engine ready (trust-verified startup complete)");
240 Ok(())
241 }
242
243 async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
245 let persistence = match &self.filter_persistence {
246 Some(p) => p,
247 None => return,
248 };
249
250 let sql_root = match sql_root {
251 Some(r) => r,
252 None => return,
253 };
254
255 match persistence.load(L3_FILTER_ID).await {
259 Ok(Some(state)) if &state.merkle_root == sql_root => {
260 if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
261 warn!(error = %e, "Failed to import L3 filter from snapshot");
262 } else {
263 self.l3_filter.mark_trusted();
264 info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
265 }
266 }
267 Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
268 Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
269 Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
270 }
271 }
272
273 async fn sync_redis_from_sql_diff(
275 &self,
276 sql_merkle: &SqlMerkleStore,
277 redis_merkle: &RedisMerkleStore,
278 ) -> Result<usize, StorageError> {
279 let mut total_synced = 0;
280 let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
281
282 for prefix in stale_prefixes {
283 info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
284
285 let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
286 .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
287
288 if leaf_paths.is_empty() {
289 continue;
290 }
291
292 let mut merkle_batch = MerkleBatch::new();
293
294 if let Some(ref l3_store) = self.l3_store {
295 for object_id in &leaf_paths {
296 if let Ok(Some(item)) = l3_store.get(object_id).await {
297 let payload_hash = PathMerkle::payload_hash(&item.content);
298 let leaf_hash = PathMerkle::leaf_hash(
299 &item.object_id,
300 item.version,
301 item.updated_at,
302 &payload_hash,
303 );
304 merkle_batch.insert(object_id.clone(), leaf_hash);
305
306 if let Some(ref l2_store) = self.l2_store {
307 if let Err(e) = l2_store.put(&item).await {
308 warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
309 } else {
310 total_synced += 1;
311 }
312 }
313 }
314 }
315
316 if !merkle_batch.is_empty() {
317 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
318 warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
319 }
320 }
321 }
322 }
323
324 Ok(total_synced)
325 }
326
327 async fn find_stale_branches(
329 &self,
330 sql_merkle: &SqlMerkleStore,
331 redis_merkle: &RedisMerkleStore,
332 prefix: &str,
333 ) -> Result<Vec<String>, StorageError> {
334 let mut stale = Vec::new();
335
336 let sql_children = sql_merkle.get_children(prefix).await
337 .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
338 let redis_children = redis_merkle.get_children(prefix).await
339 .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
340
341 if sql_children.is_empty() {
342 return Ok(stale);
343 }
344
345 for (child_path, sql_hash) in sql_children {
346 match redis_children.get(&child_path) {
347 Some(redis_hash) if redis_hash == &sql_hash => continue,
348 Some(_) => {
349 if child_path.contains('.') && !child_path.ends_with('.') {
350 stale.push(child_path);
351 } else {
352 let sub_stale = Box::pin(
353 self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
354 ).await?;
355 stale.extend(sub_stale);
356 }
357 }
358 None => stale.push(child_path),
359 }
360 }
361
362 Ok(stale)
363 }
364
365 #[tracing::instrument(skip(self))]
367 pub async fn warm_up(&mut self) -> Result<(), StorageError> {
368 let _ = self.state.send(EngineState::WarmingUp);
369 info!("Warming up cuckoo filter and L1 cache...");
370
371 if let Some(l3) = &self.l3_store {
372 let batch_size = self.config.cuckoo_warmup_batch_size;
373 info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
374
375 let total_count = l3.count_all().await.unwrap_or(0);
376 if total_count > 0 {
377 let mut offset = 0u64;
378 let mut loaded = 0usize;
379
380 loop {
381 let keys = l3.scan_keys(offset, batch_size).await?;
382 if keys.is_empty() {
383 break;
384 }
385
386 for key in &keys {
387 self.l3_filter.insert(key);
388 }
389
390 loaded += keys.len();
391 offset += keys.len() as u64;
392
393 if loaded % 10_000 == 0 || loaded == total_count as usize {
394 debug!(loaded, total = %total_count, "L3 filter warmup progress");
395 }
396 }
397
398 self.l3_filter.mark_trusted();
399 info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
400 } else {
401 info!("L3 store is empty, skipping filter warmup");
402 self.l3_filter.mark_trusted();
403 }
404 }
405
406 info!(
407 l3_trust = ?self.l3_filter.trust_state(),
408 "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
409 );
410
411 let _ = self.state.send(EngineState::Ready);
412 info!("Warm-up complete, engine ready");
413 Ok(())
414 }
415
416 pub async fn tick(&self) {
418 self.maybe_evict();
419 self.maybe_flush_l2().await;
420 }
421
422 pub async fn force_flush(&self) {
424 let batch = self.l2_batcher.lock().await.force_flush();
425 if let Some(batch) = batch {
426 debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
427 self.flush_batch_internal(batch).await;
428 }
429 }
430
431 #[tracing::instrument(skip(self))]
433 pub async fn run(&mut self) {
434 let _ = self.state.send(EngineState::Running);
435 info!("Sync engine running");
436
437 let mut health_check_interval = tokio::time::interval(
438 tokio::time::Duration::from_secs(30)
439 );
440 let mut wal_drain_interval = tokio::time::interval(
441 tokio::time::Duration::from_secs(5)
442 );
443 let mut cf_snapshot_interval = tokio::time::interval(
444 tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
445 );
446
447 loop {
448 tokio::select! {
449 Ok(()) = self.config_rx.changed() => {
450 let new_config = self.config_rx.borrow().clone();
451 info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
452 self.config = new_config;
453 }
454
455 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
456 self.maybe_evict();
457 self.maybe_flush_l2().await;
458 self.maybe_snapshot_cf_by_threshold().await;
459 }
460
461 _ = health_check_interval.tick() => {
462 self.check_mysql_health().await;
463 }
464
465 _ = wal_drain_interval.tick() => {
466 self.maybe_drain_wal().await;
467 }
468
469 _ = cf_snapshot_interval.tick() => {
470 self.maybe_snapshot_cf_by_time().await;
471 }
472 }
473 }
474 }
475
476 #[tracing::instrument(skip(self))]
478 pub async fn shutdown(&mut self) {
479 use crate::FlushReason;
480
481 let shutdown_start = std::time::Instant::now();
482 info!("Initiating sync engine shutdown...");
483 let _ = self.state.send(EngineState::ShuttingDown);
484
485 let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
486 if let Some(batch) = batch {
487 let batch_size = batch.items.len();
488 info!(batch_size, "Flushing final L2 batch on shutdown");
489 {
490 let mut batcher = self.l2_batcher.lock().await;
491 batcher.add_batch(batch.items);
492 }
493 self.maybe_flush_l2().await;
494 crate::metrics::record_items_written("L2", batch_size);
495 }
496
497 self.snapshot_cuckoo_filters("shutdown").await;
498
499 crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
500 info!("Sync engine shutdown complete");
501 }
502}