use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use std::time::Duration;
use parking_lot::Mutex;
use crate::error::{AsynError, AsynResult, AsynStatus};
use crate::exception::AsynException;
use crate::interpose::{EomReason, OctetReadResult};
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::user::AsynUser;
/// Client-slot count that `IpServerConfig::parse` stores in `max_clients`.
pub const DEFAULT_MAX_CLIENTS: usize = 64;
/// Size in bytes of the receive buffer allocated by the UDP receive loop.
pub const UDP_MAX_DATAGRAM: usize = 65507;
/// Transport selected by the optional protocol token of a server spec.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IpServerProtocol {
/// Stream listener; the default when no protocol token is given.
#[default]
Tcp,
/// Datagram socket; the port is read-only in this mode.
Udp,
}
/// Settings used to bind a server port.
#[derive(Debug, Clone)]
pub struct IpServerConfig {
/// Host part of the bind address (brackets removed for the `[v6]:port` form).
pub bind_host: String,
/// Port number to bind.
pub bind_port: u16,
/// Transport to listen on.
pub protocol: IpServerProtocol,
/// Number of client slots; the driver uses at least one.
pub max_clients: usize,
/// Read timeout applied to each accepted TCP stream, if any.
pub read_timeout: Option<Duration>,
}
impl IpServerConfig {
    /// Parses a server spec of the form `host:port [tcp|udp]`.
    ///
    /// The address may be written as `host:port` or `[ipv6]:port`. The
    /// optional second token selects the protocol (case-insensitive) and
    /// defaults to TCP. `max_clients` is set to [`DEFAULT_MAX_CLIENTS`] and
    /// `read_timeout` to `None`.
    ///
    /// # Errors
    ///
    /// Returns `AsynError::Status` with `AsynStatus::Error` when the spec is
    /// empty, has an unknown protocol token, has extra tokens, lacks a port,
    /// or carries a port that does not fit in `u16`.
    pub fn parse(spec: &str) -> AsynResult<Self> {
        // Every failure of this parser has the same shape.
        let fail = |message: String| AsynError::Status {
            status: AsynStatus::Error,
            message,
        };

        let fields: Vec<&str> = spec.split_whitespace().collect();
        let (addr_part, protocol) = match fields.as_slice() {
            [] => return Err(fail("empty IP server port spec".into())),
            [addr] => (*addr, IpServerProtocol::Tcp),
            [addr, proto] => match proto.to_ascii_uppercase().as_str() {
                "TCP" => (*addr, IpServerProtocol::Tcp),
                "UDP" => (*addr, IpServerProtocol::Udp),
                _ => {
                    return Err(fail(format!(
                        "unknown protocol token '{}' in '{spec}' (expected tcp or udp)",
                        proto
                    )));
                }
            },
            // Three or more tokens are never valid.
            _ => {
                return Err(fail(format!(
                    "unexpected tokens after host:port in '{spec}'"
                )));
            }
        };

        let (host, port_text) = match addr_part.strip_prefix('[') {
            // Bracketed form: `[host]:port`.
            Some(rest) => {
                let (inner, tail) = rest.split_once(']').ok_or_else(|| {
                    fail(format!("missing closing bracket in IPv6 address: '{spec}'"))
                })?;
                let port_text = tail.strip_prefix(':').ok_or_else(|| {
                    fail(format!(
                        "missing port after bracketed IPv6 address: '{spec}'"
                    ))
                })?;
                (inner, port_text)
            }
            // Plain form: everything before the last ':' is the host.
            None => addr_part.rsplit_once(':').ok_or_else(|| {
                fail(format!("missing port in '{spec}' (expected host:port)"))
            })?,
        };

        let bind_port: u16 = port_text
            .parse()
            .map_err(|_| fail(format!("invalid port in '{spec}'")))?;

        Ok(Self {
            bind_host: host.to_string(),
            bind_port,
            protocol,
            max_clients: DEFAULT_MAX_CLIENTS,
            read_timeout: None,
        })
    }
}
/// One entry of the server's client table; empty while `stream` holds `None`.
pub struct ClientSlot {
// Connected client stream, or `None` when the slot is free.
stream: Mutex<Option<TcpStream>>,
// Remote address recorded when the stream was assigned.
peer: Mutex<Option<SocketAddr>>,
}
impl ClientSlot {
    /// Creates a slot with neither a stream nor a peer address.
    fn new_empty() -> Self {
        let stream = Mutex::new(None);
        let peer = Mutex::new(None);
        Self { stream, peer }
    }

    /// Reports whether a client stream is currently stored.
    fn is_occupied(&self) -> bool {
        let held = self.stream.lock();
        held.is_some()
    }

    /// Stores `stream` and then `peer`, replacing any previous values.
    fn assign(&self, stream: TcpStream, peer: SocketAddr) {
        {
            let mut held = self.stream.lock();
            *held = Some(stream);
        }
        let mut remote = self.peer.lock();
        *remote = Some(peer);
    }

    /// Drops the stored stream (closing it) and forgets the peer address.
    fn clear(&self) {
        {
            let mut held = self.stream.lock();
            *held = None;
        }
        let mut remote = self.peer.lock();
        *remote = None;
    }

    /// Returns a copy of the recorded peer address, if any.
    fn peer_addr(&self) -> Option<SocketAddr> {
        let remote = self.peer.lock();
        *remote
    }
}
/// Server-mode IP port: a TCP listener with a fixed table of client slots,
/// or a UDP socket drained by a background receive thread.
pub struct DrvAsynIPServerPort {
// Shared port-driver state (name, connected flag, exception announcements).
base: PortDriverBase,
// Bind address, protocol and limits this port was created with.
config: IpServerConfig,
// Bound TCP listener; `None` until opened and after disconnect/shutdown.
listener: Mutex<Option<TcpListener>>,
// One slot per client address; the length is fixed at construction.
slots: Vec<Arc<ClientSlot>>,
// Bound UDP socket in UDP mode; `None` otherwise.
udp_socket: Mutex<Option<Arc<UdpSocket>>>,
// Datagram buffer filled by the receive thread and drained by `read_octet`.
udp_cache: Arc<Mutex<UdpCache>>,
// Set to `true` to ask the receive thread to exit.
udp_shutdown: Arc<AtomicBool>,
// Join handle of the receive thread while it is running.
udp_thread: Mutex<Option<JoinHandle<()>>>,
}
/// Buffer holding one received datagram plus a cursor over the bytes that
/// have already been handed out.
struct UdpCache {
    /// Bytes of the cached datagram.
    data: Vec<u8>,
    /// Index of the next unread byte in `data`.
    pos: usize,
}

impl UdpCache {
    /// Creates an empty cache with the cursor at zero.
    fn new() -> Self {
        let data = Vec::new();
        Self { data, pos: 0 }
    }

    /// True when the cursor has reached (or passed) the end of the data.
    fn is_empty(&self) -> bool {
        self.data.len() <= self.pos
    }

    /// Discards the cached bytes and rewinds the cursor.
    fn clear(&mut self) {
        self.pos = 0;
        self.data.clear();
    }
}
impl DrvAsynIPServerPort {
/// Builds a server port from a textual spec; see `IpServerConfig::parse`
/// for the accepted syntax and errors.
pub fn new(port_name: &str, spec: &str) -> AsynResult<Self> {
    IpServerConfig::parse(spec).and_then(|config| Self::with_config(port_name, config))
}
pub fn with_config(port_name: &str, config: IpServerConfig) -> AsynResult<Self> {
let max = config.max_clients.max(1);
let mut base = PortDriverBase::new(
port_name,
max,
PortFlags {
multi_device: true,
can_block: true,
destructible: true,
},
);
base.connected = false;
base.auto_connect = true;
let mut slots = Vec::with_capacity(max);
for _ in 0..max {
slots.push(Arc::new(ClientSlot::new_empty()));
}
Ok(Self {
base,
config,
listener: Mutex::new(None),
slots,
udp_socket: Mutex::new(None),
udp_cache: Arc::new(Mutex::new(UdpCache::new())),
udp_shutdown: Arc::new(AtomicBool::new(false)),
udp_thread: Mutex::new(None),
})
}
/// Binds the configured endpoint and marks the port connected.
///
/// UDP configurations are delegated to `open_udp_listener`. For TCP the
/// listener is bound, put in blocking mode and stored.
fn open_listener(&mut self) -> AsynResult<()> {
    if let IpServerProtocol::Udp = self.config.protocol {
        return self.open_udp_listener();
    }

    let host = &self.config.bind_host;
    let port = self.config.bind_port;
    // A host containing ':' is taken to be an IPv6 literal and needs brackets.
    let bind_str = match host.contains(':') {
        true => format!("[{}]:{}", host, port),
        false => format!("{}:{}", host, port),
    };

    let listener = self.bind_with_options(&bind_str)?;
    if let Err(e) = listener.set_nonblocking(false) {
        return Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!("set_nonblocking failed: {e}"),
        });
    }

    *self.listener.lock() = Some(listener);
    self.base.set_connected(true);
    Ok(())
}
/// Binds the UDP socket, starts the receive thread and marks the port
/// connected.
///
/// The socket is created with `SO_REUSEADDR` and a 200 ms read timeout, so
/// the receive thread re-checks the shutdown flag at least that often.
fn open_udp_listener(&mut self) -> AsynResult<()> {
    let fail = |message: String| AsynError::Status {
        status: AsynStatus::Error,
        message,
    };

    let cfg = &self.config;
    // A host containing ':' is taken to be an IPv6 literal and needs brackets.
    let bind_str = if cfg.bind_host.contains(':') {
        format!("[{}]:{}", cfg.bind_host, cfg.bind_port)
    } else {
        format!("{}:{}", cfg.bind_host, cfg.bind_port)
    };
    let addr = bind_str
        .parse::<SocketAddr>()
        .map_err(|e| fail(format!("invalid UDP bind address '{bind_str}': {e}")))?;
    let domain = match addr {
        SocketAddr::V4(_) => socket2::Domain::IPV4,
        SocketAddr::V6(_) => socket2::Domain::IPV6,
    };

    let raw = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))
        .map_err(|e| fail(format!("UDP socket() failed: {e}")))?;
    raw.set_reuse_address(true)
        .map_err(|e| fail(format!("UDP SO_REUSEADDR failed: {e}")))?;
    raw.bind(&addr.into())
        .map_err(|e| fail(format!("UDP bind '{bind_str}' failed: {e}")))?;

    let socket = UdpSocket::from(raw);
    socket
        .set_read_timeout(Some(Duration::from_millis(200)))
        .map_err(|e| fail(format!("UDP set_read_timeout failed: {e}")))?;
    let socket = Arc::new(socket);

    // Reset the flag in case a previous worker was stopped.
    self.udp_shutdown.store(false, Ordering::SeqCst);
    let worker = {
        let socket = Arc::clone(&socket);
        let cache = Arc::clone(&self.udp_cache);
        let shutdown = Arc::clone(&self.udp_shutdown);
        let port_name = self.base.port_name.clone();
        std::thread::Builder::new()
            .name(format!("udp-server-{port_name}"))
            .spawn(move || udp_recv_loop(socket, cache, shutdown, port_name))
            .map_err(|e| fail(format!("UDP recv thread spawn failed: {e}")))?
    };

    *self.udp_socket.lock() = Some(socket);
    *self.udp_thread.lock() = Some(worker);
    self.base.set_connected(true);
    Ok(())
}
/// Creates a TCP listener on `bind_str` with `SO_REUSEADDR` set and a
/// listen backlog of 128.
///
/// `bind_str` must parse as a literal socket address (`ip:port` or
/// `[ipv6]:port`).
fn bind_with_options(&self, bind_str: &str) -> AsynResult<TcpListener> {
    let fail = |message: String| AsynError::Status {
        status: AsynStatus::Error,
        message,
    };

    let addr = match bind_str.parse::<SocketAddr>() {
        Ok(addr) => addr,
        Err(e) => return Err(fail(format!("invalid bind address '{bind_str}': {e}"))),
    };
    let domain = match addr {
        SocketAddr::V4(_) => socket2::Domain::IPV4,
        SocketAddr::V6(_) => socket2::Domain::IPV6,
    };

    let socket =
        socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))
            .map_err(|e| fail(format!("socket() failed: {e}")))?;
    socket
        .set_reuse_address(true)
        .map_err(|e| fail(format!("SO_REUSEADDR failed: {e}")))?;
    socket
        .bind(&addr.into())
        .map_err(|e| fail(format!("bind '{bind_str}' failed: {e}")))?;
    socket
        .listen(128)
        .map_err(|e| fail(format!("listen failed: {e}")))?;

    Ok(TcpListener::from(socket))
}
/// Returns the local port the server is bound to, or `0` when nothing is
/// bound (or the address cannot be queried).
///
/// The TCP listener is consulted first; when there is none, the UDP socket
/// is used, so a UDP-mode port bound to port `0` reports the port the OS
/// actually assigned instead of always reporting `0`.
pub fn local_port(&self) -> u16 {
    // The guard is released at the end of this statement, before the UDP
    // mutex is taken.
    let tcp_port = self
        .listener
        .lock()
        .as_ref()
        .and_then(|l| l.local_addr().ok())
        .map(|a| a.port());
    if let Some(port) = tcp_port {
        return port;
    }
    self.udp_socket
        .lock()
        .as_ref()
        .and_then(|s| s.local_addr().ok())
        .map(|a| a.port())
        .unwrap_or(0)
}
/// Accepts one TCP connection and stores it in the first free slot.
///
/// Returns the slot index and announces a `Connect` exception for it. The
/// listener mutex is held for the whole call, including the `accept`.
///
/// # Errors
///
/// Fails when the listener is not bound, when `accept` or setting the
/// configured read timeout fails, or when every slot is occupied. In the
/// last case the accepted stream is dropped.
pub fn accept_one(&self) -> AsynResult<usize> {
    let guard = self.listener.lock();
    let listener = match guard.as_ref() {
        Some(listener) => listener,
        None => {
            return Err(AsynError::Status {
                status: AsynStatus::Error,
                message: "listener not bound — connect() the port first".into(),
            });
        }
    };

    let (stream, peer) = match listener.accept() {
        Ok(pair) => pair,
        Err(e) => {
            return Err(AsynError::Status {
                status: AsynStatus::Error,
                message: format!("accept failed: {e}"),
            });
        }
    };

    if let Some(t) = self.config.read_timeout {
        if let Err(e) = stream.set_read_timeout(Some(t)) {
            return Err(AsynError::Status {
                status: AsynStatus::Error,
                message: format!("set_read_timeout failed: {e}"),
            });
        }
    }

    match self.slots.iter().position(|slot| !slot.is_occupied()) {
        Some(i) => {
            self.slots[i].assign(stream, peer);
            self.base
                .announce_exception(AsynException::Connect, i as i32);
            Ok(i)
        }
        None => Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!(
                "no free client slot (max_clients={}); dropped connection from {peer}",
                self.config.max_clients
            ),
        }),
    }
}
/// Releases the client in slot `addr`, announcing a `Connect` exception if
/// the slot was occupied. An already-empty slot is not an error.
///
/// # Errors
///
/// Fails when `addr` is outside the slot table.
pub fn drop_client(&self, addr: i32) -> AsynResult<()> {
    let slot = &self.slots[self.slot_index(addr)?];
    if !slot.is_occupied() {
        return Ok(());
    }
    slot.clear();
    self.base.announce_exception(AsynException::Connect, addr);
    Ok(())
}
/// Converts an asyn address into a slot-table index, rejecting negative or
/// too-large values.
fn slot_index(&self, addr: i32) -> AsynResult<usize> {
    let in_range = addr >= 0 && (addr as usize) < self.slots.len();
    if in_range {
        Ok(addr as usize)
    } else {
        Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!("addr {addr} out of range (max {})", self.slots.len()),
        })
    }
}
fn slot_arc(&self, addr: i32) -> AsynResult<Arc<ClientSlot>> {
let idx = self.slot_index(addr)?;
let slot = self.slots[idx].clone();
if slot.is_occupied() {
Ok(slot)
} else {
Err(AsynError::Status {
status: AsynStatus::Error,
message: format!("slot {addr} has no connected client"),
})
}
}
/// Remote address of the client in slot `addr`; `None` when the address is
/// out of range or no peer is recorded.
pub fn peer(&self, addr: i32) -> Option<SocketAddr> {
    match self.slot_index(addr) {
        Ok(idx) => self.slots[idx].peer_addr(),
        Err(_) => None,
    }
}
/// Name of the sub-port for slot `idx`: `<parent port name>:<idx>`.
/// The index is not range-checked.
pub fn child_port_name(&self, idx: usize) -> String {
    let parent = &self.base.port_name;
    format!("{}:{}", parent, idx)
}
/// Sub-port names for every slot, in slot order.
pub fn child_port_names(&self) -> Vec<String> {
    let count = self.slots.len();
    let mut names = Vec::with_capacity(count);
    for idx in 0..count {
        names.push(self.child_port_name(idx));
    }
    names
}
/// Creates a sub-port driver that shares slot `idx` with this server.
///
/// # Errors
///
/// Fails when `idx` is not a valid slot index.
pub fn make_subport(&self, idx: usize) -> AsynResult<DrvAsynIPSubport> {
    match self.slots.get(idx) {
        Some(slot) => Ok(DrvAsynIPSubport::new(
            self.child_port_name(idx),
            Arc::clone(slot),
        )),
        None => Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!(
                "subport idx {idx} out of range (max_clients={})",
                self.slots.len()
            ),
        }),
    }
}
}
impl PortDriver for DrvAsynIPServerPort {
/// Shared access to the common port-driver state.
fn base(&self) -> &PortDriverBase {
&self.base
}
/// Exclusive access to the common port-driver state.
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
/// Opens the listening endpoint unless the port is already flagged
/// connected and still holds a TCP listener or UDP socket.
fn connect(&mut self, _user: &AsynUser) -> AsynResult<()> {
    if self.base.connected {
        let has_endpoint = self.listener.lock().is_some() || self.udp_socket.lock().is_some();
        if has_endpoint {
            return Ok(());
        }
    }
    self.open_listener()
}
/// Tears the port down: drops every connected client (announcing a
/// `Connect` exception for each), stops the UDP worker, releases the UDP
/// socket, the cached datagram and the TCP listener, then marks the port
/// disconnected.
fn disconnect(&mut self, _user: &AsynUser) -> AsynResult<()> {
    let occupied = self
        .slots
        .iter()
        .enumerate()
        .filter(|(_, slot)| slot.is_occupied());
    for (i, slot) in occupied {
        slot.clear();
        self.base
            .announce_exception(AsynException::Connect, i as i32);
    }

    self.stop_udp_worker();
    {
        let mut socket = self.udp_socket.lock();
        *socket = None;
    }
    self.udp_cache.lock().clear();
    {
        let mut listener = self.listener.lock();
        *listener = None;
    }
    self.base.set_connected(false);
    Ok(())
}
/// Stops the UDP worker and releases the UDP socket, the cached datagram
/// and the TCP listener.
///
/// Unlike `disconnect`, this does not touch the client slots or the
/// connected flag.
fn shutdown(&mut self) -> AsynResult<()> {
    self.stop_udp_worker();
    {
        let mut socket = self.udp_socket.lock();
        *socket = None;
    }
    self.udp_cache.lock().clear();
    {
        let mut listener = self.listener.lock();
        *listener = None;
    }
    Ok(())
}
/// Reads bytes into `buf` and returns how many were written.
///
/// In UDP mode this drains the cached datagram and returns `0` when
/// nothing is pending. In TCP mode it reads from the client in slot
/// `user.addr`.
fn read_octet(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<usize> {
    match self.config.protocol {
        IpServerProtocol::Udp => Ok(self.udp_drain_into(buf)),
        IpServerProtocol::Tcp => self
            .base_read_octet(user, buf)
            .map(|res| res.nbytes_transferred),
    }
}
/// Writes `data` to one client, or to every connected client when
/// `user.addr` is negative.
///
/// A client whose write fails is dropped and a `Connect` exception is
/// announced for its slot. A broadcast logs such failures and still
/// returns `Ok`; an addressed write returns the error.
///
/// # Errors
///
/// Always fails in UDP mode. For an addressed write, fails when the slot is
/// invalid or empty, or when the write itself fails.
fn write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
    if let IpServerProtocol::Udp = self.config.protocol {
        return Err(AsynError::Status {
            status: AsynStatus::Error,
            message: "UDP server-mode port is read-only (C asyn writeIt returns asynError)"
                .into(),
        });
    }

    if user.addr >= 0 {
        // Addressed write: exactly one slot.
        let target = self.slot_arc(user.addr)?;
        let outcome = self.write_to_slot(&target, data);
        if outcome.is_err() {
            if let Ok(idx) = self.slot_index(user.addr) {
                self.slots[idx].clear();
                self.base
                    .announce_exception(AsynException::Connect, user.addr);
            }
        }
        return outcome;
    }

    // Broadcast: best effort over every occupied slot.
    for (i, slot) in self.slots.iter().enumerate() {
        if slot.is_occupied() {
            if let Err(e) = self.write_to_slot(slot, data) {
                tracing::debug!(
                    target: "asyn_rs::ip_server_port",
                    addr = i,
                    error = %e,
                    "broadcast write to slot failed"
                );
                slot.clear();
                self.base
                    .announce_exception(AsynException::Connect, i as i32);
            }
        }
    }
    Ok(())
}
}
impl DrvAsynIPServerPort {
fn base_read_octet(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
let arc = self.slot_arc(user.addr)?;
let mut stream_guard = arc.stream.lock();
let stream = stream_guard.as_mut().ok_or_else(|| AsynError::Status {
status: AsynStatus::Error,
message: format!("slot {} stream gone", user.addr),
})?;
if user.timeout > Duration::from_nanos(0) {
stream
.set_read_timeout(Some(user.timeout))
.map_err(|e| AsynError::Status {
status: AsynStatus::Error,
message: format!("set_read_timeout failed: {e}"),
})?;
}
match stream.read(buf) {
Ok(0) => {
drop(stream_guard);
if let Ok(idx) = self.slot_index(user.addr) {
self.slots[idx].clear();
self.base
.announce_exception(AsynException::Connect, user.addr);
}
Err(AsynError::Status {
status: AsynStatus::Disconnected,
message: format!("peer closed slot {}", user.addr),
})
}
Ok(n) => Ok(OctetReadResult {
nbytes_transferred: n,
eom_reason: if n >= buf.len() {
EomReason::CNT
} else {
EomReason::empty()
},
}),
Err(e)
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::TimedOut =>
{
Err(AsynError::Status {
status: AsynStatus::Timeout,
message: "read timeout".into(),
})
}
Err(e) => Err(AsynError::Status {
status: AsynStatus::Error,
message: format!("read failed: {e}"),
}),
}
}
/// Writes all of `data` to the slot's stream and flushes it, holding the
/// slot's stream mutex for the duration.
///
/// # Errors
///
/// Fails when the slot has no stream or when the write or flush fails.
fn write_to_slot(&self, slot: &ClientSlot, data: &[u8]) -> AsynResult<()> {
    let mut guard = slot.stream.lock();
    let stream = match guard.as_mut() {
        Some(stream) => stream,
        None => {
            return Err(AsynError::Status {
                status: AsynStatus::Error,
                message: "slot stream gone".into(),
            });
        }
    };
    if let Err(e) = stream.write_all(data) {
        return Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!("write failed: {e}"),
        });
    }
    if let Err(e) = stream.flush() {
        return Err(AsynError::Status {
            status: AsynStatus::Error,
            message: format!("flush failed: {e}"),
        });
    }
    Ok(())
}
/// Copies as many unread cached datagram bytes as fit into `buf` and
/// returns the count (`0` when nothing is pending).
///
/// Once the datagram is fully consumed the cache is reset, which lets the
/// receive thread fetch the next one.
fn udp_drain_into(&self, buf: &mut [u8]) -> usize {
    let mut cache = self.udp_cache.lock();
    let start = cache.pos;
    let pending = cache.data.len().saturating_sub(start);
    if pending == 0 {
        return 0;
    }
    let n = pending.min(buf.len());
    let end = start + n;
    buf[..n].copy_from_slice(&cache.data[start..end]);
    cache.pos = end;
    if cache.is_empty() {
        cache.clear();
    }
    n
}
/// Raises the shutdown flag and, if a receive thread is recorded, waits for
/// it to finish. The thread's result (including a panic) is ignored.
fn stop_udp_worker(&mut self) {
    self.udp_shutdown.store(true, Ordering::SeqCst);
    let mut worker = self.udp_thread.lock();
    if let Some(handle) = worker.take() {
        let _ = handle.join();
    }
}
/// Number of cached datagram bytes that have not been read yet.
pub fn udp_cache_pending(&self) -> usize {
    let cache = self.udp_cache.lock();
    let (total, consumed) = (cache.data.len(), cache.pos);
    total.saturating_sub(consumed)
}
}
/// Body of the UDP receive thread.
///
/// Runs until `shutdown` is set or `recv` fails with something other than a
/// timeout. A new datagram is only received once the cache has been fully
/// drained; until then the loop sleeps 1 ms per iteration. Each received
/// datagram replaces the cache contents.
fn udp_recv_loop(
    socket: Arc<UdpSocket>,
    cache: Arc<Mutex<UdpCache>>,
    shutdown: Arc<AtomicBool>,
    port_name: String,
) {
    let mut buf = vec![0u8; UDP_MAX_DATAGRAM];
    while !shutdown.load(Ordering::SeqCst) {
        // The cache guard is released before sleeping or blocking in `recv`.
        let has_unread = !cache.lock().is_empty();
        if has_unread {
            std::thread::sleep(Duration::from_millis(1));
            continue;
        }

        match socket.recv(&mut buf) {
            Ok(n) => {
                let mut slot = cache.lock();
                slot.data.clear();
                slot.data.extend_from_slice(&buf[..n]);
                slot.pos = 0;
            }
            Err(e) => match e.kind() {
                // Read timeout: loop around and re-check the shutdown flag.
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => continue,
                _ => {
                    tracing::warn!(
                        target: "asyn_rs::ip_server_port",
                        port = %port_name,
                        error = %e,
                        "UDP recv error — exiting recv loop"
                    );
                    break;
                }
            },
        }
    }
}
/// Single-client port driver bound to one slot of a `DrvAsynIPServerPort`.
pub struct DrvAsynIPSubport {
// Port-driver state for this sub-port.
base: PortDriverBase,
// Client slot shared with the parent server port.
slot: Arc<ClientSlot>,
}
impl DrvAsynIPSubport {
    /// Creates a sub-port over `slot`. The initial connected flag mirrors
    /// whether the slot currently has a client; auto-connect is disabled.
    fn new(port_name: String, slot: Arc<ClientSlot>) -> Self {
        let flags = PortFlags {
            multi_device: false,
            can_block: true,
            destructible: true,
        };
        let mut base = PortDriverBase::new(&port_name, 1, flags);
        base.connected = slot.is_occupied();
        base.auto_connect = false;
        Self { base, slot }
    }

    /// Remote address of the client in this sub-port's slot, if any.
    pub fn peer(&self) -> Option<SocketAddr> {
        let slot = &self.slot;
        slot.peer_addr()
    }
}
impl PortDriver for DrvAsynIPSubport {
/// Shared access to the sub-port's driver state.
fn base(&self) -> &PortDriverBase {
&self.base
}
/// Exclusive access to the sub-port's driver state.
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
/// Synchronises the connected flag with the slot's occupancy.
///
/// # Errors
///
/// Returns `Disconnected` when no client has been assigned to the slot.
fn connect(&mut self, _user: &AsynUser) -> AsynResult<()> {
    let occupied = self.slot.is_occupied();
    self.base.set_connected(occupied);
    match self.base.connected {
        true => Ok(()),
        false => Err(AsynError::Status {
            status: AsynStatus::Disconnected,
            message: "no client assigned to this subport slot yet".into(),
        }),
    }
}
/// Drops the slot's client, if any, announcing a `Connect` exception, and
/// marks the sub-port disconnected.
fn disconnect(&mut self, _user: &AsynUser) -> AsynResult<()> {
    let had_client = self.slot.is_occupied();
    if had_client {
        self.slot.clear();
        self.base.announce_exception(AsynException::Connect, 0);
    }
    self.base.set_connected(false);
    Ok(())
}
fn read_octet(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<usize> {
let mut stream_guard = self.slot.stream.lock();
let stream = stream_guard.as_mut().ok_or_else(|| AsynError::Status {
status: AsynStatus::Disconnected,
message: "subport slot has no client".into(),
})?;
if user.timeout > Duration::from_nanos(0) {
stream
.set_read_timeout(Some(user.timeout))
.map_err(|e| AsynError::Status {
status: AsynStatus::Error,
message: format!("set_read_timeout failed: {e}"),
})?;
}
match stream.read(buf) {
Ok(0) => {
drop(stream_guard);
self.slot.clear();
self.base.announce_exception(AsynException::Connect, 0);
self.base.set_connected(false);
Err(AsynError::Status {
status: AsynStatus::Disconnected,
message: "peer closed".into(),
})
}
Ok(n) => Ok(n),
Err(e)
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::TimedOut =>
{
Err(AsynError::Status {
status: AsynStatus::Timeout,
message: "read timeout".into(),
})
}
Err(e) => Err(AsynError::Status {
status: AsynStatus::Error,
message: format!("read failed: {e}"),
}),
}
}
fn write_octet(&mut self, _user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
let mut g = self.slot.stream.lock();
let stream = g.as_mut().ok_or_else(|| AsynError::Status {
status: AsynStatus::Disconnected,
message: "subport slot has no client".into(),
})?;
stream.write_all(data).map_err(|e| AsynError::Status {
status: AsynStatus::Error,
message: format!("write failed: {e}"),
})?;
stream.flush().map_err(|e| AsynError::Status {
status: AsynStatus::Error,
message: format!("flush failed: {e}"),
})?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A bare `host:port` spec yields the host, the port and the default limit.
#[test]
fn parse_basic_ipv4() {
    let IpServerConfig {
        bind_host,
        bind_port,
        max_clients,
        ..
    } = IpServerConfig::parse("0.0.0.0:8080").unwrap();
    assert_eq!(bind_host, "0.0.0.0");
    assert_eq!(bind_port, 8080);
    assert_eq!(max_clients, DEFAULT_MAX_CLIENTS);
}
/// An explicit `TCP` token is accepted and does not disturb the address.
#[test]
fn parse_with_tcp_token() {
    let parsed = IpServerConfig::parse("127.0.0.1:5000 TCP").unwrap();
    assert_eq!(parsed.bind_host, "127.0.0.1");
    assert_eq!(parsed.bind_port, 5000);
}
/// `SO_REUSEPORT` is not a recognised token, with or without a protocol.
#[test]
fn parse_rejects_so_reuseport_token() {
    let rejected = ["0.0.0.0:9000 SO_REUSEPORT", "0.0.0.0:9000 TCP SO_REUSEPORT"];
    for spec in rejected {
        assert!(IpServerConfig::parse(spec).is_err());
    }
}
/// The bracketed IPv6 form is split into the bare host and the port.
#[test]
fn parse_ipv6_bracket_form() {
    let parsed = IpServerConfig::parse("[::1]:7000").unwrap();
    assert_eq!(parsed.bind_host, "::1");
    assert_eq!(parsed.bind_port, 7000);
}
/// A `UDP` token selects UDP; no token means TCP.
#[test]
fn parse_udp_protocol_token() {
    let explicit = IpServerConfig::parse("0.0.0.0:7000 UDP").unwrap();
    assert_eq!(explicit.protocol, IpServerProtocol::Udp);

    let implicit = IpServerConfig::parse("0.0.0.0:7000").unwrap();
    assert_eq!(implicit.protocol, IpServerProtocol::Tcp, "default is TCP");
}
/// Two different client sockets each send one datagram; both payloads must
/// be readable from the server within the three-second deadline.
#[test]
fn udp_server_receives_datagrams_from_any_peer() {
use std::net::UdpSocket as ClientSock;
let mut srv = DrvAsynIPServerPort::new("udp_srv", "127.0.0.1:0 UDP").unwrap();
srv.connect(&AsynUser::default()).unwrap();
// Bound to port 0, so ask the socket for the address actually assigned.
let server_addr = srv
.udp_socket
.lock()
.as_ref()
.unwrap()
.local_addr()
.unwrap();
let c1 = ClientSock::bind("127.0.0.1:0").unwrap();
let c2 = ClientSock::bind("127.0.0.1:0").unwrap();
c1.send_to(b"alpha", server_addr).unwrap();
c2.send_to(b"bravo", server_addr).unwrap();
let user = AsynUser::default()
.with_addr(0)
.with_timeout(Duration::from_secs(2));
let mut got: Vec<String> = Vec::new();
let deadline = std::time::Instant::now() + Duration::from_secs(3);
let mut buf = [0u8; 64];
// A zero-length read means no datagram is cached yet, so poll again.
while got.len() < 2 && std::time::Instant::now() < deadline {
let n = srv.read_octet(&user, &mut buf).unwrap();
if n == 0 {
std::thread::sleep(Duration::from_millis(10));
continue;
}
got.push(String::from_utf8_lossy(&buf[..n]).to_string());
}
// Arrival order is not asserted.
got.sort();
assert_eq!(got, vec!["alpha".to_string(), "bravo".to_string()]);
srv.disconnect(&AsynUser::default()).unwrap();
}
/// Writing to a UDP-mode server must fail with the "read-only" error.
#[test]
fn udp_server_write_octet_errors() {
let mut srv = DrvAsynIPServerPort::new("udp_srv2", "127.0.0.1:0 UDP").unwrap();
srv.connect(&AsynUser::default()).unwrap();
let mut user = AsynUser::default().with_addr(0);
let err = srv.write_octet(&mut user, b"x").unwrap_err();
match err {
AsynError::Status { message, .. } => {
assert!(
message.contains("read-only"),
"expected read-only error, got: {message}"
);
}
_ => panic!("wrong error variant"),
}
srv.disconnect(&AsynUser::default()).unwrap();
}
/// Reading from a UDP server with nothing cached returns `Ok(0)`, not an error.
#[test]
fn udp_server_read_returns_zero_when_empty() {
let mut srv = DrvAsynIPServerPort::new("udp_srv3", "127.0.0.1:0 UDP").unwrap();
srv.connect(&AsynUser::default()).unwrap();
let user = AsynUser::default()
.with_addr(0)
.with_timeout(Duration::from_millis(50));
let mut buf = [0u8; 64];
let n = srv.read_octet(&user, &mut buf).unwrap();
assert_eq!(n, 0, "empty UDP cache must return 0 bytes, not error");
srv.disconnect(&AsynUser::default()).unwrap();
}
/// A UDP server can be connected and disconnected twice in a row.
#[test]
fn udp_server_disconnect_stops_worker_cleanly() {
let mut srv = DrvAsynIPServerPort::new("udp_srv4", "127.0.0.1:0 UDP").unwrap();
srv.connect(&AsynUser::default()).unwrap();
srv.disconnect(&AsynUser::default()).unwrap();
srv.connect(&AsynUser::default()).unwrap();
srv.disconnect(&AsynUser::default()).unwrap();
}
/// `shutdown` must join the receive thread promptly, drop the UDP socket,
/// and leave the port able to connect again.
#[test]
fn udp_server_shutdown_joins_recv_worker() {
let mut srv = DrvAsynIPServerPort::new("udp_srv_sd", "127.0.0.1:0 UDP").unwrap();
srv.connect(&AsynUser::default()).unwrap();
assert!(srv.udp_thread.lock().is_some());
let start = std::time::Instant::now();
srv.shutdown().unwrap();
let elapsed = start.elapsed();
assert!(
srv.udp_thread.lock().is_none(),
"shutdown must join and clear the recv worker handle"
);
assert!(
srv.udp_socket.lock().is_none(),
"shutdown must drop the UDP socket"
);
assert!(
elapsed < Duration::from_secs(3),
"shutdown join took too long ({elapsed:?}) — worker did not exit"
);
// Reconnecting after shutdown must work.
srv.connect(&AsynUser::default()).unwrap();
srv.disconnect(&AsynUser::default()).unwrap();
}
/// `shutdown` on a TCP server must drop the bound listener.
#[test]
fn tcp_server_shutdown_releases_listener() {
let mut srv = DrvAsynIPServerPort::new("tcp_srv_sd", "127.0.0.1:0").unwrap();
srv.connect(&AsynUser::default()).unwrap();
assert!(srv.listener.lock().is_some());
srv.shutdown().unwrap();
assert!(
srv.listener.lock().is_none(),
"shutdown must drop the TCP listener"
);
}
/// A spec without a `:port` part is rejected.
#[test]
fn parse_rejects_missing_port() {
    let outcome = IpServerConfig::parse("0.0.0.0");
    assert!(outcome.is_err());
}
/// An unrecognised protocol token produces a `Status` error that names the
/// problem or the offending token.
#[test]
fn parse_rejects_unknown_protocol_token() {
    let message = match IpServerConfig::parse("0.0.0.0:8080 BOGUS").unwrap_err() {
        AsynError::Status { message, .. } => message,
        _ => panic!("expected Status err"),
    };
    assert!(
        message.contains("unknown protocol token") || message.contains("BOGUS"),
        "msg={message}"
    );
}
/// End-to-end TCP check: a client connects, sends a message, and receives
/// the server's reply through slot 0.
#[test]
fn server_accepts_and_round_trips() {
let mut srv = DrvAsynIPServerPort::new("srv1", "127.0.0.1:0").unwrap();
let user = AsynUser::default();
srv.connect(&user).unwrap();
let port = srv.local_port();
assert!(port > 0);
// The client runs on its own thread because `accept_one` blocks.
let client_handle = std::thread::spawn(move || {
let mut s = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
s.write_all(b"hello-server").unwrap();
let mut buf = [0u8; 32];
let n = s.read(&mut buf).unwrap();
buf[..n].to_vec()
});
let addr = srv.accept_one().unwrap();
assert_eq!(addr, 0, "first slot");
let mut user = AsynUser::new(0).with_timeout(Duration::from_secs(2));
let mut buf = [0u8; 32];
let n = srv.read_octet(&user, &mut buf).unwrap();
assert_eq!(&buf[..n], b"hello-server");
srv.write_octet(&mut user, b"hello-client").unwrap();
let reply = client_handle.join().unwrap();
assert_eq!(reply, b"hello-client");
}
/// With `max_clients = 2`, the third accepted connection is refused with
/// the "no free client slot" error.
#[test]
fn slot_table_caps_concurrent_clients() {
let cfg = IpServerConfig {
bind_host: "127.0.0.1".into(),
bind_port: 0,
protocol: IpServerProtocol::Tcp,
max_clients: 2,
read_timeout: None,
};
let mut srv = DrvAsynIPServerPort::with_config("srv2", cfg).unwrap();
srv.connect(&AsynUser::default()).unwrap();
let port = srv.local_port();
// Keep the client streams alive so the connections stay open.
let _c1 = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let _c2 = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let _c3 = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
assert_eq!(srv.accept_one().unwrap(), 0);
assert_eq!(srv.accept_one().unwrap(), 1);
let err = srv.accept_one().unwrap_err();
match err {
AsynError::Status { message, .. } => {
assert!(
message.contains("no free client slot"),
"expected slot-full error, got: {message}"
);
}
_ => panic!("wrong error variant"),
}
}
/// After `drop_client`, the single slot is free again and the next accepted
/// connection reuses index 0.
#[test]
fn drop_client_releases_slot() {
let cfg = IpServerConfig {
bind_host: "127.0.0.1".into(),
bind_port: 0,
protocol: IpServerProtocol::Tcp,
max_clients: 1,
read_timeout: None,
};
let mut srv = DrvAsynIPServerPort::with_config("srv3", cfg).unwrap();
srv.connect(&AsynUser::default()).unwrap();
let port = srv.local_port();
let _c1 = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
assert_eq!(srv.accept_one().unwrap(), 0);
assert!(srv.peer(0).is_some());
srv.drop_client(0).unwrap();
assert!(srv.peer(0).is_none());
let _c2 = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
assert_eq!(srv.accept_one().unwrap(), 0);
assert!(srv.peer(0).is_some());
}
/// Sub-port names are `<parent>:<index>`, one per slot, in slot order.
#[test]
fn child_port_names_match_c_convention() {
    let cfg = IpServerConfig {
        bind_host: "127.0.0.1".into(),
        bind_port: 0,
        protocol: IpServerProtocol::Tcp,
        max_clients: 3,
        read_timeout: None,
    };
    let srv = DrvAsynIPServerPort::with_config("parent", cfg).unwrap();

    let expected = ["parent:0", "parent:1", "parent:2"];
    for (idx, name) in expected.iter().enumerate() {
        assert_eq!(srv.child_port_name(idx), *name);
    }

    let all: Vec<String> = expected.iter().map(|name| name.to_string()).collect();
    assert_eq!(srv.child_port_names(), all);
}
/// `make_subport` succeeds for every valid slot and reports "out of range"
/// for the first index past the table.
#[test]
fn make_subport_rejects_out_of_range_idx() {
    let cfg = IpServerConfig {
        bind_host: "127.0.0.1".into(),
        bind_port: 0,
        protocol: IpServerProtocol::Tcp,
        max_clients: 2,
        read_timeout: None,
    };
    let srv = DrvAsynIPServerPort::with_config("p2", cfg).unwrap();

    for idx in 0..2 {
        assert!(srv.make_subport(idx).is_ok());
    }

    let outcome = srv.make_subport(2);
    match outcome {
        Ok(_) => panic!("expected out-of-range error"),
        Err(AsynError::Status { message, .. }) => {
            assert!(message.contains("out of range"), "msg={message}");
        }
        Err(other) => panic!("expected Status error, got {other:?}"),
    }
}
/// A sub-port created before any client exists cannot connect. Once the
/// parent accepts a client into the shared slot, the sub-port connects and
/// can write to that client.
#[test]
fn subport_shares_slot_with_parent_after_accept() {
let cfg = IpServerConfig {
bind_host: "127.0.0.1".into(),
bind_port: 0,
protocol: IpServerProtocol::Tcp,
max_clients: 1,
read_timeout: None,
};
let mut srv = DrvAsynIPServerPort::with_config("psh", cfg).unwrap();
srv.connect(&AsynUser::default()).unwrap();
let port = srv.local_port();
let mut sub = srv.make_subport(0).unwrap();
// The slot is still empty, so connecting the sub-port must fail.
assert!(sub.connect(&AsynUser::default()).is_err());
let client_handle = std::thread::spawn(move || {
let mut c = std::net::TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let mut buf = [0u8; 5];
let _ = c.read(&mut buf).unwrap();
buf
});
assert_eq!(srv.accept_one().unwrap(), 0);
sub.connect(&AsynUser::default()).unwrap();
assert!(sub.peer().is_some());
let mut user = AsynUser::default();
sub.write_octet(&mut user, b"hello").unwrap();
let buf = client_handle.join().unwrap();
assert_eq!(&buf, b"hello");
}
}