// reddb_server/replication/scheduler.rs
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Outcome of one successful backup run, as reported by the backup routine.
///
/// The scheduler stores and clones these values verbatim; it never inspects
/// or derives any of the fields itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackupResult {
    /// Identifier of the snapshot the backup refers to.
    pub snapshot_id: u64,
    /// Whether the backup was uploaded.
    /// NOTE(review): the upload destination is not visible in this file.
    pub uploaded: bool,
    /// How long the backup took, in milliseconds (per the field name).
    pub duration_ms: u64,
    /// When the backup happened.
    /// NOTE(review): presumably relative to the Unix epoch; the unit
    /// (seconds vs. milliseconds) is not established here — confirm with the
    /// code that builds this value.
    pub timestamp: u64,
}

25#[derive(Debug, Clone)]
27pub struct BackupStatus {
28 pub running: bool,
30 pub interval_secs: u64,
32 pub last_backup: Option<BackupResult>,
34 pub total_backups: u64,
36 pub total_failures: u64,
38 pub history: Vec<BackupResult>,
40}
41
42pub struct BackupScheduler {
44 running: Arc<AtomicBool>,
45 interval_secs: Arc<RwLock<u64>>,
46 last_backup: Arc<RwLock<Option<BackupResult>>>,
47 total_backups: Arc<RwLock<u64>>,
48 total_failures: Arc<RwLock<u64>>,
49 history: Arc<RwLock<Vec<BackupResult>>>,
50 max_history: usize,
51}
52
53impl BackupScheduler {
54 pub fn new(interval_secs: u64) -> Self {
56 Self {
57 running: Arc::new(AtomicBool::new(false)),
58 interval_secs: Arc::new(RwLock::new(interval_secs)),
59 last_backup: Arc::new(RwLock::new(None)),
60 total_backups: Arc::new(RwLock::new(0)),
61 total_failures: Arc::new(RwLock::new(0)),
62 history: Arc::new(RwLock::new(Vec::new())),
63 max_history: 50,
64 }
65 }
66
67 pub fn start<F>(&self, backup_fn: F)
70 where
71 F: Fn() -> Result<BackupResult, String> + Send + 'static,
72 {
73 if self.running.load(Ordering::SeqCst) {
74 return; }
76 self.running.store(true, Ordering::SeqCst);
77
78 let running = Arc::clone(&self.running);
79 let interval = Arc::clone(&self.interval_secs);
80 let last_backup = Arc::clone(&self.last_backup);
81 let total_backups = Arc::clone(&self.total_backups);
82 let total_failures = Arc::clone(&self.total_failures);
83 let history = Arc::clone(&self.history);
84 let max_history = self.max_history;
85
86 std::thread::Builder::new()
87 .name("reddb-backup-scheduler".into())
88 .spawn(move || {
89 while running.load(Ordering::SeqCst) {
90 let secs = *interval.read().unwrap_or_else(|e| e.into_inner());
91 std::thread::sleep(Duration::from_secs(secs));
92
93 if !running.load(Ordering::SeqCst) {
94 break;
95 }
96
97 match backup_fn() {
98 Ok(result) => {
99 *last_backup.write().unwrap_or_else(|e| e.into_inner()) =
100 Some(result.clone());
101 *total_backups.write().unwrap_or_else(|e| e.into_inner()) += 1;
102 let mut hist = history.write().unwrap_or_else(|e| e.into_inner());
103 hist.push(result);
104 if hist.len() > max_history {
105 hist.remove(0);
106 }
107 }
108 Err(_) => {
109 *total_failures.write().unwrap_or_else(|e| e.into_inner()) += 1;
110 }
111 }
112 }
113 })
114 .ok();
115 }
116
117 pub fn stop(&self) {
119 self.running.store(false, Ordering::SeqCst);
120 }
121
122 pub fn set_interval(&self, secs: u64) {
124 *self
125 .interval_secs
126 .write()
127 .unwrap_or_else(|e| e.into_inner()) = secs;
128 }
129
130 pub fn record_backup(&self, result: BackupResult) {
132 *self.last_backup.write().unwrap_or_else(|e| e.into_inner()) = Some(result.clone());
133 *self
134 .total_backups
135 .write()
136 .unwrap_or_else(|e| e.into_inner()) += 1;
137 let mut hist = self.history.write().unwrap_or_else(|e| e.into_inner());
138 hist.push(result);
139 if hist.len() > self.max_history {
140 hist.remove(0);
141 }
142 }
143
144 pub fn status(&self) -> BackupStatus {
146 BackupStatus {
147 running: self.running.load(Ordering::SeqCst),
148 interval_secs: *self.interval_secs.read().unwrap_or_else(|e| e.into_inner()),
149 last_backup: self
150 .last_backup
151 .read()
152 .unwrap_or_else(|e| e.into_inner())
153 .clone(),
154 total_backups: *self.total_backups.read().unwrap_or_else(|e| e.into_inner()),
155 total_failures: *self
156 .total_failures
157 .read()
158 .unwrap_or_else(|e| e.into_inner()),
159 history: self
160 .history
161 .read()
162 .unwrap_or_else(|e| e.into_inner())
163 .clone(),
164 }
165 }
166
167 pub fn is_running(&self) -> bool {
169 self.running.load(Ordering::SeqCst)
170 }
171}
172
173impl Default for BackupScheduler {
174 fn default() -> Self {
175 Self::new(3600)
176 }
177}