Skip to main content

reddb_server/replication/
scheduler.rs

//! Backup Scheduler — automatic periodic snapshots with optional remote upload.
//!
//! Runs as a background thread, configurable via `red_config`:
//! - `red.config.backup.enabled` — enable/disable
//! - `red.config.backup.interval_secs` — backup interval in seconds (default 3600 = 1 hour)
//! - `red.config.backup.retention_count` — number of snapshots to keep
7
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
/// Result of a backup operation.
///
/// A plain value type: every field is an integer or a flag, so two results can
/// be compared field-by-field with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackupResult {
    /// Snapshot identifier
    pub snapshot_id: u64,
    /// Whether the snapshot was uploaded to remote backend
    pub uploaded: bool,
    /// Duration of the backup in milliseconds
    pub duration_ms: u64,
    /// When the backup was taken (unix ms)
    pub timestamp: u64,
}
24
25/// Backup history and status.
26#[derive(Debug, Clone)]
27pub struct BackupStatus {
28    /// Whether the scheduler is running
29    pub running: bool,
30    /// Interval in seconds between backups
31    pub interval_secs: u64,
32    /// Last backup result
33    pub last_backup: Option<BackupResult>,
34    /// Total backups completed since start
35    pub total_backups: u64,
36    /// Total backup failures since start
37    pub total_failures: u64,
38    /// Recent backup history
39    pub history: Vec<BackupResult>,
40}
41
42/// Backup scheduler that runs periodic snapshots in a background thread.
43pub struct BackupScheduler {
44    running: Arc<AtomicBool>,
45    interval_secs: Arc<RwLock<u64>>,
46    last_backup: Arc<RwLock<Option<BackupResult>>>,
47    total_backups: Arc<RwLock<u64>>,
48    total_failures: Arc<RwLock<u64>>,
49    history: Arc<RwLock<Vec<BackupResult>>>,
50    max_history: usize,
51}
52
53impl BackupScheduler {
54    /// Create a new scheduler (not yet started).
55    pub fn new(interval_secs: u64) -> Self {
56        Self {
57            running: Arc::new(AtomicBool::new(false)),
58            interval_secs: Arc::new(RwLock::new(interval_secs)),
59            last_backup: Arc::new(RwLock::new(None)),
60            total_backups: Arc::new(RwLock::new(0)),
61            total_failures: Arc::new(RwLock::new(0)),
62            history: Arc::new(RwLock::new(Vec::new())),
63            max_history: 50,
64        }
65    }
66
67    /// Start the scheduler background thread.
68    /// The `backup_fn` is called each interval to perform the actual backup.
69    pub fn start<F>(&self, backup_fn: F)
70    where
71        F: Fn() -> Result<BackupResult, String> + Send + 'static,
72    {
73        if self.running.load(Ordering::SeqCst) {
74            return; // Already running
75        }
76        self.running.store(true, Ordering::SeqCst);
77
78        let running = Arc::clone(&self.running);
79        let interval = Arc::clone(&self.interval_secs);
80        let last_backup = Arc::clone(&self.last_backup);
81        let total_backups = Arc::clone(&self.total_backups);
82        let total_failures = Arc::clone(&self.total_failures);
83        let history = Arc::clone(&self.history);
84        let max_history = self.max_history;
85
86        std::thread::Builder::new()
87            .name("reddb-backup-scheduler".into())
88            .spawn(move || {
89                while running.load(Ordering::SeqCst) {
90                    let secs = *interval.read().unwrap_or_else(|e| e.into_inner());
91                    std::thread::sleep(Duration::from_secs(secs));
92
93                    if !running.load(Ordering::SeqCst) {
94                        break;
95                    }
96
97                    match backup_fn() {
98                        Ok(result) => {
99                            *last_backup.write().unwrap_or_else(|e| e.into_inner()) =
100                                Some(result.clone());
101                            *total_backups.write().unwrap_or_else(|e| e.into_inner()) += 1;
102                            let mut hist = history.write().unwrap_or_else(|e| e.into_inner());
103                            hist.push(result);
104                            if hist.len() > max_history {
105                                hist.remove(0);
106                            }
107                        }
108                        Err(_) => {
109                            *total_failures.write().unwrap_or_else(|e| e.into_inner()) += 1;
110                        }
111                    }
112                }
113            })
114            .ok();
115    }
116
117    /// Stop the scheduler.
118    pub fn stop(&self) {
119        self.running.store(false, Ordering::SeqCst);
120    }
121
122    /// Update the backup interval.
123    pub fn set_interval(&self, secs: u64) {
124        *self
125            .interval_secs
126            .write()
127            .unwrap_or_else(|e| e.into_inner()) = secs;
128    }
129
130    /// Record a manual backup result.
131    pub fn record_backup(&self, result: BackupResult) {
132        *self.last_backup.write().unwrap_or_else(|e| e.into_inner()) = Some(result.clone());
133        *self
134            .total_backups
135            .write()
136            .unwrap_or_else(|e| e.into_inner()) += 1;
137        let mut hist = self.history.write().unwrap_or_else(|e| e.into_inner());
138        hist.push(result);
139        if hist.len() > self.max_history {
140            hist.remove(0);
141        }
142    }
143
144    /// Get current status.
145    pub fn status(&self) -> BackupStatus {
146        BackupStatus {
147            running: self.running.load(Ordering::SeqCst),
148            interval_secs: *self.interval_secs.read().unwrap_or_else(|e| e.into_inner()),
149            last_backup: self
150                .last_backup
151                .read()
152                .unwrap_or_else(|e| e.into_inner())
153                .clone(),
154            total_backups: *self.total_backups.read().unwrap_or_else(|e| e.into_inner()),
155            total_failures: *self
156                .total_failures
157                .read()
158                .unwrap_or_else(|e| e.into_inner()),
159            history: self
160                .history
161                .read()
162                .unwrap_or_else(|e| e.into_inner())
163                .clone(),
164        }
165    }
166
167    /// Check if scheduler is running.
168    pub fn is_running(&self) -> bool {
169        self.running.load(Ordering::SeqCst)
170    }
171}
172
173impl Default for BackupScheduler {
174    fn default() -> Self {
175        Self::new(3600)
176    }
177}