1use anyhow::{Context, Result};
5use deadpool_redis::{Config, Pool, Runtime};
6use tracing::{debug, info};
7
8use crate::types::{ScanJob, ScanProgress, ScanResults};
9
10#[derive(Clone)]
11pub struct RedisQueue {
12 pool: Pool,
13}
14
15impl RedisQueue {
16 pub async fn new(redis_url: &str) -> Result<Self> {
17 let cfg = Config::from_url(redis_url);
18 let pool = cfg
19 .create_pool(Some(Runtime::Tokio1))
20 .context("Failed to create Redis pool")?;
21
22 let mut conn = pool.get().await.context("Failed to get Redis connection")?;
24 let _: String = deadpool_redis::redis::cmd("PING")
25 .query_async(&mut conn)
26 .await
27 .context("Failed to ping Redis")?;
28
29 Ok(Self { pool })
30 }
31
32 pub async fn pop_scan_job(&self, timeout_secs: u64) -> Result<Option<ScanJob>> {
34 let mut conn = self
35 .pool
36 .get()
37 .await
38 .context("Failed to get Redis connection")?;
39
40 let result: Option<(String, String)> = deadpool_redis::redis::cmd("BRPOP")
42 .arg("scan:queue")
43 .arg(timeout_secs)
44 .query_async(&mut conn)
45 .await
46 .context("Failed to pop from queue")?;
47
48 match result {
49 Some((_, job_json)) => {
50 let job: ScanJob =
51 serde_json::from_str(&job_json).context("Failed to deserialize scan job")?;
52 debug!("Popped scan job: {}", job.scan_id);
53 Ok(Some(job))
54 }
55 None => Ok(None),
56 }
57 }
58
59 pub async fn update_scan_status(&self, scan_id: String, status: String) -> Result<()> {
61 let mut conn = self
62 .pool
63 .get()
64 .await
65 .context("Failed to get Redis connection")?;
66 let key = format!("scan:{}:status", scan_id);
67
68 deadpool_redis::redis::cmd("SET")
69 .arg(&key)
70 .arg(&status)
71 .query_async::<()>(&mut conn)
72 .await
73 .context("Failed to update scan status")?;
74
75 deadpool_redis::redis::cmd("EXPIRE")
77 .arg(&key)
78 .arg(86400)
79 .query_async::<()>(&mut conn)
80 .await
81 .context("Failed to set expiry")?;
82
83 debug!("Updated scan {} status to {}", scan_id, status);
84 Ok(())
85 }
86
87 pub async fn publish_progress(&self, progress: &ScanProgress) -> Result<()> {
89 let mut conn = self
90 .pool
91 .get()
92 .await
93 .context("Failed to get Redis connection")?;
94 let channel = format!("scan:{}:progress", progress.scan_id);
95 let message = serde_json::to_string(progress).context("Failed to serialize progress")?;
96
97 deadpool_redis::redis::cmd("PUBLISH")
98 .arg(&channel)
99 .arg(message)
100 .query_async::<()>(&mut conn)
101 .await
102 .context("Failed to publish progress")?;
103
104 Ok(())
105 }
106
107 pub async fn store_scan_results(&self, scan_id: String, results: &ScanResults) -> Result<()> {
109 let mut conn = self
110 .pool
111 .get()
112 .await
113 .context("Failed to get Redis connection")?;
114 let key = format!("scan:{}:results", scan_id);
115 let results_json =
116 serde_json::to_string(results).context("Failed to serialize scan results")?;
117
118 deadpool_redis::redis::cmd("SET")
119 .arg(&key)
120 .arg(results_json)
121 .query_async::<()>(&mut conn)
122 .await
123 .context("Failed to store scan results")?;
124
125 deadpool_redis::redis::cmd("EXPIRE")
127 .arg(&key)
128 .arg(604800)
129 .query_async::<()>(&mut conn)
130 .await
131 .context("Failed to set expiry")?;
132
133 info!(
134 "Stored results for scan {} ({} vulnerabilities)",
135 scan_id,
136 results.vulnerabilities.len()
137 );
138 Ok(())
139 }
140
141 pub async fn store_scan_error(&self, scan_id: String, error: String) -> Result<()> {
143 let mut conn = self
144 .pool
145 .get()
146 .await
147 .context("Failed to get Redis connection")?;
148 let key = format!("scan:{}:error", scan_id);
149
150 deadpool_redis::redis::cmd("SET")
151 .arg(&key)
152 .arg(&error)
153 .query_async::<()>(&mut conn)
154 .await
155 .context("Failed to store scan error")?;
156
157 deadpool_redis::redis::cmd("EXPIRE")
159 .arg(&key)
160 .arg(86400)
161 .query_async::<()>(&mut conn)
162 .await
163 .context("Failed to set expiry")?;
164
165 Ok(())
166 }
167
168 pub async fn increment_tests(&self, scan_id: String, count: u64) -> Result<()> {
170 let mut conn = self
171 .pool
172 .get()
173 .await
174 .context("Failed to get Redis connection")?;
175 let key = format!("scan:{}:tests", scan_id);
176
177 deadpool_redis::redis::cmd("INCRBY")
178 .arg(&key)
179 .arg(count)
180 .query_async::<()>(&mut conn)
181 .await
182 .context("Failed to increment test counter")?;
183
184 deadpool_redis::redis::cmd("EXPIRE")
186 .arg(&key)
187 .arg(86400)
188 .query_async::<()>(&mut conn)
189 .await
190 .context("Failed to set expiry")?;
191
192 Ok(())
193 }
194}