use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use crate::device::mmio_state::VirtioMmioState;
use crate::irq::Irq;
type IrqCallback = Arc<dyn Fn(Irq, bool) -> crate::error::Result<()> + Send + Sync>;
pub struct NetRxWorkerSlot {
handle: Mutex<Option<std::thread::JoinHandle<()>>>,
irq_callback: Option<IrqCallback>,
exit_vcpus: Option<Arc<dyn Fn() + Send + Sync>>,
running: Option<Arc<AtomicBool>>,
host_fd: Mutex<Option<i32>>,
rx_inject_channel: Mutex<Option<crossbeam_channel::Receiver<Vec<u8>>>>,
inline_conn_channel:
Mutex<Option<crossbeam_channel::Receiver<arcbox_net_inject::inline_conn::InlineConn>>>,
}
impl NetRxWorkerSlot {
pub fn new() -> Self {
Self {
handle: Mutex::new(None),
irq_callback: None,
exit_vcpus: None,
running: None,
host_fd: Mutex::new(None),
rx_inject_channel: Mutex::new(None),
inline_conn_channel: Mutex::new(None),
}
}
pub fn set_hooks(
&mut self,
irq_callback: IrqCallback,
exit_vcpus: Arc<dyn Fn() + Send + Sync>,
) {
self.irq_callback = Some(irq_callback);
self.exit_vcpus = Some(exit_vcpus);
}
pub fn set_running(&mut self, running: Arc<AtomicBool>) {
self.running = Some(running);
}
pub fn set_host_fd(&self, fd: i32) {
*self
.host_fd
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(fd);
}
fn get_host_fd(&self) -> Option<i32> {
*self
.host_fd
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
pub fn set_rx_inject_channel(&self, rx: crossbeam_channel::Receiver<Vec<u8>>) {
*self
.rx_inject_channel
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
}
pub fn set_inline_conn_channel(
&self,
rx: crossbeam_channel::Receiver<arcbox_net_inject::inline_conn::InlineConn>,
) {
*self
.inline_conn_channel
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
}
pub fn take_handle(&self) -> Option<std::thread::JoinHandle<()>> {
self.handle
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take()
}
pub fn try_spawn(
&self,
mmio_arc: &Arc<RwLock<VirtioMmioState>>,
irq: Irq,
guest_ram_base: *mut u8,
guest_ram_size: usize,
guest_ram_gpa: u64,
) {
{
let guard = self
.handle
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if guard.is_some() {
return;
}
}
let Some(irq_callback) = self.irq_callback.clone() else {
tracing::warn!("net-io: irq_callback not set");
return;
};
let Some(exit_vcpus) = self.exit_vcpus.clone() else {
tracing::warn!("net-io: exit_vcpus not set");
return;
};
let Some(running) = self.running.clone() else {
tracing::warn!("net-io: running flag not set");
return;
};
let mmio = match mmio_arc.read() {
Ok(m) => m,
Err(_) => return,
};
let qi = 0;
let rx_channel = self
.rx_inject_channel
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take();
if let Some(rx_channel) = rx_channel {
let guest_mem = unsafe {
arcbox_net_inject::guest_mem::GuestMemWriter::new(
guest_ram_base,
guest_ram_size,
guest_ram_gpa as usize,
)
};
let queue = arcbox_net_inject::queue::RxQueueConfig {
desc_gpa: mmio.queue_desc[qi],
avail_gpa: mmio.queue_driver[qi],
used_gpa: mmio.queue_device[qi],
size: mmio.queue_num[qi],
};
let event_idx_enabled =
(mmio.driver_features & arcbox_virtio::queue::VIRTIO_F_EVENT_IDX) != 0;
drop(mmio);
let vmm_callback = irq_callback;
let inject_callback: Arc<arcbox_net_inject::irq::IrqCallback> =
Arc::new(move |gsi, level| {
vmm_callback(gsi, level)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
});
let mmio_arc_clone = mmio_arc.clone();
let set_interrupt_status: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
if let Ok(mut s) = mmio_arc_clone.write() {
s.trigger_interrupt(1); }
});
let conn_rx = self
.inline_conn_channel
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take()
.unwrap_or_else(|| {
let (_tx, rx) = crossbeam_channel::unbounded();
rx
});
let inject_thread = arcbox_net_inject::inject::RxInjectThread {
rx: rx_channel,
conn_rx,
guest_mem,
queue,
irq: arcbox_net_inject::irq::IrqHandle {
callback: inject_callback,
exit_vcpus,
irq,
},
set_interrupt_status,
running,
event_idx_enabled,
};
match std::thread::Builder::new()
.name("rx-inject".to_string())
.spawn(move || inject_thread.run())
{
Ok(handle) => {
*self
.handle
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(handle);
tracing::info!(
"Spawned rx-inject thread for primary VirtioNet (channel-based)"
);
}
Err(e) => {
tracing::error!("Failed to spawn rx-inject thread: {e}");
}
}
return;
}
let Some(net_fd) = self.get_host_fd() else {
tracing::warn!("net-io: no host fd available for primary VirtioNet");
return;
};
let rx_queue = crate::net_rx_worker::RxQueueConfig {
desc_gpa: mmio.queue_desc[qi],
avail_gpa: mmio.queue_driver[qi],
used_gpa: mmio.queue_device[qi],
size: mmio.queue_num[qi],
};
drop(mmio);
let guest_mem = unsafe {
crate::blk_worker::GuestMemWriter::new(
guest_ram_base,
guest_ram_size,
guest_ram_gpa as usize,
)
};
let ctx = crate::net_rx_worker::NetRxWorkerContext {
net_host_fd: net_fd,
guest_mem,
rx_queue,
mmio_state: mmio_arc.clone(),
irq_callback,
irq,
exit_vcpus,
running,
};
match std::thread::Builder::new()
.name("net-io".to_string())
.spawn(move || crate::net_rx_worker::net_rx_worker_loop(ctx))
{
Ok(handle) => {
*self
.handle
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(handle);
tracing::info!("Spawned net-io worker thread for primary VirtioNet (legacy)");
}
Err(e) => {
tracing::error!("Failed to spawn net-io worker thread: {e}");
}
}
}
}