#![expect(
clippy::map_err_ignore,
clippy::missing_errors_doc,
clippy::uninlined_format_args,
reason = "pre-existing network server implementation debt moved from staged microcrate into hl7v2; cleanup is split from topology collapse"
)]
use super::codec::MllpCodec;
use crate::{Error, Message, parse, write};
use bytes::BytesMut;
use futures::prelude::*;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
use tokio_util::codec::Framed;
/// Settings that control how [`MllpServer`] and [`MllpConnection`] behave.
#[derive(Debug, Clone)]
pub struct MllpServerConfig {
/// Time limit for receiving a message; [`MllpConnection::receive_message`] reports
/// `TimedOut` when no frame arrives within it.
pub read_timeout: Duration,
/// Time limit for [`MllpConnection::send_message`] to finish writing a message.
pub write_timeout: Duration,
/// Frame-size limit handed to `MllpCodec::with_max_frame_size`.
/// NOTE(review): presumably a byte count — confirm against the codec.
pub max_frame_size: usize,
/// Requested listen backlog.
/// NOTE(review): nothing visible in this file reads this field; `MllpServer::bind`
/// calls `TcpListener::bind` without it — confirm whether it is applied elsewhere.
pub backlog: u32,
/// Number of permits in the semaphore that [`MllpServer::run`] uses to cap
/// simultaneously served connections.
pub max_concurrent_connections: usize,
/// When, if at all, the serving loop writes the handler's reply back to the peer.
pub ack_timing: AckTimingPolicy,
}
impl Default for MllpServerConfig {
    /// Builds the stock configuration: 30-second read and write timeouts, a
    /// 10 MiB frame limit, a backlog of 128, at most 100 concurrent
    /// connections, and replies sent immediately.
    fn default() -> Self {
        // Reads and writes share the same default bound.
        const IO_TIMEOUT: Duration = Duration::from_secs(30);
        const TEN_MIB: usize = 10 * 1024 * 1024;

        Self {
            ack_timing: AckTimingPolicy::Immediate,
            max_concurrent_connections: 100,
            backlog: 128,
            max_frame_size: TEN_MIB,
            write_timeout: IO_TIMEOUT,
            read_timeout: IO_TIMEOUT,
        }
    }
}
/// Controls when the connection loop behind [`MllpServer::run`] sends the reply
/// produced by a [`MessageHandler`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckTimingPolicy {
/// Send the reply as soon as the handler returns it.
Immediate,
/// Sleep for the given duration after the handler returns, then send the reply.
Delayed(Duration),
/// Do not send the reply from the connection loop; the handler's reply is discarded there.
/// NOTE(review): no other delivery path is visible in this file — confirm intended usage.
OnDemand,
}
/// Application callback invoked by [`MllpServer::run`] for every inbound message
/// that parses successfully.
///
/// The `Send + Sync` bounds are required because a single handler instance is
/// wrapped in an `Arc` and shared by every spawned connection task.
pub trait MessageHandler: Send + Sync {
/// Processes one parsed message.
///
/// Return `Ok(Some(reply))` to have `reply` written back to the peer according to
/// the configured [`AckTimingPolicy`], or `Ok(None)` to send nothing. When `Err`
/// is returned the serving loop logs it to stderr, sends nothing, and moves on to
/// the next message.
fn handle_message(
&self,
message: Message,
) -> impl std::future::Future<Output = Result<Option<Message>, Error>> + Send;
}
/// MLLP server: a configuration plus an optional TCP listener.
///
/// A freshly constructed server is unbound; [`MllpServer::bind`] must succeed
/// before [`MllpServer::run`], [`MllpServer::accept`] or
/// [`MllpServer::local_addr`] can be used.
pub struct MllpServer {
// Cloned into every connection the server hands out or serves.
config: MllpServerConfig,
// `None` until `bind` succeeds.
listener: Option<TcpListener>,
}
impl MllpServer {
pub fn new(config: MllpServerConfig) -> Self {
Self {
config,
listener: None,
}
}
pub fn with_default_config() -> Self {
Self::new(MllpServerConfig::default())
}
pub async fn bind(&mut self, addr: impl Into<SocketAddr>) -> Result<(), std::io::Error> {
let addr = addr.into();
let listener = TcpListener::bind(addr).await?;
self.listener = Some(listener);
Ok(())
}
pub fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
self.listener
.as_ref()
.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound")
})?
.local_addr()
}
pub async fn run<H: MessageHandler + 'static>(
&mut self,
handler: H,
) -> Result<(), std::io::Error> {
let listener = self.listener.as_ref().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound")
})?;
let handler = std::sync::Arc::new(handler);
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(
self.config.max_concurrent_connections,
));
loop {
let permit = semaphore
.clone()
.acquire_owned()
.await
.map_err(|e| std::io::Error::other(format!("Semaphore error: {}", e)))?;
let (stream, peer_addr) = listener.accept().await?;
let handler = handler.clone();
let config = self.config.clone();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = handle_connection(stream, peer_addr, handler, config).await {
eprintln!("Error handling connection from {}: {}", peer_addr, e);
}
});
}
}
pub async fn accept(&mut self) -> Result<MllpConnection, std::io::Error> {
let listener = self.listener.as_ref().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound")
})?;
let (stream, peer_addr) = listener.accept().await?;
Ok(MllpConnection::new(stream, peer_addr, self.config.clone()))
}
}
/// Serves one client connection until the peer disconnects, a timeout elapses,
/// or an I/O failure occurs.
///
/// For every frame received the function:
/// 1. parses it into a `Message` (unparseable frames are logged and skipped);
/// 2. passes the message to `handler`;
/// 3. if the handler returned a reply, sends it according to `config.ack_timing`
///    (`OnDemand` sends nothing from this loop).
///
/// `config.read_timeout` bounds the wait for each incoming frame and
/// `config.write_timeout` bounds each reply write, mirroring
/// `MllpConnection::receive_message` / `MllpConnection::send_message`. Without
/// these bounds an idle or stalled peer would keep the task (and the
/// connection permit held by its caller) alive indefinitely.
///
/// All failures are reported on stderr and end or continue the loop; the
/// function itself always returns `Ok(())`.
async fn handle_connection<H: MessageHandler>(
    stream: TcpStream,
    peer_addr: SocketAddr,
    handler: std::sync::Arc<H>,
    config: MllpServerConfig,
) -> Result<(), std::io::Error> {
    let codec = MllpCodec::with_max_frame_size(config.max_frame_size);
    let mut framed = Framed::new(stream, codec);

    loop {
        // The timeout must wrap the socket read: parsing is synchronous, so a
        // timeout around it alone could never fire.
        let frame = match timeout(config.read_timeout, framed.next()).await {
            Ok(Some(Ok(frame))) => frame,
            Ok(Some(Err(e))) => {
                eprintln!("Error reading frame from {}: {}", peer_addr, e);
                break;
            }
            // The stream ended: the peer closed the connection.
            Ok(None) => break,
            Err(_) => {
                eprintln!("Timeout reading frame from {}", peer_addr);
                break;
            }
        };

        let message = match parse(&frame) {
            Ok(message) => message,
            Err(e) => {
                // The connection stays open; only this frame is dropped.
                eprintln!("Failed to parse message from {}: {}", peer_addr, e);
                continue;
            }
        };

        let ack = match handler.handle_message(message).await {
            Ok(Some(ack)) => ack,
            Ok(None) => continue,
            Err(e) => {
                eprintln!("Error handling message from {}: {}", peer_addr, e);
                continue;
            }
        };

        // Work out whether to reply from this loop and how long to wait first.
        let delay = match config.ack_timing {
            AckTimingPolicy::Immediate => None,
            AckTimingPolicy::Delayed(delay) => Some(delay),
            AckTimingPolicy::OnDemand => continue,
        };
        if let Some(delay) = delay {
            tokio::time::sleep(delay).await;
        }

        let ack_bytes = write(&ack);
        let send = framed.send(BytesMut::from(&ack_bytes[..]));
        match timeout(config.write_timeout, send).await {
            Ok(Ok(())) => {}
            Ok(Err(e)) => {
                eprintln!("Failed to send ACK to {}: {}", peer_addr, e);
                break;
            }
            Err(_) => {
                eprintln!("Timeout sending ACK to {}", peer_addr);
                break;
            }
        }
    }

    Ok(())
}
/// One accepted client connection, offering message-level receive and send
/// operations over an MLLP-framed TCP stream.
pub struct MllpConnection {
// TCP stream wrapped with the MLLP codec for frame encoding/decoding.
framed: Framed<TcpStream, MllpCodec>,
// Remote address supplied when the connection was created.
peer_addr: SocketAddr,
// Source of the read/write timeouts used by this connection.
config: MllpServerConfig,
}
impl MllpConnection {
    /// Wraps `stream` in an MLLP codec limited to `config.max_frame_size` and
    /// remembers `peer_addr` and `config` for later use.
    pub fn new(stream: TcpStream, peer_addr: SocketAddr, config: MllpServerConfig) -> Self {
        let framed = Framed::new(
            stream,
            MllpCodec::with_max_frame_size(config.max_frame_size),
        );
        Self {
            framed,
            peer_addr,
            config,
        }
    }

    /// Returns the remote address this connection was created with.
    pub fn peer_addr(&self) -> SocketAddr {
        self.peer_addr
    }

    /// Waits up to `read_timeout` for the next frame and parses it.
    ///
    /// Returns `Ok(None)` when the peer has closed the stream. Fails with
    /// `TimedOut` ("Read timeout") if no frame arrives in time, with the
    /// codec's error if reading the frame fails, or with `InvalidData` carrying
    /// the parser's message if the frame is not a valid message.
    pub async fn receive_message(&mut self) -> Result<Option<Message>, std::io::Error> {
        let Ok(next) = timeout(self.config.read_timeout, self.framed.next()).await else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Read timeout",
            ));
        };

        // `None` means end of stream; a frame-level error is propagated as-is.
        let Some(frame) = next.transpose()? else {
            return Ok(None);
        };

        parse(&frame)
            .map(Some)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
    }

    /// Serializes `message` and writes it to the peer within `write_timeout`.
    ///
    /// Fails with `TimedOut` ("Write timeout") if the write does not finish in
    /// time, or with the underlying error if the write itself fails.
    pub async fn send_message(&mut self, message: &Message) -> Result<(), std::io::Error> {
        let bytes = write(message);
        let payload = BytesMut::from(&bytes[..]);
        let sent = timeout(self.config.write_timeout, self.framed.send(payload))
            .await
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))?;
        sent?;
        Ok(())
    }

    /// Consumes the connection and drops the underlying TCP stream.
    ///
    /// Always returns `Ok(())`.
    pub async fn close(self) -> Result<(), std::io::Error> {
        drop(self.framed.into_inner());
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    #![expect(
        clippy::assertions_on_result_states,
        clippy::unwrap_used,
        reason = "pre-existing network server test debt moved into hl7v2; cleanup is split from topology collapse"
    )]
    use super::*;

    /// Handler fixture that accepts every message and never replies.
    #[expect(
        dead_code,
        reason = "test handler is retained as a fixture for future server behavior expansion"
    )]
    struct TestHandler;

    impl MessageHandler for TestHandler {
        async fn handle_message(&self, _message: Message) -> Result<Option<Message>, Error> {
            Ok(None)
        }
    }

    /// Binding to an ephemeral loopback port succeeds and exposes a local address.
    #[tokio::test]
    async fn test_server_bind() {
        let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut server = MllpServer::with_default_config();

        assert!(server.bind(bind_addr).await.is_ok());
        assert!(server.local_addr().is_ok());
    }

    /// A read timeout given via struct-update syntax overrides the default value.
    #[tokio::test]
    async fn test_connection_timeout() {
        let short = Duration::from_millis(100);
        let config = MllpServerConfig {
            read_timeout: short,
            ..MllpServerConfig::default()
        };

        assert_eq!(config.read_timeout, short);
    }
}