use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::backend::PacketSink;
use crate::event::Event;
use crate::scanner::arp_validator::ArpValidator;
use crate::scanner::{arp_scanner, ScannerInputEvent};
/// Spawns the transmit (TX) worker task that performs ARP scans on request.
///
/// The task waits on `scanner_input_rx`. For every
/// [`ScannerInputEvent::StartScanning`] it resolves `interface_name` through
/// `crate::interface_utils::find_interface` and runs
/// `arp_scanner::scan_range` once for each IPv4 network configured on that
/// interface, in order. If the interface cannot be found, an error is logged
/// and the worker keeps waiting for the next event.
///
/// The worker stops when either:
/// * a child token derived from `cancel_token` is cancelled, or
/// * the sending side of `scanner_input_rx` is closed (`recv` yields `None`).
///
/// Cancellation takes priority over pending input events, and it is
/// re-checked between networks so that a multi-network scan does not run to
/// completion after shutdown was requested. A `scan_range` call that is
/// already in flight is allowed to finish.
///
/// # Parameters
/// * `scanner_input_rx` - receiver of scan requests.
/// * `packet_sink` - sink handed (mutably) to every `scan_range` call.
/// * `interface_name` - name of the interface to look up on each request.
/// * `scanner_outputs` - event sender; a clone is passed to each scan.
/// * `arp_validator` - shared validator passed to each scan.
/// * `cancel_token` - parent token; the worker listens on a child of it.
///
/// # Returns
/// The [`JoinHandle`] of the spawned task.
///
/// # Panics
/// Panics if called outside of a Tokio runtime, as `tokio::spawn` does.
pub fn start_tx_worker(
    mut scanner_input_rx: mpsc::Receiver<ScannerInputEvent>,
    mut packet_sink: Box<dyn PacketSink>,
    interface_name: String,
    scanner_outputs: mpsc::Sender<Event>,
    arp_validator: Arc<Mutex<ArpValidator>>,
    cancel_token: CancellationToken,
) -> JoinHandle<()> {
    let tx_cancel_token = cancel_token.child_token();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // Poll branches top to bottom so a pending cancellation always
                // wins over a queued scan request instead of being picked at random.
                biased;

                _ = tx_cancel_token.cancelled() => {
                    tracing::debug!("TX worker task cancelled, shutting down");
                    break;
                }
                event = scanner_input_rx.recv() => {
                    match event {
                        Some(ScannerInputEvent::StartScanning) => {
                            let Some(nif) = crate::interface_utils::find_interface(&interface_name) else {
                                tracing::error!("Interface not found for scanning: {}", interface_name);
                                continue;
                            };
                            for ip_network in nif
                                .ips
                                .iter()
                                .filter(|ip_network: &&pnet::ipnetwork::IpNetwork| ip_network.is_ipv4())
                                .cloned()
                            {
                                // Do not start another range once shutdown has been
                                // requested; the outer loop then observes the
                                // cancellation and exits.
                                if tx_cancel_token.is_cancelled() {
                                    tracing::debug!("TX worker cancelled during scan, skipping remaining networks");
                                    break;
                                }
                                arp_scanner::scan_range(
                                    &nif,
                                    ip_network,
                                    scanner_outputs.clone(),
                                    &mut packet_sink,
                                    Some(&arp_validator),
                                )
                                .await;
                            }
                        }
                        // All senders dropped: nothing can ever request a scan again.
                        None => break,
                    }
                }
            }
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the basic inputs a TX worker needs (an input channel, an
    /// interface name and a cancellation token) can be constructed and dropped.
    ///
    /// NOTE(review): this does not call `start_tx_worker`; doing so would
    /// additionally need a `PacketSink`, an `ArpValidator` and a Tokio runtime.
    #[test]
    fn test_start_tx_worker_creates_handle() {
        // Bounded channel with room for 10 pending scan requests.
        let (_sender, _receiver) = mpsc::channel::<ScannerInputEvent>(10);

        let cancel_token = CancellationToken::new();
        let interface_name = "eth0".to_string();

        // Explicitly consume the values so they are not reported as unused.
        drop((interface_name, cancel_token));
    }
}