datex_native/com_interfaces/tcp/
tcp_client.rs1use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_client::TCPClientInterfaceSetupData;
2
3use datex_core::{derive_setup_data, network::{
4 com_hub::errors::ComInterfaceCreateError,
5 com_interfaces::com_interface::{
6 factory::{
7 ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
8 },
9 properties::{InterfaceDirection, ComInterfaceProperties},
10 },
11}};
12use core::{
13 result::Result, str::FromStr,
14};
15use std::net::SocketAddr;
16use std::sync::Arc;
17use futures_util::lock::Mutex;
18use log::{error, warn};
19use tokio::{
20 io::{AsyncReadExt, AsyncWriteExt},
21 net::{TcpStream},
22};
23use datex_core::network::com_interfaces::com_interface::factory::ComInterfaceConfiguration;
24use datex_core::network::com_interfaces::com_interface::factory::{SendCallback, SendFailure, SocketConfiguration, SocketProperties};
25
26
27derive_setup_data!(TCPClientInterfaceSetupDataNative, TCPClientInterfaceSetupData);
28
29
30impl TCPClientInterfaceSetupDataNative {
32 async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
33 let address = SocketAddr::from_str(&self.address)
34 .map_err(ComInterfaceCreateError::invalid_setup_data)?;
35
36 let stream = TcpStream::connect(address).await.map_err(|error| {
37 ComInterfaceCreateError::connection_error_with_details(error)
38 })?;
39
40 let (mut read, write) = stream.into_split();
41 let write = Arc::new(Mutex::new(write));
42
43 Ok(ComInterfaceConfiguration::new_single_socket(
44 ComInterfaceProperties {
45 name: Some(self.0.address),
46 ..Self::get_default_properties()
47 },
48 SocketConfiguration::new_in_out(
49 SocketProperties::new(
50 InterfaceDirection::InOut,
51 1,
52 ),
53 async gen move {
54 loop {
55 let mut buffer = [0u8; 1024];
56 match read.read(&mut buffer).await {
57 Ok(0) => {
58 warn!("Connection closed by peer");
59 return;
60 }
61 Ok(n) => {
62 yield Ok(buffer[..n].to_vec());
63 }
64 Err(e) => {
65 error!("Failed to read from socket: {e}");
66 return yield Err(())
67 }
68 }
69 }
70 },
71 SendCallback::new_async(move |block| {
72 let write = write.clone();
73 async move {
74 write
75 .lock()
76 .await
77 .write_all(&block.to_bytes()).await
78 .map_err(|e| {
79 error!("WebSocket write error: {e}");
80 SendFailure(Box::new(block))
81 })
82 }
83 }),
84 ),
85 ))
86 }
87}
88
89impl ComInterfaceAsyncFactory for TCPClientInterfaceSetupDataNative {
90 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
91 Box::pin(self.create_interface())
92 }
93
94 fn get_default_properties() -> ComInterfaceProperties {
95 TCPClientInterfaceSetupData::get_default_properties()
96 }
97}
98
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use datex_core::network::com_interfaces::default_setup_data::tcp::tcp_client::TCPClientInterfaceSetupData;
104
105 #[tokio::test]
106 async fn test_construct_invalid_address() {
107
108 const ADDRESS: &str = "1.2.3";
109 let result = TCPClientInterfaceSetupDataNative(TCPClientInterfaceSetupData {
110 address: ADDRESS.to_string(),
111 })
112 .create_interface()
113 .await;
114 assert!(matches!(result, Err(ComInterfaceCreateError::InvalidSetupData(_))));
115 }
116}