datex_native/com_interfaces/websocket/
websocket_client.rs1use datex_core::{derive_setup_data};
2use core::{ result::Result};
3use std::sync::Arc;
4use futures_util::{
5 SinkExt, StreamExt,
6 stream::{SplitSink, SplitStream},
7};
8use log::{error, info, warn};
9use tokio::net::TcpStream;
10use tungstenite::Message;
11use url::Url;
12use futures::lock::Mutex;
13
14use datex_core::network::com_interfaces::default_setup_data::websocket::websocket_client::{WebSocketClientInterfaceSetupData};
15use datex_core::{
16 network::{
17 com_hub::errors::ComInterfaceCreateError,
18 com_interfaces::com_interface::{
19 factory::{
20 ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
21 },
22 properties::{InterfaceDirection, ComInterfaceProperties},
23 },
24 },
25};
26use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
27use datex_core::network::com_interfaces::default_setup_data::http_common::parse_url;
28use datex_core::global::dxb_block::DXBBlock;
29use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceConfiguration, SocketConfiguration, SendCallback, SendFailure, SocketProperties};
30
31derive_setup_data!(WebSocketClientInterfaceSetupDataNative, WebSocketClientInterfaceSetupData);
32
33impl WebSocketClientInterfaceSetupDataNative {
34 async fn create_interface(
35 self,
36 ) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
37 let (_address, write, mut read) =
38 self.create_websocket_client_connection().await?;
39 let write = Arc::new(Mutex::new(write));
40
41 Ok(
42 ComInterfaceConfiguration::new_single_socket(
43 ComInterfaceProperties {
44 name: Some(self.url.clone()),
45 ..Self::get_default_properties()
46 },
47 SocketConfiguration::new_in_out(
48 SocketProperties::new(InterfaceDirection::InOut, 1),
49 async gen move {
50 loop {
51 match read.next().await {
52 Some(Ok(Message::Binary(data))) => {
53 yield Ok(data);
54 }
55 Some(Ok(_)) => {
56 error!("Invalid message type received");
57 return yield Err(());
58 }
59 Some(Err(e)) => {
60 error!("WebSocket read error: {e}");
61 return yield Err(());
62 }
63 None => {
64 return;
65 }
66 }
67 }
68 },
69 SendCallback::new_async(move |block: DXBBlock| {
70 let write = write.clone();
71 async move {
72 write
73 .lock()
74 .await
75 .send(Message::Binary(block.to_bytes())).await
76 .map_err(|e| {
77 error!("WebSocket write error: {e}");
78 SendFailure(Box::new(block))
79 })
80 }
81 })
82 )
83 )
84 )
85 }
86
87 async fn create_websocket_client_connection(
89 &self,
90 ) -> Result<
91 (
92 Url,
93 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
94 SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
95 ),
96 ComInterfaceCreateError,
97 > {
98 let address = parse_url(&self.url).map_err(|_| {
99 ComInterfaceCreateError::InvalidSetupData(
100 "Invalid WebSocket URL".to_string(),
101 )
102 })?;
103 if address.scheme() != "ws" && address.scheme() != "wss" {
104 return Err(ComInterfaceCreateError::InvalidSetupData(
105 "Invalid WebSocket URL scheme".to_string(),
106 ));
107 }
108 info!("Connecting to WebSocket server at {address}");
109 let (stream, _) = tokio_tungstenite::connect_async(address.clone())
110 .await
111 .map_err(|e| {
112 ComInterfaceCreateError::connection_error_with_details(
113 e.to_string(),
114 )
115 })?;
116 let (write, read) = stream.split();
117 Ok((address, write, read))
118 }
119}
120
121impl ComInterfaceAsyncFactory for WebSocketClientInterfaceSetupDataNative {
122 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
123 Box::pin(self.create_interface())
124 }
125
126 fn get_default_properties() -> ComInterfaceProperties {
127 WebSocketClientInterfaceSetupData::get_default_properties()
128 }
129}