Skip to main content

lonkero_scanner/realtime/
mod.rs

1// Copyright (c) 2026 Bountyy Oy. All rights reserved.
2// This software is proprietary and confidential.
3
//! Real-Time Scanner Integration
//!
//! Rust scanner module for real-time vulnerability detection with streaming results.
//!
//! Features:
//! - Real-time result streaming via unbounded channels
//! - Progress reporting (every 5 seconds)
//! - Cancellation and pause/resume support
//! - Resource monitoring (CPU, memory; these values are currently placeholders)
//! - Backpressure handling
//! - Scanner-level completion events
//!
//! Copyright: 2026 Bountyy Oy
//! License: Proprietary
19use serde::{Deserialize, Serialize};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, RwLock};
23use tokio::time::interval;
24
25/// Real-time scan context
26pub struct RealtimeScanContext {
27    pub scan_id: String,
28    pub target: String,
29    pub start_time: Instant,
30    pub cancelled: Arc<RwLock<bool>>,
31    pub paused: Arc<RwLock<bool>>,
32    pub progress_tx: mpsc::UnboundedSender<ProgressUpdate>,
33    pub finding_tx: mpsc::UnboundedSender<FindingUpdate>,
34    pub scanner_tx: mpsc::UnboundedSender<ScannerUpdate>,
35}
36
37/// Progress update
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct ProgressUpdate {
40    pub scan_id: String,
41    pub percentage: f64,
42    pub current_scanner: String,
43    pub scanners_completed: usize,
44    pub total_scanners: usize,
45    pub urls_scanned: usize,
46    pub total_urls: usize,
47    pub vulnerabilities_found: usize,
48    pub elapsed_time_ms: u64,
49    pub estimated_completion_ms: Option<u64>,
50    pub timestamp: u64,
51}
52
53/// Finding update (new vulnerability discovered)
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct FindingUpdate {
56    pub scan_id: String,
57    pub vulnerability: VulnerabilityFinding,
58    pub timestamp: u64,
59}
60
61/// Vulnerability finding
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct VulnerabilityFinding {
64    pub id: String,
65    pub severity: String,
66    pub confidence: String,
67    pub category: String,
68    pub name: String,
69    pub description: String,
70    pub url: String,
71    pub parameter: Option<String>,
72    pub payload: Option<String>,
73    pub evidence: String,
74    pub remediation: Option<String>,
75}
76
77/// Scanner completion update
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct ScannerUpdate {
80    pub scan_id: String,
81    pub scanner_name: String,
82    pub status: String,
83    pub duration_ms: u64,
84    pub findings_count: usize,
85    pub urls_tested: usize,
86    pub timestamp: u64,
87}
88
89/// Resource metrics
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ResourceMetrics {
92    pub cpu_usage_percent: f64,
93    pub memory_usage_mb: u64,
94    pub active_threads: usize,
95    pub network_requests_per_second: f64,
96    pub timestamp: u64,
97}
98
99impl RealtimeScanContext {
100    /// Create new real-time scan context
101    pub fn new(
102        scan_id: String,
103        target: String,
104    ) -> (
105        Self,
106        mpsc::UnboundedReceiver<ProgressUpdate>,
107        mpsc::UnboundedReceiver<FindingUpdate>,
108        mpsc::UnboundedReceiver<ScannerUpdate>,
109    ) {
110        let (progress_tx, progress_rx) = mpsc::unbounded_channel();
111        let (finding_tx, finding_rx) = mpsc::unbounded_channel();
112        let (scanner_tx, scanner_rx) = mpsc::unbounded_channel();
113
114        let ctx = Self {
115            scan_id,
116            target,
117            start_time: Instant::now(),
118            cancelled: Arc::new(RwLock::new(false)),
119            paused: Arc::new(RwLock::new(false)),
120            progress_tx,
121            finding_tx,
122            scanner_tx,
123        };
124
125        (ctx, progress_rx, finding_rx, scanner_rx)
126    }
127
128    /// Check if scan is cancelled
129    pub async fn is_cancelled(&self) -> bool {
130        *self.cancelled.read().await
131    }
132
133    /// Cancel the scan
134    pub async fn cancel(&self) {
135        let mut cancelled = self.cancelled.write().await;
136        *cancelled = true;
137        println!("[RealtimeScanner] Scan {} cancelled", self.scan_id);
138    }
139
140    /// Check if scan is paused
141    pub async fn is_paused(&self) -> bool {
142        *self.paused.read().await
143    }
144
145    /// Pause the scan
146    pub async fn pause(&self) {
147        let mut paused = self.paused.write().await;
148        *paused = true;
149        println!("[RealtimeScanner] Scan {} paused", self.scan_id);
150    }
151
152    /// Resume the scan
153    pub async fn resume(&self) {
154        let mut paused = self.paused.write().await;
155        *paused = false;
156        println!("[RealtimeScanner] Scan {} resumed", self.scan_id);
157    }
158
159    /// Wait while paused
160    pub async fn wait_if_paused(&self) {
161        while self.is_paused().await {
162            tokio::time::sleep(Duration::from_millis(500)).await;
163        }
164    }
165
166    /// Send progress update
167    pub fn send_progress(
168        &self,
169        percentage: f64,
170        current_scanner: String,
171        scanners_completed: usize,
172        total_scanners: usize,
173        urls_scanned: usize,
174        total_urls: usize,
175        vulnerabilities_found: usize,
176        estimated_completion_ms: Option<u64>,
177    ) {
178        let elapsed = self.start_time.elapsed().as_millis() as u64;
179
180        let update = ProgressUpdate {
181            scan_id: self.scan_id.clone(),
182            percentage,
183            current_scanner,
184            scanners_completed,
185            total_scanners,
186            urls_scanned,
187            total_urls,
188            vulnerabilities_found,
189            elapsed_time_ms: elapsed,
190            estimated_completion_ms,
191            timestamp: chrono::Utc::now().timestamp_millis() as u64,
192        };
193
194        let _ = self.progress_tx.send(update);
195    }
196
197    /// Send finding update
198    pub fn send_finding(&self, vulnerability: VulnerabilityFinding) {
199        let update = FindingUpdate {
200            scan_id: self.scan_id.clone(),
201            vulnerability,
202            timestamp: chrono::Utc::now().timestamp_millis() as u64,
203        };
204
205        let _ = self.finding_tx.send(update);
206    }
207
208    /// Send scanner completion
209    pub fn send_scanner_complete(
210        &self,
211        scanner_name: String,
212        status: String,
213        duration_ms: u64,
214        findings_count: usize,
215        urls_tested: usize,
216    ) {
217        let update = ScannerUpdate {
218            scan_id: self.scan_id.clone(),
219            scanner_name,
220            status,
221            duration_ms,
222            findings_count,
223            urls_tested,
224            timestamp: chrono::Utc::now().timestamp_millis() as u64,
225        };
226
227        let _ = self.scanner_tx.send(update);
228    }
229
230    /// Get elapsed time
231    pub fn elapsed_time_ms(&self) -> u64 {
232        self.start_time.elapsed().as_millis() as u64
233    }
234}
235
236/// Progress tracker for automated progress updates
237pub struct ProgressTracker {
238    context: Arc<RealtimeScanContext>,
239    total_scanners: usize,
240    scanners_completed: usize,
241    total_urls: usize,
242    urls_scanned: usize,
243    vulnerabilities_found: usize,
244    current_scanner: String,
245}
246
247impl ProgressTracker {
248    /// Create new progress tracker
249    pub fn new(
250        context: Arc<RealtimeScanContext>,
251        total_scanners: usize,
252        total_urls: usize,
253    ) -> Self {
254        Self {
255            context,
256            total_scanners,
257            scanners_completed: 0,
258            total_urls,
259            urls_scanned: 0,
260            vulnerabilities_found: 0,
261            current_scanner: String::from("Initializing"),
262        }
263    }
264
265    /// Start automated progress reporting
266    pub fn start_auto_reporting(self: Arc<Self>) {
267        let tracker = Arc::clone(&self);
268
269        tokio::spawn(async move {
270            let mut interval = interval(Duration::from_secs(5));
271
272            loop {
273                interval.tick().await;
274
275                // Check if scan is cancelled
276                if tracker.context.is_cancelled().await {
277                    break;
278                }
279
280                // Wait if paused
281                tracker.context.wait_if_paused().await;
282
283                // Send progress update
284                tracker.send_update().await;
285            }
286        });
287    }
288
289    /// Send progress update
290    async fn send_update(&self) {
291        let percentage = if self.total_scanners > 0 {
292            (self.scanners_completed as f64 / self.total_scanners as f64) * 100.0
293        } else {
294            0.0
295        };
296
297        // Estimate completion time
298        let elapsed_ms = self.context.elapsed_time_ms();
299        let estimated_completion_ms = if percentage > 0.0 && percentage < 100.0 {
300            Some((elapsed_ms as f64 / percentage * 100.0) as u64 - elapsed_ms)
301        } else {
302            None
303        };
304
305        self.context.send_progress(
306            percentage,
307            self.current_scanner.clone(),
308            self.scanners_completed,
309            self.total_scanners,
310            self.urls_scanned,
311            self.total_urls,
312            self.vulnerabilities_found,
313            estimated_completion_ms,
314        );
315    }
316
317    /// Update current scanner
318    pub async fn set_scanner(&mut self, scanner_name: String) {
319        self.current_scanner = scanner_name;
320        self.send_update().await;
321    }
322
323    /// Complete a scanner
324    pub async fn complete_scanner(&mut self) {
325        self.scanners_completed += 1;
326        self.send_update().await;
327    }
328
329    /// Update URL count
330    pub async fn update_urls(&mut self, scanned: usize) {
331        self.urls_scanned = scanned;
332        self.send_update().await;
333    }
334
335    /// Add vulnerability
336    pub async fn add_vulnerability(&mut self) {
337        self.vulnerabilities_found += 1;
338        self.send_update().await;
339    }
340
341    /// Get current stats
342    pub fn get_stats(&self) -> (usize, usize, usize) {
343        (
344            self.scanners_completed,
345            self.urls_scanned,
346            self.vulnerabilities_found,
347        )
348    }
349}
350
351/// Resource monitor
352pub struct ResourceMonitor {
353    start_cpu_time: f64,
354    start_timestamp: Instant,
355    network_requests: Arc<RwLock<usize>>,
356}
357
358impl ResourceMonitor {
359    /// Create new resource monitor
360    pub fn new() -> Self {
361        Self {
362            start_cpu_time: 0.0,
363            start_timestamp: Instant::now(),
364            network_requests: Arc::new(RwLock::new(0)),
365        }
366    }
367
368    /// Record network request
369    pub async fn record_request(&self) {
370        let mut requests = self.network_requests.write().await;
371        *requests += 1;
372    }
373
374    /// Get resource metrics
375    pub async fn get_metrics(&self) -> ResourceMetrics {
376        let elapsed_secs = self.start_timestamp.elapsed().as_secs_f64();
377        let requests = *self.network_requests.read().await;
378        let requests_per_second = if elapsed_secs > 0.0 {
379            requests as f64 / elapsed_secs
380        } else {
381            0.0
382        };
383
384        // Get system metrics (simplified - in production use proper system monitoring)
385        let cpu_usage = self.get_cpu_usage();
386        let memory_usage = self.get_memory_usage();
387        let active_threads = self.get_active_threads();
388
389        ResourceMetrics {
390            cpu_usage_percent: cpu_usage,
391            memory_usage_mb: memory_usage,
392            active_threads,
393            network_requests_per_second: requests_per_second,
394            timestamp: chrono::Utc::now().timestamp_millis() as u64,
395        }
396    }
397
398    /// Get CPU usage (simplified)
399    fn get_cpu_usage(&self) -> f64 {
400        // In production, use proper CPU monitoring
401        // For now, return a placeholder
402        0.0
403    }
404
405    /// Get memory usage (simplified)
406    fn get_memory_usage(&self) -> u64 {
407        // In production, use proper memory monitoring
408        // For now, return a placeholder
409        0
410    }
411
412    /// Get active threads (simplified)
413    fn get_active_threads(&self) -> usize {
414        // In production, use proper thread counting
415        // For now, return a placeholder
416        0
417    }
418}
419
420/// Backpressure handler
421pub struct BackpressureHandler {
422    max_queue_size: usize,
423    current_queue_size: Arc<RwLock<usize>>,
424}
425
426impl BackpressureHandler {
427    /// Create new backpressure handler
428    pub fn new(max_queue_size: usize) -> Self {
429        Self {
430            max_queue_size,
431            current_queue_size: Arc::new(RwLock::new(0)),
432        }
433    }
434
435    /// Wait if queue is full
436    pub async fn wait_if_full(&self) {
437        loop {
438            let queue_size = *self.current_queue_size.read().await;
439
440            if queue_size < self.max_queue_size {
441                break;
442            }
443
444            // Wait a bit before checking again
445            tokio::time::sleep(Duration::from_millis(100)).await;
446        }
447    }
448
449    /// Increment queue size
450    pub async fn increment(&self) {
451        let mut size = self.current_queue_size.write().await;
452        *size += 1;
453    }
454
455    /// Decrement queue size
456    pub async fn decrement(&self) {
457        let mut size = self.current_queue_size.write().await;
458        if *size > 0 {
459            *size -= 1;
460        }
461    }
462
463    /// Get current queue size
464    pub async fn get_size(&self) -> usize {
465        *self.current_queue_size.read().await
466    }
467}
468
469/// Scan result aggregator
470pub struct ScanResultAggregator {
471    pub vulnerabilities: Vec<VulnerabilityFinding>,
472    pub scanner_results: Vec<ScannerUpdate>,
473    pub start_time: Instant,
474}
475
476impl ScanResultAggregator {
477    /// Create new aggregator
478    pub fn new() -> Self {
479        Self {
480            vulnerabilities: Vec::new(),
481            scanner_results: Vec::new(),
482            start_time: Instant::now(),
483        }
484    }
485
486    /// Add vulnerability
487    pub fn add_vulnerability(&mut self, vuln: VulnerabilityFinding) {
488        self.vulnerabilities.push(vuln);
489    }
490
491    /// Add scanner result
492    pub fn add_scanner_result(&mut self, result: ScannerUpdate) {
493        self.scanner_results.push(result);
494    }
495
496    /// Get summary
497    pub fn get_summary(&self) -> ScanSummary {
498        let mut critical_count = 0;
499        let mut high_count = 0;
500        let mut medium_count = 0;
501        let mut low_count = 0;
502
503        for vuln in &self.vulnerabilities {
504            match vuln.severity.as_str() {
505                "CRITICAL" => critical_count += 1,
506                "HIGH" => high_count += 1,
507                "MEDIUM" => medium_count += 1,
508                "LOW" => low_count += 1,
509                _ => {}
510            }
511        }
512
513        ScanSummary {
514            total_vulnerabilities: self.vulnerabilities.len(),
515            critical_count,
516            high_count,
517            medium_count,
518            low_count,
519            scanners_executed: self.scanner_results.len(),
520            total_duration_ms: self.start_time.elapsed().as_millis() as u64,
521        }
522    }
523}
524
525/// Scan summary
526#[derive(Debug, Clone, Serialize, Deserialize)]
527pub struct ScanSummary {
528    pub total_vulnerabilities: usize,
529    pub critical_count: usize,
530    pub high_count: usize,
531    pub medium_count: usize,
532    pub low_count: usize,
533    pub scanners_executed: usize,
534    pub total_duration_ms: u64,
535}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh context carries the given ids and starts neither cancelled nor paused.
    #[tokio::test]
    async fn test_realtime_context_creation() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert_eq!(ctx.scan_id, "test_scan");
        assert_eq!(ctx.target, "https://example.com");
        assert!(!ctx.is_cancelled().await);
        assert!(!ctx.is_paused().await);
    }

    /// `cancel` flips the cancellation flag.
    #[tokio::test]
    async fn test_scan_cancellation() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert!(!ctx.is_cancelled().await);

        ctx.cancel().await;

        assert!(ctx.is_cancelled().await);
    }

    /// `pause` and `resume` toggle the paused flag.
    #[tokio::test]
    async fn test_scan_pause_resume() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert!(!ctx.is_paused().await);

        ctx.pause().await;
        assert!(ctx.is_paused().await);

        ctx.resume().await;
        assert!(!ctx.is_paused().await);
    }

    /// Each tracker mutation emits exactly one progress update, in order.
    #[tokio::test]
    async fn test_progress_tracker() {
        let (ctx, mut progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        let mut tracker = ProgressTracker::new(Arc::new(ctx), 5, 100);

        tracker.set_scanner("XSS Scanner".to_string()).await;
        tracker.complete_scanner().await;

        // The first queued update comes from `set_scanner`: nothing completed yet.
        let first = progress_rx
            .try_recv()
            .expect("set_scanner should emit a progress update");
        assert_eq!(first.scan_id, "test_scan");
        assert_eq!(first.current_scanner, "XSS Scanner");
        assert_eq!(first.scanners_completed, 0);
        assert_eq!(first.total_scanners, 5);

        // The second comes from `complete_scanner`: one of five scanners done.
        let second = progress_rx
            .try_recv()
            .expect("complete_scanner should emit a progress update");
        assert_eq!(second.scan_id, "test_scan");
        assert_eq!(second.scanners_completed, 1);
        assert_eq!(second.total_scanners, 5);
        assert_eq!(second.total_urls, 100);
        assert!((second.percentage - 20.0).abs() < 1e-9);

        // No further updates were queued.
        assert!(progress_rx.try_recv().is_err());
    }

    /// A single HIGH finding shows up in both the total and the high bucket.
    #[test]
    fn test_scan_result_aggregator() {
        let mut aggregator = ScanResultAggregator::new();

        let vuln = VulnerabilityFinding {
            id: "vuln_1".to_string(),
            severity: "HIGH".to_string(),
            confidence: "HIGH".to_string(),
            category: "XSS".to_string(),
            name: "Cross-Site Scripting".to_string(),
            description: "XSS vulnerability found".to_string(),
            url: "https://example.com".to_string(),
            parameter: Some("q".to_string()),
            payload: Some("<script>alert(1)</script>".to_string()),
            evidence: "Payload reflected in response".to_string(),
            remediation: Some("Sanitize user input".to_string()),
        };

        aggregator.add_vulnerability(vuln);

        let summary = aggregator.get_summary();
        assert_eq!(summary.total_vulnerabilities, 1);
        assert_eq!(summary.high_count, 1);
        assert_eq!(summary.critical_count, 0);
        assert_eq!(summary.medium_count, 0);
        assert_eq!(summary.low_count, 0);
        assert_eq!(summary.scanners_executed, 0);
    }
}