Skip to main content

datex_native/com_interfaces/tcp/
tcp_server.rs

1use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_server::TCPServerInterfaceSetupData;
2use core::net::AddrParseError;
3use datex_core::{derive_setup_data, network::{
4    com_hub::errors::ComInterfaceCreateError,
5    com_interfaces::com_interface::{
6        factory::{
7            ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
8        },
9        properties::{InterfaceDirection, ComInterfaceProperties},
10    },
11}};
12use core::{ result::Result};
13use std::io;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use log::{error, info, warn};
17use tokio::{
18    io::{AsyncReadExt, AsyncWriteExt},
19    net::{
20        TcpListener,
21        tcp::{OwnedReadHalf, OwnedWriteHalf},
22    },
23};
24use datex_core::global::dxb_block::DXBBlock;
25use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceConfiguration, SendCallback, SendFailure, SocketConfiguration, SocketProperties};
26use futures::lock::Mutex;
27
28derive_setup_data!(TCPServerInterfaceSetupDataNative, TCPServerInterfaceSetupData);
29
30impl TCPServerInterfaceSetupDataNative {
31    async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
32        let host = self.host.clone().unwrap_or_else(|| "0.0.0.0".to_string());
33
34        let address: SocketAddr = format!("{}:{}", host, self.port)
35            .parse()
36            .map_err(|e: AddrParseError| {
37                ComInterfaceCreateError::InvalidSetupData(e.to_string())
38            })?;
39
40        let listener = TcpListener::bind(address).await.map_err(|e| {
41            ComInterfaceCreateError::connection_error_with_details(e)
42        })?;
43        info!("TCP Server listening on {address}");
44
45        Ok(ComInterfaceConfiguration::new_multi_socket(
46            ComInterfaceProperties {
47                name: Some(format!("{}:{}", host, self.port)),
48                ..Self::get_default_properties()
49            },
50            async gen move {
51                loop {
52                    // get next websocket connection
53                    match Self::get_next_socket_connection(&listener).await {
54                        Ok((addr, mut read, write)) => {
55                            info!("Accepted new TCP connection from {addr}");
56                            // yield new socket data
57                            yield Ok(SocketConfiguration::new_in_out(
58                                SocketProperties::new(InterfaceDirection::InOut, 1),
59                                // socket incoming blocks iterator
60                                async gen move {
61                                    // read blocks
62                                    loop {
63                                        let mut buffer = [0u8; 1024];
64                                        match read.read(&mut buffer).await {
65                                            Ok(0) => {
66                                                warn!("Connection closed by peer");
67                                                return;
68                                            }
69                                            Ok(n) => {
70                                                yield Ok(buffer[..n].to_vec());
71                                            }
72                                            Err(e) => {
73                                                error!("Failed to read from socket: {e}");
74                                                return yield Err(());
75                                            }
76                                        }
77                                    }
78                                },
79                                // socket send callback
80                                SendCallback::new_async(move |block: DXBBlock| {
81                                    let write = write.clone();
82                                    async move {
83                                        write
84                                            .lock()
85                                            .await
86                                            .write_all(&block.to_bytes())
87                                            .await
88                                            .map_err(|e| {
89                                                error!("TCP write error: {e}");
90                                                SendFailure(Box::new(block))
91                                            })
92                                    }
93                                })
94                            ));
95                        }
96                        Err(_) => {
97                            // Failed to accept connection, continue to next
98                            continue;
99                        }
100                    }
101                }
102            },
103        ))
104    }
105
106    async fn get_next_socket_connection(listener: &TcpListener) -> Result<(SocketAddr, OwnedReadHalf, Arc<Mutex<OwnedWriteHalf>>), io::Error> {
107        let (stream, addr) = listener.accept().await?;
108        // Handle the client connection
109        let (tcp_read_half, tcp_write_half) = stream.into_split();
110        Ok((addr, tcp_read_half, Arc::new(Mutex::new(tcp_write_half))))
111    }
112}
113
114impl ComInterfaceAsyncFactory for TCPServerInterfaceSetupDataNative {
115    fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
116        Box::pin(self.create_interface())
117    }
118
119    fn get_default_properties() -> ComInterfaceProperties {
120        TCPServerInterfaceSetupData::get_default_properties()
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use std::assert_matches;
127    use datex_core::{
128        network::{
129            com_hub::errors::ComInterfaceCreateError,
130        },
131    };
132    use super::*;
133
134    #[tokio::test]
135    async fn test_construct() {
136        
137        const PORT: u16 = 5088;
138        let interface_configuration =
139            TCPServerInterfaceSetupDataNative(TCPServerInterfaceSetupData::new_with_port(PORT))
140                .create_interface()
141                .await
142                .unwrap();
143
144        assert_eq!(
145            interface_configuration.properties.name,
146            Some(format!("0.0.0.0:{}", PORT))
147        );
148    }
149
150    #[tokio::test]
151    async fn test_construct_invalid_address() {
152        
153        assert_matches!(
154            TCPServerInterfaceSetupDataNative(TCPServerInterfaceSetupData::new_with_host_and_port(
155                "invalid-address".to_string(),
156                5088
157            ))
158            .create_interface()
159            .await,
160            Err(ComInterfaceCreateError::InvalidSetupData(_))
161        );
162    }
163}