1use serde::{Deserialize, Serialize};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{mpsc, RwLock};
23use tokio::time::interval;
24
25pub struct RealtimeScanContext {
27 pub scan_id: String,
28 pub target: String,
29 pub start_time: Instant,
30 pub cancelled: Arc<RwLock<bool>>,
31 pub paused: Arc<RwLock<bool>>,
32 pub progress_tx: mpsc::UnboundedSender<ProgressUpdate>,
33 pub finding_tx: mpsc::UnboundedSender<FindingUpdate>,
34 pub scanner_tx: mpsc::UnboundedSender<ScannerUpdate>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct ProgressUpdate {
40 pub scan_id: String,
41 pub percentage: f64,
42 pub current_scanner: String,
43 pub scanners_completed: usize,
44 pub total_scanners: usize,
45 pub urls_scanned: usize,
46 pub total_urls: usize,
47 pub vulnerabilities_found: usize,
48 pub elapsed_time_ms: u64,
49 pub estimated_completion_ms: Option<u64>,
50 pub timestamp: u64,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct FindingUpdate {
56 pub scan_id: String,
57 pub vulnerability: VulnerabilityFinding,
58 pub timestamp: u64,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct VulnerabilityFinding {
64 pub id: String,
65 pub severity: String,
66 pub confidence: String,
67 pub category: String,
68 pub name: String,
69 pub description: String,
70 pub url: String,
71 pub parameter: Option<String>,
72 pub payload: Option<String>,
73 pub evidence: String,
74 pub remediation: Option<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct ScannerUpdate {
80 pub scan_id: String,
81 pub scanner_name: String,
82 pub status: String,
83 pub duration_ms: u64,
84 pub findings_count: usize,
85 pub urls_tested: usize,
86 pub timestamp: u64,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ResourceMetrics {
92 pub cpu_usage_percent: f64,
93 pub memory_usage_mb: u64,
94 pub active_threads: usize,
95 pub network_requests_per_second: f64,
96 pub timestamp: u64,
97}
98
99impl RealtimeScanContext {
100 pub fn new(
102 scan_id: String,
103 target: String,
104 ) -> (
105 Self,
106 mpsc::UnboundedReceiver<ProgressUpdate>,
107 mpsc::UnboundedReceiver<FindingUpdate>,
108 mpsc::UnboundedReceiver<ScannerUpdate>,
109 ) {
110 let (progress_tx, progress_rx) = mpsc::unbounded_channel();
111 let (finding_tx, finding_rx) = mpsc::unbounded_channel();
112 let (scanner_tx, scanner_rx) = mpsc::unbounded_channel();
113
114 let ctx = Self {
115 scan_id,
116 target,
117 start_time: Instant::now(),
118 cancelled: Arc::new(RwLock::new(false)),
119 paused: Arc::new(RwLock::new(false)),
120 progress_tx,
121 finding_tx,
122 scanner_tx,
123 };
124
125 (ctx, progress_rx, finding_rx, scanner_rx)
126 }
127
128 pub async fn is_cancelled(&self) -> bool {
130 *self.cancelled.read().await
131 }
132
133 pub async fn cancel(&self) {
135 let mut cancelled = self.cancelled.write().await;
136 *cancelled = true;
137 println!("[RealtimeScanner] Scan {} cancelled", self.scan_id);
138 }
139
140 pub async fn is_paused(&self) -> bool {
142 *self.paused.read().await
143 }
144
145 pub async fn pause(&self) {
147 let mut paused = self.paused.write().await;
148 *paused = true;
149 println!("[RealtimeScanner] Scan {} paused", self.scan_id);
150 }
151
152 pub async fn resume(&self) {
154 let mut paused = self.paused.write().await;
155 *paused = false;
156 println!("[RealtimeScanner] Scan {} resumed", self.scan_id);
157 }
158
159 pub async fn wait_if_paused(&self) {
161 while self.is_paused().await {
162 tokio::time::sleep(Duration::from_millis(500)).await;
163 }
164 }
165
166 pub fn send_progress(
168 &self,
169 percentage: f64,
170 current_scanner: String,
171 scanners_completed: usize,
172 total_scanners: usize,
173 urls_scanned: usize,
174 total_urls: usize,
175 vulnerabilities_found: usize,
176 estimated_completion_ms: Option<u64>,
177 ) {
178 let elapsed = self.start_time.elapsed().as_millis() as u64;
179
180 let update = ProgressUpdate {
181 scan_id: self.scan_id.clone(),
182 percentage,
183 current_scanner,
184 scanners_completed,
185 total_scanners,
186 urls_scanned,
187 total_urls,
188 vulnerabilities_found,
189 elapsed_time_ms: elapsed,
190 estimated_completion_ms,
191 timestamp: chrono::Utc::now().timestamp_millis() as u64,
192 };
193
194 let _ = self.progress_tx.send(update);
195 }
196
197 pub fn send_finding(&self, vulnerability: VulnerabilityFinding) {
199 let update = FindingUpdate {
200 scan_id: self.scan_id.clone(),
201 vulnerability,
202 timestamp: chrono::Utc::now().timestamp_millis() as u64,
203 };
204
205 let _ = self.finding_tx.send(update);
206 }
207
208 pub fn send_scanner_complete(
210 &self,
211 scanner_name: String,
212 status: String,
213 duration_ms: u64,
214 findings_count: usize,
215 urls_tested: usize,
216 ) {
217 let update = ScannerUpdate {
218 scan_id: self.scan_id.clone(),
219 scanner_name,
220 status,
221 duration_ms,
222 findings_count,
223 urls_tested,
224 timestamp: chrono::Utc::now().timestamp_millis() as u64,
225 };
226
227 let _ = self.scanner_tx.send(update);
228 }
229
230 pub fn elapsed_time_ms(&self) -> u64 {
232 self.start_time.elapsed().as_millis() as u64
233 }
234}
235
236pub struct ProgressTracker {
238 context: Arc<RealtimeScanContext>,
239 total_scanners: usize,
240 scanners_completed: usize,
241 total_urls: usize,
242 urls_scanned: usize,
243 vulnerabilities_found: usize,
244 current_scanner: String,
245}
246
247impl ProgressTracker {
248 pub fn new(
250 context: Arc<RealtimeScanContext>,
251 total_scanners: usize,
252 total_urls: usize,
253 ) -> Self {
254 Self {
255 context,
256 total_scanners,
257 scanners_completed: 0,
258 total_urls,
259 urls_scanned: 0,
260 vulnerabilities_found: 0,
261 current_scanner: String::from("Initializing"),
262 }
263 }
264
265 pub fn start_auto_reporting(self: Arc<Self>) {
267 let tracker = Arc::clone(&self);
268
269 tokio::spawn(async move {
270 let mut interval = interval(Duration::from_secs(5));
271
272 loop {
273 interval.tick().await;
274
275 if tracker.context.is_cancelled().await {
277 break;
278 }
279
280 tracker.context.wait_if_paused().await;
282
283 tracker.send_update().await;
285 }
286 });
287 }
288
289 async fn send_update(&self) {
291 let percentage = if self.total_scanners > 0 {
292 (self.scanners_completed as f64 / self.total_scanners as f64) * 100.0
293 } else {
294 0.0
295 };
296
297 let elapsed_ms = self.context.elapsed_time_ms();
299 let estimated_completion_ms = if percentage > 0.0 && percentage < 100.0 {
300 Some((elapsed_ms as f64 / percentage * 100.0) as u64 - elapsed_ms)
301 } else {
302 None
303 };
304
305 self.context.send_progress(
306 percentage,
307 self.current_scanner.clone(),
308 self.scanners_completed,
309 self.total_scanners,
310 self.urls_scanned,
311 self.total_urls,
312 self.vulnerabilities_found,
313 estimated_completion_ms,
314 );
315 }
316
317 pub async fn set_scanner(&mut self, scanner_name: String) {
319 self.current_scanner = scanner_name;
320 self.send_update().await;
321 }
322
323 pub async fn complete_scanner(&mut self) {
325 self.scanners_completed += 1;
326 self.send_update().await;
327 }
328
329 pub async fn update_urls(&mut self, scanned: usize) {
331 self.urls_scanned = scanned;
332 self.send_update().await;
333 }
334
335 pub async fn add_vulnerability(&mut self) {
337 self.vulnerabilities_found += 1;
338 self.send_update().await;
339 }
340
341 pub fn get_stats(&self) -> (usize, usize, usize) {
343 (
344 self.scanners_completed,
345 self.urls_scanned,
346 self.vulnerabilities_found,
347 )
348 }
349}
350
351pub struct ResourceMonitor {
353 start_cpu_time: f64,
354 start_timestamp: Instant,
355 network_requests: Arc<RwLock<usize>>,
356}
357
358impl ResourceMonitor {
359 pub fn new() -> Self {
361 Self {
362 start_cpu_time: 0.0,
363 start_timestamp: Instant::now(),
364 network_requests: Arc::new(RwLock::new(0)),
365 }
366 }
367
368 pub async fn record_request(&self) {
370 let mut requests = self.network_requests.write().await;
371 *requests += 1;
372 }
373
374 pub async fn get_metrics(&self) -> ResourceMetrics {
376 let elapsed_secs = self.start_timestamp.elapsed().as_secs_f64();
377 let requests = *self.network_requests.read().await;
378 let requests_per_second = if elapsed_secs > 0.0 {
379 requests as f64 / elapsed_secs
380 } else {
381 0.0
382 };
383
384 let cpu_usage = self.get_cpu_usage();
386 let memory_usage = self.get_memory_usage();
387 let active_threads = self.get_active_threads();
388
389 ResourceMetrics {
390 cpu_usage_percent: cpu_usage,
391 memory_usage_mb: memory_usage,
392 active_threads,
393 network_requests_per_second: requests_per_second,
394 timestamp: chrono::Utc::now().timestamp_millis() as u64,
395 }
396 }
397
398 fn get_cpu_usage(&self) -> f64 {
400 0.0
403 }
404
405 fn get_memory_usage(&self) -> u64 {
407 0
410 }
411
412 fn get_active_threads(&self) -> usize {
414 0
417 }
418}
419
420pub struct BackpressureHandler {
422 max_queue_size: usize,
423 current_queue_size: Arc<RwLock<usize>>,
424}
425
426impl BackpressureHandler {
427 pub fn new(max_queue_size: usize) -> Self {
429 Self {
430 max_queue_size,
431 current_queue_size: Arc::new(RwLock::new(0)),
432 }
433 }
434
435 pub async fn wait_if_full(&self) {
437 loop {
438 let queue_size = *self.current_queue_size.read().await;
439
440 if queue_size < self.max_queue_size {
441 break;
442 }
443
444 tokio::time::sleep(Duration::from_millis(100)).await;
446 }
447 }
448
449 pub async fn increment(&self) {
451 let mut size = self.current_queue_size.write().await;
452 *size += 1;
453 }
454
455 pub async fn decrement(&self) {
457 let mut size = self.current_queue_size.write().await;
458 if *size > 0 {
459 *size -= 1;
460 }
461 }
462
463 pub async fn get_size(&self) -> usize {
465 *self.current_queue_size.read().await
466 }
467}
468
469pub struct ScanResultAggregator {
471 pub vulnerabilities: Vec<VulnerabilityFinding>,
472 pub scanner_results: Vec<ScannerUpdate>,
473 pub start_time: Instant,
474}
475
476impl ScanResultAggregator {
477 pub fn new() -> Self {
479 Self {
480 vulnerabilities: Vec::new(),
481 scanner_results: Vec::new(),
482 start_time: Instant::now(),
483 }
484 }
485
486 pub fn add_vulnerability(&mut self, vuln: VulnerabilityFinding) {
488 self.vulnerabilities.push(vuln);
489 }
490
491 pub fn add_scanner_result(&mut self, result: ScannerUpdate) {
493 self.scanner_results.push(result);
494 }
495
496 pub fn get_summary(&self) -> ScanSummary {
498 let mut critical_count = 0;
499 let mut high_count = 0;
500 let mut medium_count = 0;
501 let mut low_count = 0;
502
503 for vuln in &self.vulnerabilities {
504 match vuln.severity.as_str() {
505 "CRITICAL" => critical_count += 1,
506 "HIGH" => high_count += 1,
507 "MEDIUM" => medium_count += 1,
508 "LOW" => low_count += 1,
509 _ => {}
510 }
511 }
512
513 ScanSummary {
514 total_vulnerabilities: self.vulnerabilities.len(),
515 critical_count,
516 high_count,
517 medium_count,
518 low_count,
519 scanners_executed: self.scanner_results.len(),
520 total_duration_ms: self.start_time.elapsed().as_millis() as u64,
521 }
522 }
523}
524
525#[derive(Debug, Clone, Serialize, Deserialize)]
527pub struct ScanSummary {
528 pub total_vulnerabilities: usize,
529 pub critical_count: usize,
530 pub high_count: usize,
531 pub medium_count: usize,
532 pub low_count: usize,
533 pub scanners_executed: usize,
534 pub total_duration_ms: u64,
535}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh context carries the given identity and is neither cancelled
    /// nor paused.
    #[tokio::test]
    async fn test_realtime_context_creation() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert_eq!(ctx.scan_id, "test_scan");
        assert_eq!(ctx.target, "https://example.com");
        assert!(!ctx.is_cancelled().await);
        assert!(!ctx.is_paused().await);
    }

    /// `cancel` flips the cancellation flag.
    #[tokio::test]
    async fn test_scan_cancellation() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert!(!ctx.is_cancelled().await);

        ctx.cancel().await;

        assert!(ctx.is_cancelled().await);
    }

    /// `pause` and `resume` toggle the pause flag.
    #[tokio::test]
    async fn test_scan_pause_resume() {
        let (ctx, _progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        assert!(!ctx.is_paused().await);

        ctx.pause().await;
        assert!(ctx.is_paused().await);

        ctx.resume().await;
        assert!(!ctx.is_paused().await);
    }

    /// Every tracker mutation publishes exactly one snapshot, in call order.
    #[tokio::test]
    async fn test_progress_tracker() {
        let (ctx, mut progress_rx, _finding_rx, _scanner_rx) =
            RealtimeScanContext::new("test_scan".to_string(), "https://example.com".to_string());

        let mut tracker = ProgressTracker::new(Arc::new(ctx), 5, 100);

        tracker.set_scanner("XSS Scanner".to_string()).await;
        tracker.complete_scanner().await;

        // The channel is FIFO: the first snapshot comes from `set_scanner`,
        // before any scanner has completed.
        let first = progress_rx
            .try_recv()
            .expect("set_scanner should publish a progress update");
        assert_eq!(first.scan_id, "test_scan");
        assert_eq!(first.current_scanner, "XSS Scanner");
        assert_eq!(first.scanners_completed, 0);
        assert_eq!(first.total_scanners, 5);

        // The second snapshot comes from `complete_scanner`.
        let second = progress_rx
            .try_recv()
            .expect("complete_scanner should publish a progress update");
        assert_eq!(second.scan_id, "test_scan");
        assert_eq!(second.scanners_completed, 1);
        assert_eq!(second.total_scanners, 5);
        assert_eq!(second.total_urls, 100);
        assert!((second.percentage - 20.0).abs() < 1e-9);

        // Nothing else was published.
        assert!(progress_rx.try_recv().is_err());
    }

    /// A single HIGH finding shows up in the total and in the HIGH bucket only.
    #[test]
    fn test_scan_result_aggregator() {
        let mut aggregator = ScanResultAggregator::new();

        let vuln = VulnerabilityFinding {
            id: "vuln_1".to_string(),
            severity: "HIGH".to_string(),
            confidence: "HIGH".to_string(),
            category: "XSS".to_string(),
            name: "Cross-Site Scripting".to_string(),
            description: "XSS vulnerability found".to_string(),
            url: "https://example.com".to_string(),
            parameter: Some("q".to_string()),
            payload: Some("<script>alert(1)</script>".to_string()),
            evidence: "Payload reflected in response".to_string(),
            remediation: Some("Sanitize user input".to_string()),
        };

        aggregator.add_vulnerability(vuln);

        let summary = aggregator.get_summary();
        assert_eq!(summary.total_vulnerabilities, 1);
        assert_eq!(summary.high_count, 1);
        assert_eq!(summary.critical_count, 0);
        assert_eq!(summary.medium_count, 0);
        assert_eq!(summary.low_count, 0);
        assert_eq!(summary.scanners_executed, 0);
    }
}