1use anyhow::{Context, Result};
7use colored::Colorize;
8use flate2::read::GzDecoder;
9use flate2::write::GzEncoder;
10use flate2::Compression;
11use redis::Commands;
12use reqwest::Url;
13use serde::{Deserialize, Serialize};
14use std::fs::File;
15use std::io::Read;
16use tar::{Archive, Builder};
17
18fn mask_password(url: &str) -> String {
20 if let Ok(parsed) = Url::parse(url) {
21 if parsed.password().is_some() {
22 let mut masked = parsed.clone();
23 let _ = masked.set_password(Some("****"));
24 return masked.to_string();
25 }
26 }
27 url.to_string()
28}
29
30#[derive(Debug, Serialize, Deserialize)]
32pub struct BackupMetadata {
33 pub timestamp: String,
35 pub broker_type: String,
37 pub broker_url: String,
39 pub queue_count: usize,
41 pub task_count: usize,
43 pub schedule_count: usize,
45 pub version: String,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
51pub struct QueueBackup {
52 pub name: String,
54 pub queue_type: String,
56 pub pending_tasks: Vec<String>,
58 pub dlq_tasks: Vec<String>,
60 pub delayed_tasks: Vec<String>,
62}
63
64#[derive(Debug, Serialize, Deserialize)]
66pub struct Backup {
67 pub metadata: BackupMetadata,
69 pub queues: Vec<QueueBackup>,
71 pub schedules: Vec<ScheduleBackup>,
73}
74
75#[derive(Debug, Serialize, Deserialize)]
77pub struct ScheduleBackup {
78 pub name: String,
80 pub task: String,
82 pub cron: String,
84 pub queue: String,
86 pub args: Option<String>,
88}
89
90pub async fn create_backup(broker_url: &str, output_path: &str) -> Result<()> {
108 println!("{}", "Creating backup...".cyan());
109
110 let client = redis::Client::open(broker_url).context("Failed to create Redis client")?;
112 let mut con = client
113 .get_connection()
114 .context("Failed to connect to Redis")?;
115
116 let queue_keys: Vec<String> = con
118 .keys("celers:queue:*")
119 .context("Failed to get queue keys")?;
120
121 let mut queues = Vec::new();
122 let mut total_tasks = 0;
123
124 for key in queue_keys {
125 let queue_name = key
127 .strip_prefix("celers:queue:")
128 .unwrap_or(&key)
129 .to_string();
130
131 if queue_name.contains(':') {
133 continue;
134 }
135
136 println!(" Backing up queue: {}", queue_name.yellow());
137
138 let pending_tasks: Vec<String> = con
140 .lrange(format!("celers:queue:{queue_name}"), 0, -1)
141 .unwrap_or_default();
142
143 let dlq_tasks: Vec<String> = con
145 .lrange(format!("celers:dlq:{queue_name}"), 0, -1)
146 .unwrap_or_default();
147
148 let delayed_tasks: Vec<String> = con
150 .zrange(format!("celers:delayed:{queue_name}"), 0, -1)
151 .unwrap_or_default();
152
153 let task_count = pending_tasks.len() + dlq_tasks.len() + delayed_tasks.len();
154 total_tasks += task_count;
155
156 println!(
157 " {} tasks (pending: {}, dlq: {}, delayed: {})",
158 task_count,
159 pending_tasks.len(),
160 dlq_tasks.len(),
161 delayed_tasks.len()
162 );
163
164 queues.push(QueueBackup {
165 name: queue_name,
166 queue_type: "fifo".to_string(), pending_tasks,
168 dlq_tasks,
169 delayed_tasks,
170 });
171 }
172
173 let mut schedules = Vec::new();
175 let schedule_keys: Vec<String> = con.keys("celers:beat:schedule:*").unwrap_or_default();
176
177 for key in schedule_keys {
178 let schedule_name = key
179 .strip_prefix("celers:beat:schedule:")
180 .unwrap_or(&key)
181 .to_string();
182
183 if let Ok(_data) = con.get::<_, String>(&key) {
184 schedules.push(ScheduleBackup {
187 name: schedule_name,
188 task: String::new(),
189 cron: String::new(),
190 queue: String::new(),
191 args: None,
192 });
193 }
194 }
195
196 let metadata = BackupMetadata {
198 timestamp: chrono::Utc::now().to_rfc3339(),
199 broker_type: "redis".to_string(),
200 broker_url: mask_password(broker_url),
201 queue_count: queues.len(),
202 task_count: total_tasks,
203 schedule_count: schedules.len(),
204 version: env!("CARGO_PKG_VERSION").to_string(),
205 };
206
207 let backup = Backup {
208 metadata,
209 queues,
210 schedules,
211 };
212
213 let tar_gz = File::create(output_path).context("Failed to create output file")?;
215 let enc = GzEncoder::new(tar_gz, Compression::default());
216 let mut tar = Builder::new(enc);
217
218 let json_data = serde_json::to_string_pretty(&backup)?;
220 let mut header = tar::Header::new_gnu();
221 header.set_size(json_data.len() as u64);
222 header.set_mode(0o644);
223 header.set_cksum();
224
225 tar.append_data(&mut header, "backup.json", json_data.as_bytes())?;
226
227 tar.into_inner()?;
229
230 println!();
231 println!("{} Backup created successfully", "✓".green().bold());
232 println!(" File: {}", output_path.cyan());
233 println!(" Queues: {}", backup.metadata.queue_count);
234 println!(" Tasks: {}", backup.metadata.task_count);
235 println!(" Schedules: {}", backup.metadata.schedule_count);
236
237 Ok(())
238}
239
240#[allow(clippy::too_many_arguments)]
260pub async fn restore_backup(
261 broker_url: &str,
262 input_path: &str,
263 dry_run: bool,
264 selective_queues: Option<Vec<String>>,
265) -> Result<()> {
266 println!("{}", "Restoring from backup...".cyan());
267
268 let tar_gz = File::open(input_path).context("Failed to open backup file")?;
270 let dec = GzDecoder::new(tar_gz);
271 let mut archive = Archive::new(dec);
272
273 let mut backup_json = String::new();
274 for entry in archive.entries()? {
275 let mut entry = entry?;
276 if entry.path()?.to_str() == Some("backup.json") {
277 entry.read_to_string(&mut backup_json)?;
278 break;
279 }
280 }
281
282 let backup: Backup =
283 serde_json::from_str(&backup_json).context("Failed to parse backup data")?;
284
285 println!();
287 println!("{}", "Backup Information:".green().bold());
288 println!(" Created: {}", backup.metadata.timestamp.yellow());
289 println!(" Broker: {}", backup.metadata.broker_url);
290 println!(" Queues: {}", backup.metadata.queue_count);
291 println!(" Tasks: {}", backup.metadata.task_count);
292 println!(" Schedules: {}", backup.metadata.schedule_count);
293 println!();
294
295 if dry_run {
296 println!("{} Dry run mode - no changes will be made", "ℹ".blue());
297 return Ok(());
298 }
299
300 let client = redis::Client::open(broker_url).context("Failed to create Redis client")?;
302 let mut con = client
303 .get_connection()
304 .context("Failed to connect to Redis")?;
305
306 let mut restored_queues = 0;
307 let mut restored_tasks = 0;
308
309 for queue in backup.queues {
311 if let Some(ref filter) = selective_queues {
313 if !filter.contains(&queue.name) {
314 continue;
315 }
316 }
317
318 println!(" Restoring queue: {}", queue.name.yellow());
319
320 for task in queue.pending_tasks {
322 let _: () = con.rpush(format!("celers:queue:{}", queue.name), &task)?;
323 restored_tasks += 1;
324 }
325
326 for task in queue.dlq_tasks {
328 let _: () = con.rpush(format!("celers:dlq:{}", queue.name), &task)?;
329 restored_tasks += 1;
330 }
331
332 for task in queue.delayed_tasks {
334 let _: () = con.zadd(
335 format!("celers:delayed:{}", queue.name),
336 &task,
337 chrono::Utc::now().timestamp(),
338 )?;
339 restored_tasks += 1;
340 }
341
342 restored_queues += 1;
343 }
344
345 println!();
346 println!("{} Restore completed successfully", "✓".green().bold());
347 println!(" Queues restored: {}", restored_queues);
348 println!(" Tasks restored: {}", restored_tasks);
349
350 Ok(())
351}