Skip to main content

datex_native/com_interfaces/websocket/
websocket_client.rs

1use datex_core::{derive_setup_data};
2use core::{ result::Result};
3use std::sync::Arc;
4use futures_util::{
5    SinkExt, StreamExt,
6    stream::{SplitSink, SplitStream},
7};
8use log::{error, info, warn};
9use tokio::net::TcpStream;
10use tungstenite::Message;
11use url::Url;
12use futures::lock::Mutex;
13
14use datex_core::network::com_interfaces::default_setup_data::websocket::websocket_client::{WebSocketClientInterfaceSetupData};
15use datex_core::{
16    network::{
17        com_hub::errors::ComInterfaceCreateError,
18        com_interfaces::com_interface::{
19            factory::{
20                ComInterfaceAsyncFactory, ComInterfaceAsyncFactoryResult,
21            },
22            properties::{InterfaceDirection, ComInterfaceProperties},
23        },
24    },
25};
26use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
27use datex_core::network::com_interfaces::default_setup_data::http_common::parse_url;
28use datex_core::global::dxb_block::DXBBlock;
29use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceConfiguration, SocketConfiguration, SendCallback, SendFailure, SocketProperties};
30
31derive_setup_data!(WebSocketClientInterfaceSetupDataNative, WebSocketClientInterfaceSetupData);
32
33impl WebSocketClientInterfaceSetupDataNative {
34    async fn create_interface(
35        self,
36    ) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
37        let (_address, write, mut read) =
38            self.create_websocket_client_connection().await?;
39        let write = Arc::new(Mutex::new(write));
40
41        Ok(
42            ComInterfaceConfiguration::new_single_socket(
43                ComInterfaceProperties {
44                    name: Some(self.url.clone()),
45                    ..Self::get_default_properties()
46                },
47                SocketConfiguration::new_in_out(
48                    SocketProperties::new(InterfaceDirection::InOut, 1),
49                    async gen move {
50                        loop {
51                            match read.next().await {
52                                Some(Ok(Message::Binary(data))) => {
53                                    yield Ok(data);
54                                }
55                                Some(Ok(_)) => {
56                                    error!("Invalid message type received");
57                                    return yield Err(());
58                                }
59                                Some(Err(e)) => {
60                                    error!("WebSocket read error: {e}");
61                                    return yield Err(());
62                                }
63                                None => {
64                                    return;
65                                }
66                            }
67                        }
68                    },
69                    SendCallback::new_async(move |block: DXBBlock| {
70                        let write = write.clone();
71                        async move {
72                            write
73                                .lock()
74                                .await
75                                .send(Message::Binary(block.to_bytes())).await
76                                .map_err(|e| {
77                                    error!("WebSocket write error: {e}");
78                                    SendFailure(Box::new(block))
79                                })
80                        }
81                    })
82                )
83            )
84        )
85    }
86
87    /// initialize a new websocket client connection
88    async fn create_websocket_client_connection(
89        &self,
90    ) -> Result<
91        (
92            Url,
93            SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
94            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
95        ),
96        ComInterfaceCreateError,
97    > {
98        let address = parse_url(&self.url).map_err(|_| {
99            ComInterfaceCreateError::InvalidSetupData(
100                "Invalid WebSocket URL".to_string(),
101            )
102        })?;
103        if address.scheme() != "ws" && address.scheme() != "wss" {
104            return Err(ComInterfaceCreateError::InvalidSetupData(
105                "Invalid WebSocket URL scheme".to_string(),
106            ));
107        }
108        info!("Connecting to WebSocket server at {address}");
109        let (stream, _) = tokio_tungstenite::connect_async(address.clone())
110            .await
111            .map_err(|e| {
112                ComInterfaceCreateError::connection_error_with_details(
113                    e.to_string(),
114                )
115            })?;
116        let (write, read) = stream.split();
117        Ok((address, write, read))
118    }
119}
120
121impl ComInterfaceAsyncFactory for WebSocketClientInterfaceSetupDataNative {
122    fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
123        Box::pin(self.create_interface())
124    }
125
126    fn get_default_properties() -> ComInterfaceProperties {
127        WebSocketClientInterfaceSetupData::get_default_properties()
128    }
129}