celers_cli/
backup.rs

1//! Backup and restore functionality for CeleRS broker state.
2//!
//! Provides backup and restore capabilities for broker state. Backups currently
//! capture queue contents (pending, DLQ, and delayed tasks) and the names of
//! scheduled tasks; worker configurations and metrics are not yet included.
5
6use anyhow::{Context, Result};
7use colored::Colorize;
8use flate2::read::GzDecoder;
9use flate2::write::GzEncoder;
10use flate2::Compression;
11use redis::Commands;
12use reqwest::Url;
13use serde::{Deserialize, Serialize};
14use std::fs::File;
15use std::io::Read;
16use tar::{Archive, Builder};
17
18/// Mask password in URL for safe display
19fn mask_password(url: &str) -> String {
20    if let Ok(parsed) = Url::parse(url) {
21        if parsed.password().is_some() {
22            let mut masked = parsed.clone();
23            let _ = masked.set_password(Some("****"));
24            return masked.to_string();
25        }
26    }
27    url.to_string()
28}
29
30/// Backup metadata
31#[derive(Debug, Serialize, Deserialize)]
32pub struct BackupMetadata {
33    /// Backup creation timestamp
34    pub timestamp: String,
35    /// Broker type
36    pub broker_type: String,
37    /// Broker URL (sanitized)
38    pub broker_url: String,
39    /// Number of queues backed up
40    pub queue_count: usize,
41    /// Number of tasks backed up
42    pub task_count: usize,
43    /// Number of scheduled tasks backed up
44    pub schedule_count: usize,
45    /// CeleRS version
46    pub version: String,
47}
48
49/// Queue backup data
50#[derive(Debug, Serialize, Deserialize)]
51pub struct QueueBackup {
52    /// Queue name
53    pub name: String,
54    /// Queue type (fifo or priority)
55    pub queue_type: String,
56    /// Pending tasks
57    pub pending_tasks: Vec<String>,
58    /// DLQ tasks
59    pub dlq_tasks: Vec<String>,
60    /// Delayed tasks
61    pub delayed_tasks: Vec<String>,
62}
63
64/// Complete backup structure
65#[derive(Debug, Serialize, Deserialize)]
66pub struct Backup {
67    /// Backup metadata
68    pub metadata: BackupMetadata,
69    /// Queue backups
70    pub queues: Vec<QueueBackup>,
71    /// Scheduled tasks
72    pub schedules: Vec<ScheduleBackup>,
73}
74
75/// Scheduled task backup
76#[derive(Debug, Serialize, Deserialize)]
77pub struct ScheduleBackup {
78    /// Schedule name
79    pub name: String,
80    /// Task name
81    pub task: String,
82    /// Cron expression
83    pub cron: String,
84    /// Queue name
85    pub queue: String,
86    /// Task arguments (JSON)
87    pub args: Option<String>,
88}
89
90/// Create a full backup of broker state
91///
92/// # Arguments
93///
94/// * `broker_url` - Broker connection URL
95/// * `output_path` - Output file path (should end with .tar.gz)
96///
97/// # Examples
98///
99/// ```no_run
100/// use celers_cli::backup::create_backup;
101///
102/// # async fn example() -> anyhow::Result<()> {
103/// create_backup("redis://localhost:6379", "backup.tar.gz").await?;
104/// # Ok(())
105/// # }
106/// ```
107pub async fn create_backup(broker_url: &str, output_path: &str) -> Result<()> {
108    println!("{}", "Creating backup...".cyan());
109
110    // Connect to Redis
111    let client = redis::Client::open(broker_url).context("Failed to create Redis client")?;
112    let mut con = client
113        .get_connection()
114        .context("Failed to connect to Redis")?;
115
116    // Get all queue names
117    let queue_keys: Vec<String> = con
118        .keys("celers:queue:*")
119        .context("Failed to get queue keys")?;
120
121    let mut queues = Vec::new();
122    let mut total_tasks = 0;
123
124    for key in queue_keys {
125        // Extract queue name from key
126        let queue_name = key
127            .strip_prefix("celers:queue:")
128            .unwrap_or(&key)
129            .to_string();
130
131        // Skip internal keys
132        if queue_name.contains(':') {
133            continue;
134        }
135
136        println!("  Backing up queue: {}", queue_name.yellow());
137
138        // Get pending tasks
139        let pending_tasks: Vec<String> = con
140            .lrange(format!("celers:queue:{queue_name}"), 0, -1)
141            .unwrap_or_default();
142
143        // Get DLQ tasks
144        let dlq_tasks: Vec<String> = con
145            .lrange(format!("celers:dlq:{queue_name}"), 0, -1)
146            .unwrap_or_default();
147
148        // Get delayed tasks
149        let delayed_tasks: Vec<String> = con
150            .zrange(format!("celers:delayed:{queue_name}"), 0, -1)
151            .unwrap_or_default();
152
153        let task_count = pending_tasks.len() + dlq_tasks.len() + delayed_tasks.len();
154        total_tasks += task_count;
155
156        println!(
157            "    {} tasks (pending: {}, dlq: {}, delayed: {})",
158            task_count,
159            pending_tasks.len(),
160            dlq_tasks.len(),
161            delayed_tasks.len()
162        );
163
164        queues.push(QueueBackup {
165            name: queue_name,
166            queue_type: "fifo".to_string(), // Default to FIFO
167            pending_tasks,
168            dlq_tasks,
169            delayed_tasks,
170        });
171    }
172
173    // Get scheduled tasks
174    let mut schedules = Vec::new();
175    let schedule_keys: Vec<String> = con.keys("celers:beat:schedule:*").unwrap_or_default();
176
177    for key in schedule_keys {
178        let schedule_name = key
179            .strip_prefix("celers:beat:schedule:")
180            .unwrap_or(&key)
181            .to_string();
182
183        if let Ok(_data) = con.get::<_, String>(&key) {
184            // Parse schedule data (simplified - actual format may vary)
185            // TODO: Parse actual schedule format from Redis
186            schedules.push(ScheduleBackup {
187                name: schedule_name,
188                task: String::new(),
189                cron: String::new(),
190                queue: String::new(),
191                args: None,
192            });
193        }
194    }
195
196    // Create backup metadata
197    let metadata = BackupMetadata {
198        timestamp: chrono::Utc::now().to_rfc3339(),
199        broker_type: "redis".to_string(),
200        broker_url: mask_password(broker_url),
201        queue_count: queues.len(),
202        task_count: total_tasks,
203        schedule_count: schedules.len(),
204        version: env!("CARGO_PKG_VERSION").to_string(),
205    };
206
207    let backup = Backup {
208        metadata,
209        queues,
210        schedules,
211    };
212
213    // Create tar.gz archive
214    let tar_gz = File::create(output_path).context("Failed to create output file")?;
215    let enc = GzEncoder::new(tar_gz, Compression::default());
216    let mut tar = Builder::new(enc);
217
218    // Add backup data as JSON
219    let json_data = serde_json::to_string_pretty(&backup)?;
220    let mut header = tar::Header::new_gnu();
221    header.set_size(json_data.len() as u64);
222    header.set_mode(0o644);
223    header.set_cksum();
224
225    tar.append_data(&mut header, "backup.json", json_data.as_bytes())?;
226
227    // Finish the archive
228    tar.into_inner()?;
229
230    println!();
231    println!("{} Backup created successfully", "✓".green().bold());
232    println!("  File: {}", output_path.cyan());
233    println!("  Queues: {}", backup.metadata.queue_count);
234    println!("  Tasks: {}", backup.metadata.task_count);
235    println!("  Schedules: {}", backup.metadata.schedule_count);
236
237    Ok(())
238}
239
240/// Restore broker state from backup
241///
242/// # Arguments
243///
244/// * `broker_url` - Broker connection URL
245/// * `input_path` - Backup file path (.tar.gz)
246/// * `dry_run` - If true, validate without actually restoring
247/// * `selective_queues` - Optional list of specific queues to restore
248///
249/// # Examples
250///
251/// ```no_run
252/// use celers_cli::backup::restore_backup;
253///
254/// # async fn example() -> anyhow::Result<()> {
255/// restore_backup("redis://localhost:6379", "backup.tar.gz", false, None).await?;
256/// # Ok(())
257/// # }
258/// ```
259#[allow(clippy::too_many_arguments)]
260pub async fn restore_backup(
261    broker_url: &str,
262    input_path: &str,
263    dry_run: bool,
264    selective_queues: Option<Vec<String>>,
265) -> Result<()> {
266    println!("{}", "Restoring from backup...".cyan());
267
268    // Extract and parse backup
269    let tar_gz = File::open(input_path).context("Failed to open backup file")?;
270    let dec = GzDecoder::new(tar_gz);
271    let mut archive = Archive::new(dec);
272
273    let mut backup_json = String::new();
274    for entry in archive.entries()? {
275        let mut entry = entry?;
276        if entry.path()?.to_str() == Some("backup.json") {
277            entry.read_to_string(&mut backup_json)?;
278            break;
279        }
280    }
281
282    let backup: Backup =
283        serde_json::from_str(&backup_json).context("Failed to parse backup data")?;
284
285    // Display backup info
286    println!();
287    println!("{}", "Backup Information:".green().bold());
288    println!("  Created: {}", backup.metadata.timestamp.yellow());
289    println!("  Broker: {}", backup.metadata.broker_url);
290    println!("  Queues: {}", backup.metadata.queue_count);
291    println!("  Tasks: {}", backup.metadata.task_count);
292    println!("  Schedules: {}", backup.metadata.schedule_count);
293    println!();
294
295    if dry_run {
296        println!("{} Dry run mode - no changes will be made", "ℹ".blue());
297        return Ok(());
298    }
299
300    // Connect to Redis
301    let client = redis::Client::open(broker_url).context("Failed to create Redis client")?;
302    let mut con = client
303        .get_connection()
304        .context("Failed to connect to Redis")?;
305
306    let mut restored_queues = 0;
307    let mut restored_tasks = 0;
308
309    // Restore queues
310    for queue in backup.queues {
311        // Check if we should restore this queue
312        if let Some(ref filter) = selective_queues {
313            if !filter.contains(&queue.name) {
314                continue;
315            }
316        }
317
318        println!("  Restoring queue: {}", queue.name.yellow());
319
320        // Restore pending tasks
321        for task in queue.pending_tasks {
322            let _: () = con.rpush(format!("celers:queue:{}", queue.name), &task)?;
323            restored_tasks += 1;
324        }
325
326        // Restore DLQ tasks
327        for task in queue.dlq_tasks {
328            let _: () = con.rpush(format!("celers:dlq:{}", queue.name), &task)?;
329            restored_tasks += 1;
330        }
331
332        // Restore delayed tasks
333        for task in queue.delayed_tasks {
334            let _: () = con.zadd(
335                format!("celers:delayed:{}", queue.name),
336                &task,
337                chrono::Utc::now().timestamp(),
338            )?;
339            restored_tasks += 1;
340        }
341
342        restored_queues += 1;
343    }
344
345    println!();
346    println!("{} Restore completed successfully", "✓".green().bold());
347    println!("  Queues restored: {}", restored_queues);
348    println!("  Tasks restored: {}", restored_tasks);
349
350    Ok(())
351}