#![expect(
clippy::map_err_ignore,
clippy::missing_errors_doc,
reason = "pre-existing network client implementation debt moved from staged microcrate into hl7v2; cleanup is split from topology collapse"
)]
use super::codec::MllpCodec;
use crate::{Message, parse, write};
use bytes::BytesMut;
use futures::prelude::*;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_util::codec::Framed;
#[derive(Debug, Clone)]
pub struct MllpClientConfig {
pub connect_timeout: Duration,
pub read_timeout: Duration,
pub write_timeout: Duration,
pub max_frame_size: usize,
}
impl Default for MllpClientConfig {
fn default() -> Self {
Self {
connect_timeout: Duration::from_secs(10),
read_timeout: Duration::from_secs(30),
write_timeout: Duration::from_secs(30),
max_frame_size: 10 * 1024 * 1024, }
}
}
pub struct MllpClient {
config: MllpClientConfig,
framed: Option<Framed<TcpStream, MllpCodec>>,
peer_addr: Option<SocketAddr>,
}
impl MllpClient {
pub fn new(config: MllpClientConfig) -> Self {
Self {
config,
framed: None,
peer_addr: None,
}
}
pub fn with_default_config() -> Self {
Self::new(MllpClientConfig::default())
}
pub async fn connect(&mut self, addr: impl Into<SocketAddr>) -> Result<(), std::io::Error> {
let addr = addr.into();
let stream = timeout(self.config.connect_timeout, TcpStream::connect(addr))
.await
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::TimedOut, "Connection timeout")
})??;
let codec = MllpCodec::with_max_frame_size(self.config.max_frame_size);
self.framed = Some(Framed::new(stream, codec));
self.peer_addr = Some(addr);
Ok(())
}
pub fn is_connected(&self) -> bool {
self.framed.is_some()
}
pub fn peer_addr(&self) -> Option<SocketAddr> {
self.peer_addr
}
pub async fn send_message(&mut self, message: &Message) -> Result<Message, std::io::Error> {
let framed = self.framed.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected")
})?;
let bytes = write(message);
timeout(
self.config.write_timeout,
framed.send(BytesMut::from(&bytes[..])),
)
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))??;
let response = timeout(self.config.read_timeout, framed.next())
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Read timeout"))?
.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Connection closed")
})??;
let ack = parse(&response)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
Ok(ack)
}
pub async fn send_message_no_ack(&mut self, message: &Message) -> Result<(), std::io::Error> {
let framed = self.framed.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected")
})?;
let bytes = write(message);
timeout(
self.config.write_timeout,
framed.send(BytesMut::from(&bytes[..])),
)
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))??;
Ok(())
}
pub async fn receive_message(&mut self) -> Result<Option<Message>, std::io::Error> {
let framed = self.framed.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected")
})?;
match timeout(self.config.read_timeout, framed.next()).await {
Ok(Some(Ok(frame))) => {
let message = parse(&frame).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
Ok(Some(message))
}
Ok(Some(Err(e))) => Err(e),
Ok(None) => Ok(None), Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Read timeout",
)),
}
}
pub async fn close(mut self) -> Result<(), std::io::Error> {
if let Some(framed) = self.framed.take() {
let stream = framed.into_inner();
drop(stream);
}
Ok(())
}
pub async fn disconnect(&mut self) -> Result<(), std::io::Error> {
if let Some(framed) = self.framed.take() {
let stream = framed.into_inner();
drop(stream);
}
self.peer_addr = None;
Ok(())
}
}
pub struct MllpClientBuilder {
config: MllpClientConfig,
}
impl MllpClientBuilder {
pub fn new() -> Self {
Self {
config: MllpClientConfig::default(),
}
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.config.connect_timeout = timeout;
self
}
pub fn read_timeout(mut self, timeout: Duration) -> Self {
self.config.read_timeout = timeout;
self
}
pub fn write_timeout(mut self, timeout: Duration) -> Self {
self.config.write_timeout = timeout;
self
}
pub fn max_frame_size(mut self, size: usize) -> Self {
self.config.max_frame_size = size;
self
}
pub fn build(self) -> MllpClient {
MllpClient::new(self.config)
}
}
impl Default for MllpClientBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
#![expect(
clippy::unwrap_used,
reason = "pre-existing network client test debt moved into hl7v2; cleanup is split from topology collapse"
)]
use super::*;
#[test]
fn test_client_builder() {
let client = MllpClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.read_timeout(Duration::from_secs(15))
.write_timeout(Duration::from_secs(15))
.max_frame_size(1024 * 1024)
.build();
assert_eq!(client.config.connect_timeout, Duration::from_secs(5));
assert_eq!(client.config.read_timeout, Duration::from_secs(15));
assert_eq!(client.config.write_timeout, Duration::from_secs(15));
assert_eq!(client.config.max_frame_size, 1024 * 1024);
}
#[test]
fn test_client_not_connected() {
let client = MllpClient::with_default_config();
assert!(!client.is_connected());
assert!(client.peer_addr().is_none());
}
#[tokio::test]
async fn test_client_connect_timeout() {
use std::net::SocketAddr;
let mut client = MllpClientBuilder::new()
.connect_timeout(Duration::from_millis(1))
.build();
let addr: SocketAddr = "192.0.2.1:2575".parse().unwrap();
let result = client.connect(addr).await;
assert!(result.is_err());
}
}