Skip to main content

datex_native/com_interfaces/tcp/
tcp_client.rs

1use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_client::TCPClientInterfaceSetupData;
2
3use datex_core::{derive_setup_data, network::{
4    com_hub::errors::ComInterfaceCreateError,
5    com_interfaces::com_interface::{
6        factory::{
7            ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
8        },
9        properties::{InterfaceDirection, ComInterfaceProperties},
10    },
11}};
12use core::{
13     result::Result, str::FromStr,
14};
15use std::net::SocketAddr;
16use std::sync::Arc;
17use futures_util::lock::Mutex;
18use log::{error, warn};
19use tokio::{
20    io::{AsyncReadExt, AsyncWriteExt},
21    net::{TcpStream},
22};
23use datex_core::network::com_interfaces::com_interface::factory::ComInterfaceConfiguration;
24use datex_core::network::com_interfaces::com_interface::factory::{SendCallback, SendFailure, SocketConfiguration, SocketProperties};
25
26
// Declares `TCPClientInterfaceSetupDataNative` from the core `TCPClientInterfaceSetupData`.
// Within this file it is used as a tuple wrapper: constructed as
// `TCPClientInterfaceSetupDataNative(TCPClientInterfaceSetupData { .. })` and read via `.0`.
// NOTE(review): anything else the macro generates (e.g. trait impls) is not visible here —
// confirm against the macro definition in `datex_core`.
derive_setup_data!(TCPClientInterfaceSetupDataNative, TCPClientInterfaceSetupData);
28
29
30/// Implementation of the TCP Client Native Interface
31impl TCPClientInterfaceSetupDataNative {
32    async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
33        let address = SocketAddr::from_str(&self.address)
34            .map_err(ComInterfaceCreateError::invalid_setup_data)?;
35
36        let stream = TcpStream::connect(address).await.map_err(|error| {
37            ComInterfaceCreateError::connection_error_with_details(error)
38        })?;
39
40        let (mut read, write) = stream.into_split();
41        let write = Arc::new(Mutex::new(write));
42
43        Ok(ComInterfaceConfiguration::new_single_socket(
44            ComInterfaceProperties {
45                name: Some(self.0.address),
46                ..Self::get_default_properties()
47            },
48            SocketConfiguration::new_in_out(
49                SocketProperties::new(
50                    InterfaceDirection::InOut,
51                    1,
52                ),
53                async gen move {
54                    loop {
55                        let mut buffer = [0u8; 1024];
56                        match read.read(&mut buffer).await {
57                            Ok(0) => {
58                                warn!("Connection closed by peer");
59                                return;
60                            }
61                            Ok(n) => {
62                                yield Ok(buffer[..n].to_vec());
63                            }
64                            Err(e) => {
65                                error!("Failed to read from socket: {e}");
66                                return yield Err(())
67                            }
68                        }
69                    }
70                },
71                SendCallback::new_async(move |block| {
72                    let write = write.clone();
73                    async move {
74                        write
75                            .lock()
76                            .await
77                            .write_all(&block.to_bytes()).await
78                            .map_err(|e| {
79                                error!("WebSocket write error: {e}");
80                                SendFailure(Box::new(block))
81                            })
82                    }
83                }),
84            ),
85        ))
86    }
87}
88
89impl ComInterfaceAsyncFactory for TCPClientInterfaceSetupDataNative {
90    fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
91        Box::pin(self.create_interface())
92    }
93
94    fn get_default_properties() -> ComInterfaceProperties {
95        TCPClientInterfaceSetupData::get_default_properties()
96    }
97}
98
99
#[cfg(test)]
mod tests {
    use super::*;
    use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_client::TCPClientInterfaceSetupData;

    /// An address that does not parse as a socket address must be rejected
    /// with `InvalidSetupData`.
    #[tokio::test]
    async fn test_construct_invalid_address() {
        let setup_data = TCPClientInterfaceSetupDataNative(TCPClientInterfaceSetupData {
            address: "1.2.3".to_string(),
        });

        let outcome = setup_data.create_interface().await;

        assert!(matches!(
            outcome,
            Err(ComInterfaceCreateError::InvalidSetupData(_))
        ));
    }
}
116}