datex_native/com_interfaces/tcp/
tcp_server.rs1use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_server::TCPServerInterfaceSetupData;
2use core::net::AddrParseError;
3use datex_core::{derive_setup_data, network::{
4 com_hub::errors::ComInterfaceCreateError,
5 com_interfaces::com_interface::{
6 factory::{
7 ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
8 },
9 properties::{InterfaceDirection, ComInterfaceProperties},
10 },
11}};
12use core::{ result::Result};
13use std::io;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use log::{error, info, warn};
17use tokio::{
18 io::{AsyncReadExt, AsyncWriteExt},
19 net::{
20 TcpListener,
21 tcp::{OwnedReadHalf, OwnedWriteHalf},
22 },
23};
24use datex_core::global::dxb_block::DXBBlock;
25use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceConfiguration, SendCallback, SendFailure, SocketConfiguration, SocketProperties};
26use futures::lock::Mutex;
27
28derive_setup_data!(TCPServerInterfaceSetupDataNative, TCPServerInterfaceSetupData);
29
30impl TCPServerInterfaceSetupDataNative {
31 async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
32 let host = self.host.clone().unwrap_or_else(|| "0.0.0.0".to_string());
33
34 let address: SocketAddr = format!("{}:{}", host, self.port)
35 .parse()
36 .map_err(|e: AddrParseError| {
37 ComInterfaceCreateError::InvalidSetupData(e.to_string())
38 })?;
39
40 let listener = TcpListener::bind(address).await.map_err(|e| {
41 ComInterfaceCreateError::connection_error_with_details(e)
42 })?;
43 info!("TCP Server listening on {address}");
44
45 Ok(ComInterfaceConfiguration::new_multi_socket(
46 ComInterfaceProperties {
47 name: Some(format!("{}:{}", host, self.port)),
48 ..Self::get_default_properties()
49 },
50 async gen move {
51 loop {
52 match Self::get_next_socket_connection(&listener).await {
54 Ok((addr, mut read, write)) => {
55 info!("Accepted new TCP connection from {addr}");
56 yield Ok(SocketConfiguration::new_in_out(
58 SocketProperties::new(InterfaceDirection::InOut, 1),
59 async gen move {
61 loop {
63 let mut buffer = [0u8; 1024];
64 match read.read(&mut buffer).await {
65 Ok(0) => {
66 warn!("Connection closed by peer");
67 return;
68 }
69 Ok(n) => {
70 yield Ok(buffer[..n].to_vec());
71 }
72 Err(e) => {
73 error!("Failed to read from socket: {e}");
74 return yield Err(());
75 }
76 }
77 }
78 },
79 SendCallback::new_async(move |block: DXBBlock| {
81 let write = write.clone();
82 async move {
83 write
84 .lock()
85 .await
86 .write_all(&block.to_bytes())
87 .await
88 .map_err(|e| {
89 error!("TCP write error: {e}");
90 SendFailure(Box::new(block))
91 })
92 }
93 })
94 ));
95 }
96 Err(_) => {
97 continue;
99 }
100 }
101 }
102 },
103 ))
104 }
105
106 async fn get_next_socket_connection(listener: &TcpListener) -> Result<(SocketAddr, OwnedReadHalf, Arc<Mutex<OwnedWriteHalf>>), io::Error> {
107 let (stream, addr) = listener.accept().await?;
108 let (tcp_read_half, tcp_write_half) = stream.into_split();
110 Ok((addr, tcp_read_half, Arc::new(Mutex::new(tcp_write_half))))
111 }
112}
113
114impl ComInterfaceAsyncFactory for TCPServerInterfaceSetupDataNative {
115 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
116 Box::pin(self.create_interface())
117 }
118
119 fn get_default_properties() -> ComInterfaceProperties {
120 TCPServerInterfaceSetupData::get_default_properties()
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use std::assert_matches;
127 use datex_core::{
128 network::{
129 com_hub::errors::ComInterfaceCreateError,
130 },
131 };
132 use super::*;
133
134 #[tokio::test]
135 async fn test_construct() {
136
137 const PORT: u16 = 5088;
138 let interface_configuration =
139 TCPServerInterfaceSetupDataNative(TCPServerInterfaceSetupData::new_with_port(PORT))
140 .create_interface()
141 .await
142 .unwrap();
143
144 assert_eq!(
145 interface_configuration.properties.name,
146 Some(format!("0.0.0.0:{}", PORT))
147 );
148 }
149
150 #[tokio::test]
151 async fn test_construct_invalid_address() {
152
153 assert_matches!(
154 TCPServerInterfaceSetupDataNative(TCPServerInterfaceSetupData::new_with_host_and_port(
155 "invalid-address".to_string(),
156 5088
157 ))
158 .create_interface()
159 .await,
160 Err(ComInterfaceCreateError::InvalidSetupData(_))
161 );
162 }
163}