#![allow(dead_code)]
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
pub struct TsiListener {
pub cid: u64,
pub vm_port: u32,
pub host_addr: SocketAddr,
accept_thread: Option<std::thread::JoinHandle<()>>,
shutdown: Arc<std::sync::atomic::AtomicBool>,
}
impl Drop for TsiListener {
fn drop(&mut self) {
self.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
let _ = std::net::TcpStream::connect_timeout(
&self.host_addr,
std::time::Duration::from_millis(50),
);
if let Some(h) = self.accept_thread.take() {
let _ = h.join();
}
}
}
static NEXT_HOST_SRC_PORT: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
static INIT: std::sync::Once = std::sync::Once::new();
fn ensure_initialized() {
INIT.call_once(|| {
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.subsec_nanos())
.unwrap_or(0);
let pid = std::process::id();
let mix = now ^ pid.wrapping_mul(2654435761);
let range = 0xFFFF_0000u32 - 0xC000_0000u32;
let start = 0xC000_0000u32 + (mix % range);
NEXT_HOST_SRC_PORT.store(start, std::sync::atomic::Ordering::Relaxed);
});
}
pub fn alloc_host_src_port() -> u32 {
ensure_initialized();
NEXT_HOST_SRC_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
fn bind_dual_stack() -> std::io::Result<TcpListener> {
use std::os::fd::FromRawFd;
unsafe {
let fd = libc::socket(libc::AF_INET6, libc::SOCK_STREAM, 0);
if fd < 0 {
return Err(std::io::Error::last_os_error());
}
let zero: libc::c_int = 0;
if libc::setsockopt(
fd,
libc::IPPROTO_IPV6,
libc::IPV6_V6ONLY,
&zero as *const _ as *const _,
std::mem::size_of::<libc::c_int>() as _,
) < 0
{
let e = std::io::Error::last_os_error();
libc::close(fd);
return Err(e);
}
let one: libc::c_int = 1;
let _ = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&one as *const _ as *const _,
std::mem::size_of::<libc::c_int>() as _,
);
let mut addr: libc::sockaddr_in6 = std::mem::zeroed();
addr.sin6_family = libc::AF_INET6 as _;
addr.sin6_port = 0;
addr.sin6_addr = libc::in6_addr { s6_addr: [0u8; 16] }; if libc::bind(
fd,
&addr as *const _ as *const _,
std::mem::size_of::<libc::sockaddr_in6>() as _,
) < 0
{
let e = std::io::Error::last_os_error();
libc::close(fd);
return Err(e);
}
if libc::listen(fd, 1024) < 0 {
let e = std::io::Error::last_os_error();
libc::close(fd);
return Err(e);
}
Ok(TcpListener::from_raw_fd(fd))
}
}
impl TsiListener {
pub fn bind(
cid: u64,
vm_port: u32,
on_accept: Arc<dyn Fn(u64, u32, u32, TcpStream) + Send + Sync>,
) -> std::io::Result<Self> {
let listener = bind_dual_stack().or_else(|_| TcpListener::bind("127.0.0.1:0"))?;
let host_addr = listener.local_addr()?;
let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
let shutdown_th = shutdown.clone();
let trace = crate::devices::virtio::vsock::muxer::vsock_trace_enabled();
let h = std::thread::Builder::new()
.name(format!("tsi-accept-{vm_port}"))
.spawn(move || {
if trace {
eprintln!(
"[tsi_stream] accept loop for cid={cid} vm_port={vm_port} on {host_addr}"
);
}
for stream in listener.incoming() {
if shutdown_th.load(std::sync::atomic::Ordering::SeqCst) {
break;
}
let stream = match stream {
Ok(s) => s,
Err(e) => {
eprintln!("[tsi_stream] accept err: {e}");
continue;
}
};
let _ = stream.set_nodelay(true);
let host_src_port = alloc_host_src_port();
if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
eprintln!(
"[tsi_stream] cid={cid} vm_port={vm_port} accept from {:?} -> src_port={host_src_port}",
stream.peer_addr().ok());
}
on_accept(cid, host_src_port, vm_port, stream);
}
if trace {
eprintln!("[tsi_stream] accept loop EXIT cid={cid} vm_port={vm_port}");
}
})?;
Ok(Self {
cid,
vm_port,
host_addr,
accept_thread: Some(h),
shutdown,
})
}
}