use crate::core::{ScanError, ScanReport};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use uuid::Uuid;
/// Handle to a single scan, used to observe its status.
///
/// Cloning a handle is cheap: the status lives behind an `Arc`, so every
/// clone observes the same underlying `ScanStatus`.
#[derive(Debug, Clone)]
pub struct ScanHandle {
/// Identifier of the scan; `ScanHandle::new` fills it with the string form
/// of a freshly generated v4 UUID.
pub id: String,
// Status shared between all clones of this handle, guarded by a
// read/write lock.
status: Arc<std::sync::RwLock<ScanStatus>>,
}
impl ScanHandle {
pub(crate) fn new() -> Self {
Self {
id: Uuid::new_v4().to_string(),
status: Arc::new(std::sync::RwLock::new(ScanStatus::Pending)),
}
}
pub fn status(&self) -> ScanStatus {
self.status
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.clone()
}
pub fn is_complete(&self) -> bool {
matches!(
self.status(),
ScanStatus::Complete { .. } | ScanStatus::Failed { .. }
)
}
pub fn is_pending(&self) -> bool {
matches!(self.status(), ScanStatus::Pending)
}
pub fn is_in_progress(&self) -> bool {
matches!(self.status(), ScanStatus::InProgress)
}
pub(crate) fn set_in_progress(&self) {
*self
.status
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner()) = ScanStatus::InProgress;
}
pub(crate) fn set_complete(&self, result: ScanReport) {
*self
.status
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner()) = ScanStatus::Complete {
result: Box::new(result),
};
}
pub(crate) fn set_failed(&self, error: String) {
*self
.status
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner()) = ScanStatus::Failed { error };
}
#[cfg(feature = "tokio-runtime")]
pub async fn wait(self) -> Result<ScanReport, ScanError> {
loop {
match self.status() {
ScanStatus::Complete { result } => return Ok(*result),
ScanStatus::Failed { error } => {
return Err(ScanError::internal(error));
}
_ => {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
}
}
/// Lifecycle state of a scan as tracked by `ScanHandle`.
#[derive(Debug, Clone)]
pub enum ScanStatus {
/// Initial state of a handle created by `ScanHandle::new`.
Pending,
/// State stored by `ScanHandle::set_in_progress`.
InProgress,
/// Terminal state stored by `ScanHandle::set_complete`.
Complete {
/// The report produced by the scan, kept on the heap.
result: Box<ScanReport>,
},
/// Terminal state stored by `ScanHandle::set_failed`.
Failed {
/// Error message supplied by the caller of `set_failed`.
error: String,
},
}
/// Lock-free bookkeeping for how many scans are running and how many are
/// waiting for a slot.
///
/// The queue stores no scans itself; it only maintains two atomic counters
/// and the concurrency limit they are checked against.
#[derive(Debug)]
pub struct ScanQueue {
    /// Maximum number of simultaneously held slots (always at least 1).
    max_concurrent: usize,
    /// Number of slots currently held via [`ScanQueue::acquire`].
    active_count: AtomicU64,
    /// Number of scans registered via [`ScanQueue::add_pending`] that have
    /// not yet been matched by a successful `acquire`.
    pending_count: AtomicU64,
}

impl ScanQueue {
    /// Creates a queue allowing `max_concurrent` simultaneous scans.
    ///
    /// A limit of `0` is raised to `1` so that the queue can always make
    /// progress.
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            max_concurrent: max_concurrent.max(1),
            active_count: AtomicU64::new(0),
            pending_count: AtomicU64::new(0),
        }
    }

    /// Returns the concurrency limit.
    pub fn max_concurrent(&self) -> usize {
        self.max_concurrent
    }

    /// Returns the number of slots currently held.
    pub fn active_count(&self) -> u64 {
        self.active_count.load(Ordering::Relaxed)
    }

    /// Returns the number of scans waiting for a slot.
    pub fn pending_count(&self) -> u64 {
        self.pending_count.load(Ordering::Relaxed)
    }

    /// Returns `true` when every slot is currently held.
    pub fn is_full(&self) -> bool {
        self.active_count() >= self.limit()
    }

    /// The concurrency limit in the counters' integer type.
    fn limit(&self) -> u64 {
        // `usize` is at most 64 bits wide on supported targets, so this
        // widening cast cannot truncate.
        self.max_concurrent as u64
    }

    /// Tries to claim one slot.
    ///
    /// Returns `true` and decrements the pending counter (never below zero)
    /// when a slot was claimed, `false` when the queue is full.
    ///
    /// The slot is claimed with a single conditional update, so the active
    /// counter never exceeds the limit, not even transiently. An
    /// increment-then-roll-back scheme would let concurrent callers observe
    /// an inflated count and be refused although a slot was free.
    pub(crate) fn acquire(&self) -> bool {
        let limit = self.limit();
        let claimed = self
            .active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                if active < limit {
                    Some(active + 1)
                } else {
                    None
                }
            })
            .is_ok();

        if claimed {
            // `checked_sub` yields `None` at zero, which leaves the counter
            // untouched; that outcome is expected and deliberately ignored.
            let _ = self
                .pending_count
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |pending| {
                    pending.checked_sub(1)
                });
        }
        claimed
    }

    /// Returns one slot to the queue.
    ///
    /// The counter saturates at zero: an unmatched `release` is a no-op
    /// instead of wrapping to `u64::MAX`, which would make the queue look
    /// permanently full.
    pub(crate) fn release(&self) {
        let _ = self
            .active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                active.checked_sub(1)
            });
    }

    /// Registers one more scan as waiting for a slot.
    pub(crate) fn add_pending(&self) {
        self.pending_count.fetch_add(1, Ordering::SeqCst);
    }
}

impl Default for ScanQueue {
    /// Creates a queue with a concurrency limit of 4.
    fn default() -> Self {
        Self::new(4)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created handle is pending and in no other state.
    #[test]
    fn test_scan_handle() {
        let fresh = ScanHandle::new();

        assert!(fresh.is_pending());
        assert!(!fresh.is_complete());
        assert!(!fresh.is_in_progress());
    }

    /// Moving a handle to in-progress and then to failed is reflected by the
    /// predicates and by the status snapshot.
    #[test]
    fn test_scan_handle_transitions() {
        let scan = ScanHandle::new();

        scan.set_in_progress();
        assert!(scan.is_in_progress());

        let message = "test error".to_string();
        scan.set_failed(message);
        assert!(scan.is_complete());

        let snapshot = scan.status();
        assert!(matches!(snapshot, ScanStatus::Failed { .. }));
    }

    /// A new queue reports its limit, holds no slots and is not full.
    #[test]
    fn test_scan_queue() {
        let slots = ScanQueue::new(2);

        assert_eq!(slots.max_concurrent(), 2);
        assert_eq!(slots.active_count(), 0);
        assert!(!slots.is_full());
    }
}