use crate::storage::sql::SqlStore;
use crate::storage::traits::{ArchiveStore, StorageError};
use crate::sync_item::SyncItem;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

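/// Point-in-time snapshot of WAL counters, as returned by [`WriteAheadLog::stats`].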
#[derive(Debug, Clone, Copy)]
pub struct WalStats {
    pub pending_items: u64,
    pub total_written: u64,
    pub total_drained: u64,
    pub draining: bool,
    pub mysql_healthy: bool,
}

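/// Durable write-ahead log backed by a local SQLite file.
///
/// Items are written here first and later drained in batches into the MySQL
/// archive store via [`WriteAheadLog::drain_to`]; writes are rejected once the
/// log exceeds `max_items` or `max_bytes`.
///
/// Example (illustrative sketch; assumes a tokio runtime and the
/// `SyncItem::from_json` constructor used in the tests below):
///
/// ```ignore
/// let wal = WriteAheadLog::new("/tmp/wal.db", 10_000).await?;
/// let item = SyncItem::from_json("id-1".to_string(), serde_json::json!({"k": "v"}));
/// wal.write(&item).await?;
/// assert!(wal.has_pending());
/// ```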
pub struct WriteAheadLog {
    store: SqlStore,
    path: String,
    pending_count: AtomicU64,
    total_written: AtomicU64,
    total_drained: AtomicU64,
    draining: AtomicBool,
    max_items: u64,
    max_bytes: u64,
}

impl WriteAheadLog {
    const DEFAULT_MAX_BYTES: u64 = 100 * 1024 * 1024;

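    /// Opens (or creates) the WAL at `path` with the default 100 MiB size cap.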
    pub async fn new(path: impl AsRef<Path>, max_items: u64) -> Result<Self, StorageError> {
        Self::with_max_bytes(path, max_items, Self::DEFAULT_MAX_BYTES).await
    }

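    /// Opens (or creates) the WAL at `path`, capped at `max_items` entries and
    /// `max_bytes` on disk. Items left over from a previous run are counted as
    /// pending and will be drained.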
    pub async fn with_max_bytes(
        path: impl AsRef<Path>,
        max_items: u64,
        max_bytes: u64,
    ) -> Result<Self, StorageError> {
        let path_str = path.as_ref().to_string_lossy().to_string();
        let url = format!("sqlite://{}?mode=rwc", path_str);

        info!(path = %path_str, max_items, max_bytes, "Initializing write-ahead log");

        let store = SqlStore::new(&url).await?;

        let pending = store.count_all().await.unwrap_or(0);
        if pending > 0 {
            warn!(pending, "WAL has items from previous run, will drain");
        }

        Ok(Self {
            store,
            path: path_str,
            pending_count: AtomicU64::new(pending),
            total_written: AtomicU64::new(0),
            total_drained: AtomicU64::new(0),
            draining: AtomicBool::new(false),
            max_items,
            max_bytes,
        })
    }

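    /// Appends an item to the WAL.
    ///
    /// Returns a [`StorageError::Backend`] error when the log already holds
    /// `max_items` entries, or (checked whenever the pending count is a
    /// multiple of 100) when the file has grown past `max_bytes`.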
    pub async fn write(&self, item: &SyncItem) -> Result<(), StorageError> {
        let pending = self.pending_count.load(Ordering::Acquire);
        if pending >= self.max_items {
            return Err(StorageError::Backend(format!(
                "WAL full: {} items (max {})",
                pending, self.max_items
            )));
        }

        if pending % 100 == 0 {
            if let Ok(size) = self.file_size_bytes() {
                if size >= self.max_bytes {
                    return Err(StorageError::Backend(format!(
                        "WAL file too large: {} bytes (max {})",
                        size, self.max_bytes
                    )));
                }
            }
        }

        self.store.put(item).await?;
        self.pending_count.fetch_add(1, Ordering::Release);
        self.total_written.fetch_add(1, Ordering::Relaxed);

        debug!(
            id = %item.object_id,
            pending = pending + 1,
            "Item written to WAL"
        );

        Ok(())
    }

    pub fn file_size_bytes(&self) -> std::io::Result<u64> {
        std::fs::metadata(&self.path).map(|m| m.len())
    }

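    /// Runs `PRAGMA wal_checkpoint(TRUNCATE)` so SQLite folds its own WAL file
    /// back into the main database file and truncates it.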
    pub async fn checkpoint(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
            .execute(&self.store.pool())
            .await
            .map_err(|e| StorageError::Backend(format!("WAL checkpoint failed: {}", e)))?;

        debug!(path = %self.path, "WAL checkpoint completed");
        Ok(())
    }

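    /// Returns `true` while any items remain to be drained.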
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.pending_count.load(Ordering::Acquire) > 0
    }

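    /// Returns a snapshot of the WAL counters; `mysql_healthy` is supplied by
    /// the caller and passed through unchanged.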
    #[must_use]
    pub fn stats(&self, mysql_healthy: bool) -> WalStats {
        WalStats {
            pending_items: self.pending_count.load(Ordering::Acquire),
            total_written: self.total_written.load(Ordering::Relaxed),
            total_drained: self.total_drained.load(Ordering::Relaxed),
            draining: self.draining.load(Ordering::Acquire),
            mysql_healthy,
        }
    }

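    /// Returns `true` once the WAL holds 80% or more of `max_items`.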
    #[must_use]
    pub fn under_pressure(&self) -> bool {
        let pending = self.pending_count.load(Ordering::Acquire);
        pending as f64 / self.max_items as f64 >= 0.8
    }

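    /// Drains up to `batch_size` items into the MySQL archive store and
    /// returns the object IDs of the batch that was handed off.
    ///
    /// Only one drain runs at a time: if another drain is already in progress,
    /// this returns `Ok(vec![])` immediately. After a successful batch write
    /// the batch is deleted from the WAL, and a checkpoint is attempted once
    /// the pending count reaches zero.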
    pub async fn drain_to(
        &self,
        mysql: &dyn ArchiveStore,
        batch_size: usize,
    ) -> Result<Vec<String>, StorageError> {
        if self.draining.swap(true, Ordering::AcqRel) {
            return Ok(Vec::new());
        }

        let _guard = DrainGuard(&self.draining);

        let pending = self.pending_count.load(Ordering::Acquire);
        if pending == 0 {
            return Ok(Vec::new());
        }

        info!(pending, batch_size, "Starting WAL drain to MySQL");

        let items = self.store.scan_batch(batch_size).await?;
        let batch_len = items.len();

        if batch_len == 0 {
            return Ok(Vec::new());
        }

        let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        let mut items_mut: Vec<_> = items;
        match mysql.put_batch(&mut items_mut).await {
            Ok(result) => {
                if !result.verified {
                    warn!(
                        batch_id = %result.batch_id,
                        written = result.written,
                        "WAL drain batch verification failed"
                    );
                }

                match self.store.delete_batch(&ids).await {
                    Ok(deleted) => {
                        debug!(deleted, "Batch deleted from WAL");
                    }
                    Err(e) => {
                        error!(error = %e, "Failed to batch delete from WAL after MySQL write");
                    }
                }

                let drained = result.written;
                self.pending_count.fetch_sub(drained as u64, Ordering::Release);
                self.total_drained.fetch_add(drained as u64, Ordering::Relaxed);

                info!(drained, remaining = pending - drained as u64, "WAL drain batch complete");

                if self.pending_count.load(Ordering::Acquire) == 0 {
                    if let Err(e) = self.checkpoint().await {
                        warn!(error = %e, "Failed to checkpoint WAL after drain");
                    }
                }

                Ok(ids)
            }
            Err(e) => {
                warn!(
                    error = %e,
                    batch_size = batch_len,
                    "MySQL batch write failed during drain"
                );
                Err(e)
            }
        }
    }

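    /// Returns the filesystem path of the backing SQLite file.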
    pub fn path(&self) -> &str {
        &self.path
    }
}

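/// Guard that clears the `draining` flag on drop, so a failed or panicked
/// drain does not block future drains.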
struct DrainGuard<'a>(&'a AtomicBool);

impl Drop for DrainGuard<'_> {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

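/// Tracks MySQL availability: three consecutive failures mark the backend
/// unhealthy, and a single success resets the failure count and restores the
/// healthy flag.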
pub struct MysqlHealthChecker {
    healthy: AtomicBool,
    failures: AtomicU64,
    checking: Mutex<()>,
}

impl MysqlHealthChecker {
    pub fn new() -> Self {
        Self {
            healthy: AtomicBool::new(true),
            failures: AtomicU64::new(0),
            checking: Mutex::new(()),
        }
    }

    pub fn record_success(&self) {
        self.failures.store(0, Ordering::Release);
        self.healthy.store(true, Ordering::Release);
    }

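    /// Records a failed MySQL operation; the backend is marked unhealthy once
    /// three consecutive failures accumulate.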
    pub fn record_failure(&self) {
        let failures = self.failures.fetch_add(1, Ordering::AcqRel) + 1;
        if failures >= 3 {
            self.healthy.store(false, Ordering::Release);
        }
    }

    pub fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Acquire)
    }

    pub fn failure_count(&self) -> u64 {
        self.failures.load(Ordering::Acquire)
    }

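    /// Probes MySQL by fetching the `__health_check__` sentinel key and
    /// records the result. The internal mutex ensures only one probe runs at a
    /// time.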
    pub async fn check(&self, mysql: &dyn ArchiveStore) -> bool {
        let _guard = self.checking.lock().await;

        match mysql.get("__health_check__").await {
            Ok(_) => {
                self.record_success();
                true
            }
            Err(_) => {
                self.record_failure();
                false
            }
        }
    }
}

impl Default for MysqlHealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_item::SyncItem;
    use serde_json::json;
    use tempfile::tempdir;

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_wal_write_and_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 0);
        assert_eq!(stats.total_written, 0);
        assert!(!wal.has_pending());

        for i in 0..5 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 5);
        assert_eq!(stats.total_written, 5);
        assert!(wal.has_pending());
    }

    #[tokio::test]
    async fn test_wal_max_items_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_limit.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 3).await.unwrap();

        wal.write(&test_item("item-1")).await.unwrap();
        wal.write(&test_item("item-2")).await.unwrap();
        wal.write(&test_item("item-3")).await.unwrap();

        let result: Result<(), StorageError> = wal.write(&test_item("item-4")).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("WAL full"));
    }

    #[tokio::test]
    async fn test_wal_under_pressure() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_pressure.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 10).await.unwrap();

        assert!(!wal.under_pressure());

        for i in 0..8 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        assert!(wal.under_pressure());
    }

    #[tokio::test]
    async fn test_wal_persistence_across_restart() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_restart.db");

        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            wal.write(&test_item("persist-1")).await.unwrap();
            wal.write(&test_item("persist-2")).await.unwrap();
            wal.write(&test_item("persist-3")).await.unwrap();
        }

        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            assert_eq!(wal.stats(true).pending_items, 3);
            assert!(wal.has_pending());
        }
    }

    #[tokio::test]
    async fn test_wal_file_size_check() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_size.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        wal.write(&test_item("size-test")).await.unwrap();

        let size = wal.file_size_bytes().unwrap();
        assert!(size > 0);
    }

    #[tokio::test]
    async fn test_wal_with_max_bytes_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_bytes.db");

        let wal = WriteAheadLog::with_max_bytes(wal_path.as_path(), 1000, 1024).await.unwrap();

        // The size guard only fires when the pending count is a multiple of 100,
        // so depending on the initial SQLite file size this either rejects an
        // early write or lets the whole batch through; both outcomes are acceptable.
        for i in 0..50 {
            let result: Result<(), StorageError> = wal.write(&test_item(&format!("bytes-item-{}", i))).await;
            if result.is_err() {
                return;
            }
        }
    }

    #[test]
    fn test_health_checker_initial_state() {
        let checker = MysqlHealthChecker::new();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_failure_threshold() {
        let checker = MysqlHealthChecker::new();

        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 1);

        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 2);

        checker.record_failure();
        assert!(!checker.is_healthy());
        assert_eq!(checker.failure_count(), 3);
    }

    #[test]
    fn test_health_checker_success_resets() {
        let checker = MysqlHealthChecker::new();

        checker.record_failure();
        checker.record_failure();
        checker.record_failure();
        assert!(!checker.is_healthy());

        checker.record_success();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_partial_failures_then_success() {
        let checker = MysqlHealthChecker::new();

        checker.record_failure();
        checker.record_failure();
        checker.record_success();

        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }
}