use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use rns_core::constants;
use rns_core::transport::types::{IngressControlConfig, InterfaceId, InterfaceInfo};
use crate::event::{Event, EventSender};
use crate::hdlc;
use crate::interface::{ListenerControl, Writer};
#[derive(Debug, Clone)]
pub struct TcpServerConfig {
pub name: String,
pub listen_ip: String,
pub listen_port: u16,
pub interface_id: InterfaceId,
pub max_connections: Option<usize>,
pub ingress_control: IngressControlConfig,
pub runtime: Arc<Mutex<TcpServerRuntime>>,
}
#[derive(Debug, Clone)]
pub struct TcpServerRuntime {
pub max_connections: Option<usize>,
}
impl TcpServerRuntime {
pub fn from_config(config: &TcpServerConfig) -> Self {
Self {
max_connections: config.max_connections,
}
}
}
#[derive(Debug, Clone)]
pub struct TcpServerRuntimeConfigHandle {
pub interface_name: String,
pub runtime: Arc<Mutex<TcpServerRuntime>>,
pub startup: TcpServerRuntime,
}
impl Default for TcpServerConfig {
fn default() -> Self {
let mut config = TcpServerConfig {
name: String::new(),
listen_ip: "0.0.0.0".into(),
listen_port: 4242,
interface_id: InterfaceId(0),
max_connections: None,
ingress_control: IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(TcpServerRuntime {
max_connections: None,
})),
};
let startup = TcpServerRuntime::from_config(&config);
config.runtime = Arc::new(Mutex::new(startup));
config
}
}
struct TcpServerWriter {
stream: TcpStream,
}
impl Writer for TcpServerWriter {
fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
self.stream.write_all(&hdlc::frame(data))
}
}
pub fn start(
config: TcpServerConfig,
tx: EventSender,
next_id: Arc<AtomicU64>,
) -> io::Result<ListenerControl> {
let addr = format!("{}:{}", config.listen_ip, config.listen_port);
let listener = TcpListener::bind(&addr)?;
listener.set_nonblocking(true)?;
log::info!("[{}] TCP server listening on {}", config.name, addr);
let name = config.name.clone();
let runtime = Arc::clone(&config.runtime);
let ingress_control = config.ingress_control;
let active_connections = Arc::new(AtomicUsize::new(0));
let control = ListenerControl::new();
let listener_control = control.clone();
thread::Builder::new()
.name(format!("tcp-server-{}", config.interface_id.0))
.spawn(move || {
listener_loop(
listener,
name,
tx,
next_id,
runtime,
ingress_control,
active_connections,
listener_control,
);
})?;
Ok(control)
}
fn listener_loop(
listener: TcpListener,
name: String,
tx: EventSender,
next_id: Arc<AtomicU64>,
runtime: Arc<Mutex<TcpServerRuntime>>,
ingress_control: IngressControlConfig,
active_connections: Arc<AtomicUsize>,
control: ListenerControl,
) {
loop {
if control.should_stop() {
log::info!("[{}] listener stopping", name);
return;
}
let stream_result = listener.accept().map(|(stream, _)| stream);
let stream = match stream_result {
Ok(s) => s,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(std::time::Duration::from_millis(50));
continue;
}
Err(e) => {
log::warn!("[{}] accept failed: {}", name, e);
continue;
}
};
let max_connections = runtime.lock().unwrap().max_connections;
if let Some(max) = max_connections {
if active_connections.load(Ordering::Relaxed) >= max {
let peer = stream.peer_addr().ok();
log::warn!(
"[{}] max connections ({}) reached, rejecting {:?}",
name,
max,
peer
);
drop(stream);
continue;
}
}
active_connections.fetch_add(1, Ordering::Relaxed);
let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
let peer_addr = stream.peer_addr().ok();
log::info!(
"[{}] client connected: {:?} → id {}",
name,
peer_addr,
client_id.0
);
if let Err(e) = stream.set_nodelay(true) {
log::warn!("[{}] set_nodelay failed: {}", name, e);
}
let writer_stream = match stream.try_clone() {
Ok(s) => s,
Err(e) => {
log::warn!("[{}] failed to clone stream: {}", name, e);
continue;
}
};
let writer: Box<dyn Writer> = Box::new(TcpServerWriter {
stream: writer_stream,
});
let info = InterfaceInfo {
id: client_id,
name: format!("TCPServerInterface/Client-{}", client_id.0),
mode: constants::MODE_FULL,
out_capable: true,
in_capable: true,
bitrate: None,
announce_rate_target: None,
announce_rate_grace: 0,
announce_rate_penalty: 0.0,
announce_cap: constants::ANNOUNCE_CAP,
is_local_client: false,
wants_tunnel: false,
tunnel_id: None,
mtu: 65535,
ia_freq: 0.0,
started: 0.0,
ingress_control,
};
if tx
.send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
.is_err()
{
return;
}
let client_tx = tx.clone();
let client_name = name.clone();
let client_active = active_connections.clone();
thread::Builder::new()
.name(format!("tcp-server-reader-{}", client_id.0))
.spawn(move || {
client_reader_loop(stream, client_id, client_name, client_tx, client_active);
})
.ok();
}
}
fn client_reader_loop(
mut stream: TcpStream,
id: InterfaceId,
name: String,
tx: EventSender,
active_connections: Arc<AtomicUsize>,
) {
let mut decoder = hdlc::Decoder::new();
let mut buf = [0u8; 4096];
loop {
match stream.read(&mut buf) {
Ok(0) => {
log::info!("[{}] client {} disconnected", name, id.0);
active_connections.fetch_sub(1, Ordering::Relaxed);
let _ = tx.send(Event::InterfaceDown(id));
return;
}
Ok(n) => {
for frame in decoder.feed(&buf[..n]) {
if tx
.send(Event::Frame {
interface_id: id,
data: frame,
})
.is_err()
{
active_connections.fetch_sub(1, Ordering::Relaxed);
return;
}
}
}
Err(e) => {
log::warn!("[{}] client {} read error: {}", name, id.0, e);
active_connections.fetch_sub(1, Ordering::Relaxed);
let _ = tx.send(Event::InterfaceDown(id));
return;
}
}
}
}
use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
use std::collections::HashMap;
pub struct TcpServerFactory;
impl InterfaceFactory for TcpServerFactory {
fn type_name(&self) -> &str {
"TCPServerInterface"
}
fn parse_config(
&self,
name: &str,
id: InterfaceId,
params: &HashMap<String, String>,
) -> Result<Box<dyn InterfaceConfigData>, String> {
let listen_ip = params
.get("listen_ip")
.cloned()
.unwrap_or_else(|| "0.0.0.0".into());
let listen_port = params
.get("listen_port")
.and_then(|v| v.parse().ok())
.unwrap_or(4242);
let max_connections = params.get("max_connections").and_then(|v| v.parse().ok());
let mut config = TcpServerConfig {
name: name.to_string(),
listen_ip,
listen_port,
interface_id: id,
max_connections,
ingress_control: IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(TcpServerRuntime {
max_connections: None,
})),
};
let startup = TcpServerRuntime::from_config(&config);
config.runtime = Arc::new(Mutex::new(startup));
Ok(Box::new(config))
}
fn start(
&self,
config: Box<dyn InterfaceConfigData>,
ctx: StartContext,
) -> io::Result<StartResult> {
let mut cfg = *config
.into_any()
.downcast::<TcpServerConfig>()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
cfg.ingress_control = ctx.ingress_control;
let control = start(cfg, ctx.tx, ctx.next_dynamic_id)?;
Ok(StartResult::Listener {
control: Some(control),
})
}
}
pub(crate) fn runtime_handle_from_config(config: &TcpServerConfig) -> TcpServerRuntimeConfigHandle {
TcpServerRuntimeConfigHandle {
interface_name: config.name.clone(),
runtime: Arc::clone(&config.runtime),
startup: TcpServerRuntime::from_config(config),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::TcpStream;
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;
fn find_free_port() -> u16 {
TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap()
.port()
}
fn make_server_config(
port: u16,
interface_id: u64,
max_connections: Option<usize>,
) -> TcpServerConfig {
let mut config = TcpServerConfig {
name: "test-server".into(),
listen_ip: "127.0.0.1".into(),
listen_port: port,
interface_id: InterfaceId(interface_id),
max_connections,
ingress_control: IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(TcpServerRuntime {
max_connections: None,
})),
};
let startup = TcpServerRuntime::from_config(&config);
config.runtime = Arc::new(Mutex::new(startup));
config
}
#[test]
fn accept_connection() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(1000));
let config = make_server_config(port, 1, None);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
Event::InterfaceUp(id, writer, info) => {
assert_eq!(id, InterfaceId(1000));
assert!(writer.is_some());
assert!(info.is_some());
}
other => panic!("expected InterfaceUp, got {:?}", other),
}
}
#[test]
fn spawned_client_inherits_ingress_control_config() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(1100));
let mut config = make_server_config(port, 11, None);
config.ingress_control = IngressControlConfig::disabled();
config.ingress_control.max_held_announces = 17;
config.ingress_control.burst_hold = 1.5;
config.ingress_control.burst_freq_new = 2.5;
config.ingress_control.burst_freq = 3.5;
config.ingress_control.new_time = 4.5;
config.ingress_control.burst_penalty = 5.5;
config.ingress_control.held_release_interval = 6.5;
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let _client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
Event::InterfaceUp(_, _, Some(info)) => {
assert!(!info.ingress_control.enabled);
assert_eq!(info.ingress_control.max_held_announces, 17);
assert_eq!(info.ingress_control.burst_hold, 1.5);
assert_eq!(info.ingress_control.burst_freq_new, 2.5);
assert_eq!(info.ingress_control.burst_freq, 3.5);
assert_eq!(info.ingress_control.new_time, 4.5);
assert_eq!(info.ingress_control.burst_penalty, 5.5);
assert_eq!(info.ingress_control.held_release_interval, 6.5);
}
other => panic!("expected InterfaceUp with InterfaceInfo, got {:?}", other),
}
}
#[test]
fn listener_stop_prevents_new_accepts() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(1500));
let config = make_server_config(port, 15, None);
let control = start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
control.request_stop();
thread::sleep(Duration::from_millis(120));
let connect_result = TcpStream::connect(format!("127.0.0.1:{}", port));
if let Ok(stream) = connect_result {
drop(stream);
}
match rx.recv_timeout(Duration::from_millis(200)) {
Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {}
other => panic!(
"expected no InterfaceUp after listener stop, got {:?}",
other
),
}
}
#[test]
fn receive_frame_from_client() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(2000));
let config = make_server_config(port, 2, None);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let payload: Vec<u8> = (0..32).collect();
let framed = hdlc::frame(&payload);
client.write_all(&framed).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
Event::Frame { interface_id, data } => {
assert_eq!(interface_id, InterfaceId(2000));
assert_eq!(data, payload);
}
other => panic!("expected Frame, got {:?}", other),
}
}
#[test]
fn send_frame_to_client() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(3000));
let config = make_server_config(port, 3, None);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let mut client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
client
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let event = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let mut writer = match event {
Event::InterfaceUp(_, Some(w), _) => w,
other => panic!("expected InterfaceUp with writer, got {:?}", other),
};
let payload: Vec<u8> = (0..24).collect();
writer.send_frame(&payload).unwrap();
let mut buf = [0u8; 256];
let n = client.read(&mut buf).unwrap();
let expected = hdlc::frame(&payload);
assert_eq!(&buf[..n], &expected[..]);
}
#[test]
fn multiple_clients() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(4000));
let config = make_server_config(port, 4, None);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let mut ids = Vec::new();
for _ in 0..2 {
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
Event::InterfaceUp(id, _, _) => ids.push(id),
other => panic!("expected InterfaceUp, got {:?}", other),
}
}
assert_eq!(ids.len(), 2);
assert_ne!(ids[0], ids[1]);
}
#[test]
fn client_disconnect() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(5000));
let config = make_server_config(port, 5, None);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let client = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
drop(client);
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(
matches!(event, Event::InterfaceDown(InterfaceId(5000))),
"expected InterfaceDown(5000), got {:?}",
event
);
}
#[test]
fn server_bind_port() {
let port = find_free_port();
let (tx, _rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(6000));
let config = make_server_config(port, 6, None);
start(config, tx, next_id).unwrap();
}
#[test]
fn max_connections_rejects_excess() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(7000));
let config = make_server_config(port, 7, Some(2));
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
for _ in 0..2 {
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(matches!(event, Event::InterfaceUp(_, _, _)));
}
let client3 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
client3
.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();
thread::sleep(Duration::from_millis(100));
let result = rx.recv_timeout(Duration::from_millis(500));
assert!(
result.is_err(),
"expected no InterfaceUp for rejected connection, got {:?}",
result
);
}
#[test]
fn max_connections_allows_after_disconnect() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(7100));
let config = make_server_config(port, 71, Some(1));
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(matches!(event, Event::InterfaceUp(_, _, _)));
drop(client1);
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(matches!(event, Event::InterfaceDown(_)));
let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(
matches!(event, Event::InterfaceUp(_, _, _)),
"expected InterfaceUp after slot freed, got {:?}",
event
);
}
#[test]
fn runtime_max_connections_updates_live() {
let port = find_free_port();
let (tx, rx) = crate::event::channel();
let next_id = Arc::new(AtomicU64::new(7200));
let config = make_server_config(port, 72, None);
let runtime = Arc::clone(&config.runtime);
start(config, tx, next_id).unwrap();
thread::sleep(Duration::from_millis(50));
let _client1 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(matches!(event, Event::InterfaceUp(_, _, _)));
{
let mut runtime = runtime.lock().unwrap();
runtime.max_connections = Some(1);
}
let _client2 = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let result = rx.recv_timeout(Duration::from_millis(400));
assert!(
result.is_err(),
"expected no InterfaceUp after lowering max_connections, got {:?}",
result
);
}
}