1use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
17 #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
33 pub async fn start(&mut self) -> Result<(), StorageError> {
34 let startup_start = std::time::Instant::now();
35 info!("Starting sync engine with trust-verified startup...");
36 let _ = self.state.send(EngineState::Connecting);
37
38 let phase_start = std::time::Instant::now();
40 let wal_path = self.config.wal_path.clone()
41 .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
42 let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);
43
44 let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
45 Ok(wal) => {
46 info!(path = %wal_path, "Write-ahead log initialized");
47 crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
48 wal
49 }
50 Err(e) => {
51 crate::metrics::record_error("WAL", "init", "sqlite");
52 return Err(StorageError::Backend(format!(
53 "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
54 wal_path, e
55 )));
56 }
57 };
58
59 match FilterPersistence::new(&wal_path).await {
61 Ok(fp) => {
62 self.filter_persistence = Some(fp);
63 }
64 Err(e) => {
65 warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
66 crate::metrics::record_error("filter", "init", "persistence");
67 }
68 }
69
70 let pending_count = if wal.has_pending() {
71 wal.stats(false).pending_items
72 } else {
73 0
74 };
75 self.l3_wal = Some(wal);
76
77 let phase_start = std::time::Instant::now();
79 if let Some(ref sql_url) = self.config.sql_url {
80 info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
81 match crate::storage::sql::SqlStore::new(sql_url).await {
82 Ok(store) => {
83 let is_sqlite = sql_url.starts_with("sqlite:");
85 let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
86 if let Err(e) = sql_merkle.init_schema().await {
87 error!(error = %e, "Failed to initialize SQL merkle schema");
88 crate::metrics::record_error("L3", "init", "merkle_schema");
89 return Err(StorageError::Backend(format!(
90 "Failed to initialize SQL merkle schema: {}", e
91 )));
92 }
93 self.sql_merkle = Some(sql_merkle);
94
95 self.l3_store = Some(std::sync::Arc::new(store));
96 tracing::Span::current().record("has_sql", true);
97 self.mysql_health.record_success();
98 crate::metrics::set_backend_healthy("mysql", true);
99 crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
100 info!("SQL (L3) connected with merkle store (ground truth)");
101 }
102 Err(e) => {
103 tracing::Span::current().record("has_sql", false);
104 error!(error = %e, "Failed to connect to SQL - this is required for startup");
105 self.mysql_health.record_failure();
106 crate::metrics::set_backend_healthy("mysql", false);
107 crate::metrics::record_connection_error("mysql");
108 return Err(StorageError::Backend(format!(
109 "SQL connection required for startup: {}", e
110 )));
111 }
112 }
113 } else {
114 warn!("No SQL URL configured - operating without ground truth storage!");
115 tracing::Span::current().record("has_sql", false);
116 }
117
118 if pending_count > 0 {
120 let phase_start = std::time::Instant::now();
121 let _ = self.state.send(EngineState::DrainingWal);
122 info!(pending = pending_count, "Draining WAL to SQL before startup...");
123
124 if let Some(ref l3) = self.l3_store {
125 if let Some(ref wal) = self.l3_wal {
126 match wal.drain_to(l3.as_ref(), pending_count as usize).await {
127 Ok(drained) => {
128 info!(drained = drained.len(), "WAL drained to SQL");
129 crate::metrics::record_items_written("L3", drained.len());
130 }
131 Err(e) => {
132 warn!(error = %e, "WAL drain had errors - some items may retry later");
133 crate::metrics::record_error("WAL", "drain", "partial");
134 }
135 }
136 }
137 }
138 crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
139 }
140
141 let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
143 match sql_merkle.root_hash().await {
144 Ok(Some(root)) => {
145 info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
146 Some(root)
147 }
148 Ok(None) => {
149 info!("SQL merkle tree is empty (no data yet)");
150 None
151 }
152 Err(e) => {
153 warn!(error = %e, "Failed to get SQL merkle root");
154 None
155 }
156 }
157 } else {
158 None
159 };
160
161 let phase_start = std::time::Instant::now();
163 self.restore_cuckoo_filters(&sql_root).await;
164 crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
165
166 let phase_start = std::time::Instant::now();
168 if let Some(ref redis_url) = self.config.redis_url {
169 info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
170 match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
171 Ok(store) => {
172 let redis_merkle = RedisMerkleStore::with_prefix(
173 store.connection(),
174 self.config.redis_prefix.as_deref(),
175 );
176 self.redis_merkle = Some(redis_merkle);
177 self.l2_store = Some(std::sync::Arc::new(store));
178 tracing::Span::current().record("has_redis", true);
179 crate::metrics::set_backend_healthy("redis", true);
180 crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
181 info!("Redis (L2) connected with merkle shadow tree");
182 }
183 Err(e) => {
184 tracing::Span::current().record("has_redis", false);
185 warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
186 crate::metrics::set_backend_healthy("redis", false);
187 crate::metrics::record_connection_error("redis");
188 }
189 }
190 } else {
191 tracing::Span::current().record("has_redis", false);
192 }
193
194 if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
196 (&self.sql_merkle, &self.redis_merkle, &sql_root)
197 {
198 let phase_start = std::time::Instant::now();
199 let _ = self.state.send(EngineState::SyncingRedis);
200
201 match redis_merkle.root_hash().await {
202 Ok(Some(redis_root)) if &redis_root == sql_root => {
203 info!("Redis merkle root matches SQL - Redis is in sync");
204 }
205 Ok(Some(redis_root)) => {
206 info!(
207 sql_root = %hex::encode(sql_root),
208 redis_root = %hex::encode(redis_root),
209 "Redis merkle root mismatch - initiating branch diff sync"
210 );
211
212 match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
213 Ok(synced) => {
214 info!(items_synced = synced, "Redis sync complete via branch diff");
215 crate::metrics::record_items_written("L2", synced);
216 }
217 Err(e) => {
218 warn!(error = %e, "Branch diff sync failed - Redis may be stale");
219 crate::metrics::record_error("L2", "sync", "branch_diff");
220 }
221 }
222 }
223 Ok(None) => {
224 info!("Redis merkle tree is empty - will be populated on writes");
225 }
226 Err(e) => {
227 warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
228 crate::metrics::record_error("L2", "merkle", "root_hash");
229 }
230 }
231 crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
232 }
233
234 let _ = self.state.send(EngineState::Ready);
235 crate::metrics::record_startup_total(startup_start.elapsed());
236 info!("Sync engine ready (trust-verified startup complete)");
237 Ok(())
238 }
239
240 async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
242 let persistence = match &self.filter_persistence {
243 Some(p) => p,
244 None => return,
245 };
246
247 let sql_root = match sql_root {
248 Some(r) => r,
249 None => return,
250 };
251
252 match persistence.load(L3_FILTER_ID).await {
256 Ok(Some(state)) if &state.merkle_root == sql_root => {
257 if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
258 warn!(error = %e, "Failed to import L3 filter from snapshot");
259 } else {
260 self.l3_filter.mark_trusted();
261 info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
262 }
263 }
264 Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
265 Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
266 Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
267 }
268 }
269
270 async fn sync_redis_from_sql_diff(
272 &self,
273 sql_merkle: &SqlMerkleStore,
274 redis_merkle: &RedisMerkleStore,
275 ) -> Result<usize, StorageError> {
276 let mut total_synced = 0;
277 let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
278
279 for prefix in stale_prefixes {
280 info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
281
282 let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
283 .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
284
285 if leaf_paths.is_empty() {
286 continue;
287 }
288
289 let mut merkle_batch = MerkleBatch::new();
290
291 if let Some(ref l3_store) = self.l3_store {
292 for object_id in &leaf_paths {
293 if let Ok(Some(item)) = l3_store.get(object_id).await {
294 let payload_hash = PathMerkle::payload_hash(&item.content);
295 let leaf_hash = PathMerkle::leaf_hash(
296 &item.object_id,
297 item.version,
298 item.updated_at,
299 &payload_hash,
300 );
301 merkle_batch.insert(object_id.clone(), leaf_hash);
302
303 if let Some(ref l2_store) = self.l2_store {
304 if let Err(e) = l2_store.put(&item).await {
305 warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
306 } else {
307 total_synced += 1;
308 }
309 }
310 }
311 }
312
313 if !merkle_batch.is_empty() {
314 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
315 warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
316 }
317 }
318 }
319 }
320
321 Ok(total_synced)
322 }
323
324 async fn find_stale_branches(
326 &self,
327 sql_merkle: &SqlMerkleStore,
328 redis_merkle: &RedisMerkleStore,
329 prefix: &str,
330 ) -> Result<Vec<String>, StorageError> {
331 let mut stale = Vec::new();
332
333 let sql_children = sql_merkle.get_children(prefix).await
334 .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
335 let redis_children = redis_merkle.get_children(prefix).await
336 .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
337
338 if sql_children.is_empty() {
339 return Ok(stale);
340 }
341
342 for (child_path, sql_hash) in sql_children {
343 match redis_children.get(&child_path) {
344 Some(redis_hash) if redis_hash == &sql_hash => continue,
345 Some(_) => {
346 if child_path.contains('.') && !child_path.ends_with('.') {
347 stale.push(child_path);
348 } else {
349 let sub_stale = Box::pin(
350 self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
351 ).await?;
352 stale.extend(sub_stale);
353 }
354 }
355 None => stale.push(child_path),
356 }
357 }
358
359 Ok(stale)
360 }
361
362 #[tracing::instrument(skip(self))]
364 pub async fn warm_up(&mut self) -> Result<(), StorageError> {
365 let _ = self.state.send(EngineState::WarmingUp);
366 info!("Warming up cuckoo filter and L1 cache...");
367
368 if let Some(l3) = &self.l3_store {
369 let batch_size = self.config.cuckoo_warmup_batch_size;
370 info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
371
372 let total_count = l3.count_all().await.unwrap_or(0);
373 if total_count > 0 {
374 let mut offset = 0u64;
375 let mut loaded = 0usize;
376
377 loop {
378 let keys = l3.scan_keys(offset, batch_size).await?;
379 if keys.is_empty() {
380 break;
381 }
382
383 for key in &keys {
384 self.l3_filter.insert(key);
385 }
386
387 loaded += keys.len();
388 offset += keys.len() as u64;
389
390 if loaded % 10_000 == 0 || loaded == total_count as usize {
391 debug!(loaded, total = %total_count, "L3 filter warmup progress");
392 }
393 }
394
395 self.l3_filter.mark_trusted();
396 info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
397 } else {
398 info!("L3 store is empty, skipping filter warmup");
399 self.l3_filter.mark_trusted();
400 }
401 }
402
403 info!(
404 l3_trust = ?self.l3_filter.trust_state(),
405 "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
406 );
407
408 let _ = self.state.send(EngineState::Ready);
409 info!("Warm-up complete, engine ready");
410 Ok(())
411 }
412
413 pub async fn tick(&self) {
415 self.maybe_evict();
416 self.maybe_flush_l2().await;
417 }
418
419 pub async fn force_flush(&self) {
421 let batch = self.l2_batcher.lock().await.force_flush();
422 if let Some(batch) = batch {
423 debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
424 self.flush_batch_internal(batch).await;
425 }
426 }
427
428 #[tracing::instrument(skip(self))]
430 pub async fn run(&mut self) {
431 let _ = self.state.send(EngineState::Running);
432 info!("Sync engine running");
433
434 let mut health_check_interval = tokio::time::interval(
435 tokio::time::Duration::from_secs(30)
436 );
437 let mut wal_drain_interval = tokio::time::interval(
438 tokio::time::Duration::from_secs(5)
439 );
440 let mut cf_snapshot_interval = tokio::time::interval(
441 tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
442 );
443
444 loop {
445 tokio::select! {
446 Ok(()) = self.config_rx.changed() => {
447 let new_config = self.config_rx.borrow().clone();
448 info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
449 self.config = new_config;
450 }
451
452 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
453 self.maybe_evict();
454 self.maybe_flush_l2().await;
455 self.maybe_snapshot_cf_by_threshold().await;
456 }
457
458 _ = health_check_interval.tick() => {
459 self.check_mysql_health().await;
460 }
461
462 _ = wal_drain_interval.tick() => {
463 self.maybe_drain_wal().await;
464 }
465
466 _ = cf_snapshot_interval.tick() => {
467 self.maybe_snapshot_cf_by_time().await;
468 }
469 }
470 }
471 }
472
473 #[tracing::instrument(skip(self))]
475 pub async fn shutdown(&mut self) {
476 use crate::FlushReason;
477
478 let shutdown_start = std::time::Instant::now();
479 info!("Initiating sync engine shutdown...");
480 let _ = self.state.send(EngineState::ShuttingDown);
481
482 let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
483 if let Some(batch) = batch {
484 let batch_size = batch.items.len();
485 info!(batch_size, "Flushing final L2 batch on shutdown");
486 {
487 let mut batcher = self.l2_batcher.lock().await;
488 batcher.add_batch(batch.items);
489 }
490 self.maybe_flush_l2().await;
491 crate::metrics::record_items_written("L2", batch_size);
492 }
493
494 self.snapshot_cuckoo_filters("shutdown").await;
495
496 crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
497 info!("Sync engine shutdown complete");
498 }
499}