use crate::storage::sql::SqlStore;
use crate::storage::traits::{ArchiveStore, StorageError};
use crate::sync_item::SyncItem;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

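/// Point-in-time snapshot of the write-ahead log's counters, as reported by
/// [`WriteAheadLog::stats`].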
#[derive(Debug, Clone, Copy)]
pub struct WalStats {
    /// Items currently buffered in the WAL awaiting drain.
    pub pending_items: u64,
    /// Total items written to the WAL since startup.
    pub total_written: u64,
    /// Total items drained out of the WAL since startup.
    pub total_drained: u64,
    /// Whether a drain pass is currently in progress.
    pub draining: bool,
    /// Whether the MySQL backend is currently considered healthy.
    pub mysql_healthy: bool,
}

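/// SQLite-backed write-ahead log that buffers [`SyncItem`]s while the primary
/// MySQL store is unavailable, then drains them in batches once it recovers.
///
/// Illustrative flow (the `item` and `mysql` handles here are hypothetical):
///
/// ```ignore
/// let wal = WriteAheadLog::new("/var/lib/app/wal.db", 10_000).await?;
/// wal.write(&item).await?;                        // buffer while MySQL is down
/// let drained = wal.drain_to(&mysql, 500).await?; // flush once it recovers
/// ```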
pub struct WriteAheadLog {
    /// SQLite store backing the log.
    store: SqlStore,
    /// Filesystem path of the SQLite database file.
    path: String,
    /// Number of items currently buffered.
    pending_count: AtomicU64,
    /// Total items written since startup.
    total_written: AtomicU64,
    /// Total items drained since startup.
    total_drained: AtomicU64,
    /// Set while a drain pass is running.
    draining: AtomicBool,
    /// Maximum buffered items before writes are rejected.
    max_items: u64,
    /// Maximum database file size in bytes before writes are rejected.
    max_bytes: u64,
}

impl WriteAheadLog {
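    /// Default cap on the size of the WAL's backing file: 100 MiB.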
    const DEFAULT_MAX_BYTES: u64 = 100 * 1024 * 1024;

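    /// Opens (or creates) a WAL at `path` with the default file-size cap.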
    pub async fn new(path: impl AsRef<Path>, max_items: u64) -> Result<Self, StorageError> {
        Self::with_max_bytes(path, max_items, Self::DEFAULT_MAX_BYTES).await
    }

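    /// Opens (or creates) a WAL at `path`, bounding both the number of
    /// buffered items and the size of the backing file.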
    pub async fn with_max_bytes(
        path: impl AsRef<Path>,
        max_items: u64,
        max_bytes: u64,
    ) -> Result<Self, StorageError> {
        let path_str = path.as_ref().to_string_lossy().to_string();
        let url = format!("sqlite://{}?mode=rwc", path_str);

        info!(path = %path_str, max_items, max_bytes, "Initializing write-ahead log");

        let store = SqlStore::new(&url).await?;

        // Items left over from a previous run are counted now and drained later.
        let pending = store.count_all().await.unwrap_or(0);
        if pending > 0 {
            warn!(pending, "WAL has items from previous run, will drain");
        }

        Ok(Self {
            store,
            path: path_str,
            pending_count: AtomicU64::new(pending),
            total_written: AtomicU64::new(0),
            total_drained: AtomicU64::new(0),
            draining: AtomicBool::new(false),
            max_items,
            max_bytes,
        })
    }

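    /// Appends an item to the log, rejecting the write when either the
    /// item-count or the file-size limit has been reached.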
    pub async fn write(&self, item: &SyncItem) -> Result<(), StorageError> {
        let pending = self.pending_count.load(Ordering::Acquire);
        if pending >= self.max_items {
            return Err(StorageError::Backend(format!(
                "WAL full: {} items (max {})",
                pending, self.max_items
            )));
        }

        // The size check costs a stat call, so it only runs once every 100 items.
        if pending % 100 == 0 {
            if let Ok(size) = self.file_size_bytes() {
                if size >= self.max_bytes {
                    return Err(StorageError::Backend(format!(
                        "WAL file too large: {} bytes (max {})",
                        size, self.max_bytes
                    )));
                }
            }
        }

        self.store.put(item).await?;
        self.pending_count.fetch_add(1, Ordering::Release);
        self.total_written.fetch_add(1, Ordering::Relaxed);

        debug!(
            id = %item.object_id,
            pending = pending + 1,
            "Item written to WAL"
        );

        Ok(())
    }

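    /// Current size of the WAL's backing file on disk, in bytes.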
    pub fn file_size_bytes(&self) -> std::io::Result<u64> {
        std::fs::metadata(&self.path).map(|m| m.len())
    }

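    /// Forces a `TRUNCATE` checkpoint so SQLite folds its internal WAL back
    /// into the main database file and reclaims the disk space.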
    pub async fn checkpoint(&self) -> Result<(), StorageError> {
        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
            .execute(&self.store.pool())
            .await
            .map_err(|e| StorageError::Backend(format!("WAL checkpoint failed: {}", e)))?;

        debug!(path = %self.path, "WAL checkpoint completed");
        Ok(())
    }

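    /// True if any items are currently buffered.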
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.pending_count.load(Ordering::Acquire) > 0
    }

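    /// Returns a snapshot of the WAL counters; `mysql_healthy` is supplied by
    /// the caller's health checker.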
    #[must_use]
    pub fn stats(&self, mysql_healthy: bool) -> WalStats {
        WalStats {
            pending_items: self.pending_count.load(Ordering::Acquire),
            total_written: self.total_written.load(Ordering::Relaxed),
            total_drained: self.total_drained.load(Ordering::Relaxed),
            draining: self.draining.load(Ordering::Acquire),
            mysql_healthy,
        }
    }

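    /// True once the WAL is at least 80% full by item count.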
    #[must_use]
    pub fn under_pressure(&self) -> bool {
        let pending = self.pending_count.load(Ordering::Acquire);
        pending as f64 / self.max_items as f64 >= 0.8
    }

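    /// Drains up to `batch_size` items into `mysql`, deleting them from the
    /// WAL once the batch write succeeds. Returns the IDs of the drained
    /// items; returns an empty `Vec` when another drain is already running or
    /// there is nothing to drain.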
    pub async fn drain_to(
        &self,
        mysql: &dyn ArchiveStore,
        batch_size: usize,
    ) -> Result<Vec<String>, StorageError> {
        // Only one drain may run at a time; bail out if another is in flight.
        if self.draining.swap(true, Ordering::AcqRel) {
            return Ok(Vec::new());
        }

        let _guard = DrainGuard(&self.draining);

        let pending = self.pending_count.load(Ordering::Acquire);
        if pending == 0 {
            return Ok(Vec::new());
        }

        info!(pending, batch_size, "Starting WAL drain to MySQL");

        let items = self.store.scan_batch(batch_size).await?;
        let batch_len = items.len();

        if batch_len == 0 {
            return Ok(Vec::new());
        }

        let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // `put_batch` needs mutable access to the items.
        let mut items_mut = items;
        match mysql.put_batch(&mut items_mut).await {
            Ok(result) => {
                if !result.verified {
                    warn!(
                        batch_id = %result.batch_id,
                        written = result.written,
                        "WAL drain batch verification failed"
                    );
                }

                match self.store.delete_batch(&ids).await {
                    Ok(deleted) => {
                        debug!(deleted, "Batch deleted from WAL");
                    }
                    Err(e) => {
                        error!(error = %e, "Failed to batch delete from WAL after MySQL write");
                    }
                }

                let drained = result.written;
                self.pending_count.fetch_sub(drained as u64, Ordering::Release);
                self.total_drained.fetch_add(drained as u64, Ordering::Relaxed);

                info!(
                    drained,
                    remaining = pending.saturating_sub(drained as u64),
                    "WAL drain batch complete"
                );

                // Fully drained: truncate the backing file.
                if self.pending_count.load(Ordering::Acquire) == 0 {
                    if let Err(e) = self.checkpoint().await {
                        warn!(error = %e, "Failed to checkpoint WAL after drain");
                    }
                }

                Ok(ids)
            }
            Err(e) => {
                // Items stay in the WAL and will be retried on the next pass.
                warn!(
                    error = %e,
                    batch_size = batch_len,
                    "MySQL batch write failed during drain"
                );
                Err(e)
            }
        }
    }

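    /// Path of the WAL's backing SQLite file.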
    pub fn path(&self) -> &str {
        &self.path
    }
}

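/// Clears the `draining` flag when a drain pass exits, even on early return.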
struct DrainGuard<'a>(&'a AtomicBool);

impl Drop for DrainGuard<'_> {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

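/// Tracks MySQL availability: three consecutive failures mark the backend
/// unhealthy, and any success marks it healthy again.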
pub struct MysqlHealthChecker {
    /// Whether MySQL is currently considered healthy.
    healthy: AtomicBool,
    /// Consecutive failures observed since the last success.
    failures: AtomicU64,
    /// Serializes concurrent `check` calls.
    checking: Mutex<()>,
}

impl MysqlHealthChecker {
    pub fn new() -> Self {
        Self {
            healthy: AtomicBool::new(true),
            failures: AtomicU64::new(0),
            checking: Mutex::new(()),
        }
    }

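    /// Records a successful MySQL operation, resetting the failure count.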
    pub fn record_success(&self) {
        self.failures.store(0, Ordering::Release);
        self.healthy.store(true, Ordering::Release);
    }

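    /// Records a failed MySQL operation; marks the backend unhealthy once
    /// the failure threshold is reached.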
    pub fn record_failure(&self) {
        let failures = self.failures.fetch_add(1, Ordering::AcqRel) + 1;
        // Three consecutive failures mark MySQL unhealthy.
        if failures >= 3 {
            self.healthy.store(false, Ordering::Release);
        }
    }

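    /// Whether MySQL is currently considered healthy.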
    pub fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Acquire)
    }

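    /// Number of consecutive failures since the last success.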
    pub fn failure_count(&self) -> u64 {
        self.failures.load(Ordering::Acquire)
    }

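    /// Probes MySQL with a sentinel read and records the outcome.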
    pub async fn check(&self, mysql: &dyn ArchiveStore) -> bool {
        // Hold the lock so only one probe runs at a time.
        let _guard = self.checking.lock().await;

        // Any successful round-trip on the sentinel key counts as healthy.
        match mysql.get("__health_check__").await {
            Ok(_) => {
                self.record_success();
                true
            }
            Err(_) => {
                self.record_failure();
                false
            }
        }
    }
}

impl Default for MysqlHealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_item::SyncItem;
    use serde_json::json;
    use tempfile::tempdir;

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_wal_write_and_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        // A fresh WAL starts with all counters at zero.
        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 0);
        assert_eq!(stats.total_written, 0);
        assert!(!wal.has_pending());

        for i in 0..5 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 5);
        assert_eq!(stats.total_written, 5);
        assert!(wal.has_pending());
    }

    #[tokio::test]
    async fn test_wal_max_items_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_limit.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 3).await.unwrap();

        wal.write(&test_item("item-1")).await.unwrap();
        wal.write(&test_item("item-2")).await.unwrap();
        wal.write(&test_item("item-3")).await.unwrap();

        // The fourth write exceeds max_items and must be rejected.
        let result = wal.write(&test_item("item-4")).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("WAL full"));
    }

    #[tokio::test]
    async fn test_wal_under_pressure() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_pressure.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 10).await.unwrap();

        assert!(!wal.under_pressure());

        // 8 of 10 items is exactly the 80% pressure threshold.
        for i in 0..8 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        assert!(wal.under_pressure());
    }

    #[tokio::test]
    async fn test_wal_persistence_across_restart() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_restart.db");

        // First "process": write three items, then drop the WAL handle.
        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            wal.write(&test_item("persist-1")).await.unwrap();
            wal.write(&test_item("persist-2")).await.unwrap();
            wal.write(&test_item("persist-3")).await.unwrap();
        }

        // Second "process": reopening the same file recovers the pending count.
        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            assert_eq!(wal.stats(true).pending_items, 3);
            assert!(wal.has_pending());
        }
    }

    #[tokio::test]
    async fn test_wal_file_size_check() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_size.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        wal.write(&test_item("size-test")).await.unwrap();

        let size = wal.file_size_bytes().unwrap();
        assert!(size > 0);
    }

    #[tokio::test]
    async fn test_wal_with_max_bytes_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_bytes.db");

        let wal = WriteAheadLog::with_max_bytes(wal_path.as_path(), 1000, 1024).await.unwrap();

        // The size check only runs once every 100 writes, so tolerate either
        // outcome here: an early "file too large" rejection ends the test.
        for i in 0..50 {
            let result = wal.write(&test_item(&format!("bytes-item-{}", i))).await;
            if result.is_err() {
                return;
            }
        }
    }

    #[test]
    fn test_health_checker_initial_state() {
        let checker = MysqlHealthChecker::new();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_failure_threshold() {
        let checker = MysqlHealthChecker::new();

        // Still healthy after the first two failures...
        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 1);

        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 2);

        // ...but the third consecutive failure trips the threshold.
        checker.record_failure();
        assert!(!checker.is_healthy());
        assert_eq!(checker.failure_count(), 3);
    }

    #[test]
    fn test_health_checker_success_resets() {
        let checker = MysqlHealthChecker::new();

        checker.record_failure();
        checker.record_failure();
        checker.record_failure();
        assert!(!checker.is_healthy());

        // A single success restores health and clears the failure count.
        checker.record_success();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_partial_failures_then_success() {
        let checker = MysqlHealthChecker::new();

        // Two failures stay below the threshold; a success then resets them.
        checker.record_failure();
        checker.record_failure();
        checker.record_success();

        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }
}