Skip to main content

lonkero_scanner/
queue.rs

// Copyright (c) 2026 Bountyy Oy. All rights reserved.
// This software is proprietary and confidential.
3
4use anyhow::{Context, Result};
5use deadpool_redis::{Config, Pool, Runtime};
6use tracing::{debug, info};
7
8use crate::types::{ScanJob, ScanProgress, ScanResults};
9
10#[derive(Clone)]
11pub struct RedisQueue {
12    pool: Pool,
13}
14
15impl RedisQueue {
16    pub async fn new(redis_url: &str) -> Result<Self> {
17        let cfg = Config::from_url(redis_url);
18        let pool = cfg
19            .create_pool(Some(Runtime::Tokio1))
20            .context("Failed to create Redis pool")?;
21
22        // Test connection
23        let mut conn = pool.get().await.context("Failed to get Redis connection")?;
24        let _: String = deadpool_redis::redis::cmd("PING")
25            .query_async(&mut conn)
26            .await
27            .context("Failed to ping Redis")?;
28
29        Ok(Self { pool })
30    }
31
32    /// Pop a scan job from the queue (blocking with timeout)
33    pub async fn pop_scan_job(&self, timeout_secs: u64) -> Result<Option<ScanJob>> {
34        let mut conn = self
35            .pool
36            .get()
37            .await
38            .context("Failed to get Redis connection")?;
39
40        // BRPOP scan:queue timeout
41        let result: Option<(String, String)> = deadpool_redis::redis::cmd("BRPOP")
42            .arg("scan:queue")
43            .arg(timeout_secs)
44            .query_async(&mut conn)
45            .await
46            .context("Failed to pop from queue")?;
47
48        match result {
49            Some((_, job_json)) => {
50                let job: ScanJob =
51                    serde_json::from_str(&job_json).context("Failed to deserialize scan job")?;
52                debug!("Popped scan job: {}", job.scan_id);
53                Ok(Some(job))
54            }
55            None => Ok(None),
56        }
57    }
58
59    /// Update scan status
60    pub async fn update_scan_status(&self, scan_id: String, status: String) -> Result<()> {
61        let mut conn = self
62            .pool
63            .get()
64            .await
65            .context("Failed to get Redis connection")?;
66        let key = format!("scan:{}:status", scan_id);
67
68        deadpool_redis::redis::cmd("SET")
69            .arg(&key)
70            .arg(&status)
71            .query_async::<()>(&mut conn)
72            .await
73            .context("Failed to update scan status")?;
74
75        // Set expiry (24 hours)
76        deadpool_redis::redis::cmd("EXPIRE")
77            .arg(&key)
78            .arg(86400)
79            .query_async::<()>(&mut conn)
80            .await
81            .context("Failed to set expiry")?;
82
83        debug!("Updated scan {} status to {}", scan_id, status);
84        Ok(())
85    }
86
87    /// Publish scan progress
88    pub async fn publish_progress(&self, progress: &ScanProgress) -> Result<()> {
89        let mut conn = self
90            .pool
91            .get()
92            .await
93            .context("Failed to get Redis connection")?;
94        let channel = format!("scan:{}:progress", progress.scan_id);
95        let message = serde_json::to_string(progress).context("Failed to serialize progress")?;
96
97        deadpool_redis::redis::cmd("PUBLISH")
98            .arg(&channel)
99            .arg(message)
100            .query_async::<()>(&mut conn)
101            .await
102            .context("Failed to publish progress")?;
103
104        Ok(())
105    }
106
107    /// Store scan results
108    pub async fn store_scan_results(&self, scan_id: String, results: &ScanResults) -> Result<()> {
109        let mut conn = self
110            .pool
111            .get()
112            .await
113            .context("Failed to get Redis connection")?;
114        let key = format!("scan:{}:results", scan_id);
115        let results_json =
116            serde_json::to_string(results).context("Failed to serialize scan results")?;
117
118        deadpool_redis::redis::cmd("SET")
119            .arg(&key)
120            .arg(results_json)
121            .query_async::<()>(&mut conn)
122            .await
123            .context("Failed to store scan results")?;
124
125        // Set expiry (7 days)
126        deadpool_redis::redis::cmd("EXPIRE")
127            .arg(&key)
128            .arg(604800)
129            .query_async::<()>(&mut conn)
130            .await
131            .context("Failed to set expiry")?;
132
133        info!(
134            "Stored results for scan {} ({} vulnerabilities)",
135            scan_id,
136            results.vulnerabilities.len()
137        );
138        Ok(())
139    }
140
141    /// Store scan error
142    pub async fn store_scan_error(&self, scan_id: String, error: String) -> Result<()> {
143        let mut conn = self
144            .pool
145            .get()
146            .await
147            .context("Failed to get Redis connection")?;
148        let key = format!("scan:{}:error", scan_id);
149
150        deadpool_redis::redis::cmd("SET")
151            .arg(&key)
152            .arg(&error)
153            .query_async::<()>(&mut conn)
154            .await
155            .context("Failed to store scan error")?;
156
157        // Set expiry (24 hours)
158        deadpool_redis::redis::cmd("EXPIRE")
159            .arg(&key)
160            .arg(86400)
161            .query_async::<()>(&mut conn)
162            .await
163            .context("Failed to set expiry")?;
164
165        Ok(())
166    }
167
168    /// Increment test counter
169    pub async fn increment_tests(&self, scan_id: String, count: u64) -> Result<()> {
170        let mut conn = self
171            .pool
172            .get()
173            .await
174            .context("Failed to get Redis connection")?;
175        let key = format!("scan:{}:tests", scan_id);
176
177        deadpool_redis::redis::cmd("INCRBY")
178            .arg(&key)
179            .arg(count)
180            .query_async::<()>(&mut conn)
181            .await
182            .context("Failed to increment test counter")?;
183
184        // Set expiry (24 hours)
185        deadpool_redis::redis::cmd("EXPIRE")
186            .arg(&key)
187            .arg(86400)
188            .query_async::<()>(&mut conn)
189            .await
190            .context("Failed to set expiry")?;
191
192        Ok(())
193    }
194}