workflow_websocket/client/
mod.rs

1//!
2//! async WebSocket client functionality (requires a browser (WASM) or tokio (native) executors)
3//!
4
5use cfg_if::cfg_if;
6
7mod wasm;
8pub use wasm::WebSocketInterface as _WSI;
9
10cfg_if! {
11    if #[cfg(target_arch = "wasm32")] {
12        use wasm::WebSocketInterface;
13    } else {
14        mod native;
15        use native::WebSocketInterface;
16    }
17}
18
19pub mod bindings;
20pub mod config;
21pub mod error;
22pub mod message;
23pub mod options;
24pub mod result;
25
26pub use config::WebSocketConfig;
27pub use error::Error;
28use futures::Future;
29pub use message::*;
30pub use options::{ConnectOptions, ConnectStrategy};
31pub use result::Result;
32
33use async_trait::async_trait;
34use std::pin::Pin;
35use std::sync::Arc;
36use workflow_core::channel::{oneshot, Channel, Receiver, Sender};
37pub type ConnectResult<E> = std::result::Result<Option<Receiver<Result<()>>>, E>;
38
39pub type HandshakeFn = Arc<
40    Box<dyn Send + Sync + Fn(&Sender<Message>, &Receiver<Message>) -> HandshakeFnReturn + 'static>,
41>;
42pub type HandshakeFnReturn = Pin<Box<(dyn Send + Sync + 'static + Future<Output = Result<()>>)>>;
43
44#[async_trait]
45pub trait Handshake: Send + Sync + 'static {
46    async fn handshake(&self, sender: &Sender<Message>, receiver: &Receiver<Message>)
47        -> Result<()>;
48}
49
50#[async_trait]
51pub trait Resolver: Send + Sync + 'static {
52    async fn resolve_url(&self) -> ResolverResult;
53}
54pub type ResolverResult = Result<String>;
55pub type WebSocketError = Error;
56
57struct Inner {
58    client: Arc<WebSocketInterface>,
59    sender_channel: Channel<(Message, Ack)>,
60    receiver_channel: Channel<Message>,
61}
62
63impl Inner {
64    pub fn new(
65        client: Arc<WebSocketInterface>,
66        sender_channel: Channel<(Message, Ack)>,
67        receiver_channel: Channel<Message>,
68    ) -> Self {
69        Self {
70            client,
71            sender_channel,
72            receiver_channel,
73        }
74    }
75}
76
77/// An async WebSocket implementation capable of operating
78/// uniformly under a browser-backed executor in WASM and under
79/// native tokio-runtime.
80#[derive(Clone)]
81pub struct WebSocket {
82    inner: Arc<Inner>,
83}
84
85impl WebSocket {
86    /// Create a new WebSocket instance connecting to the given URL.
87    pub fn new(url: Option<&str>, config: Option<WebSocketConfig>) -> Result<WebSocket> {
88        if let Some(url) = url {
89            if !url.starts_with("ws://") && !url.starts_with("wss://") {
90                return Err(Error::AddressSchema(url.to_string()));
91            }
92        }
93
94        let config = config.unwrap_or_default();
95
96        let receiver_channel = if let Some(cap) = config.receiver_channel_cap {
97            Channel::bounded(cap)
98        } else {
99            Channel::<Message>::unbounded()
100        };
101
102        let sender_channel = if let Some(cap) = config.sender_channel_cap {
103            Channel::bounded(cap)
104        } else {
105            Channel::<(Message, Ack)>::unbounded()
106        };
107
108        let client = Arc::new(WebSocketInterface::new(
109            url,
110            Some(config),
111            sender_channel.clone(),
112            receiver_channel.clone(),
113        )?);
114
115        let websocket = WebSocket {
116            inner: Arc::new(Inner::new(client, sender_channel, receiver_channel)),
117        };
118
119        Ok(websocket)
120    }
121
122    /// Get current websocket connection URL
123    pub fn url(&self) -> Option<String> {
124        self.inner.client.current_url()
125    }
126
127    /// Changes WebSocket connection URL.
128    /// Following this call, you must invoke
129    /// `WebSocket::reconnect().await` manually
130    pub fn set_url(&self, url: &str) {
131        self.inner.client.set_default_url(url);
132    }
133
134    /// Configure WebSocket connection settings
135    /// Can be supplied after the WebSocket has been
136    /// has been created to alter the configuration
137    /// for the next connection.
138    pub fn configure(&self, config: WebSocketConfig) {
139        self.inner.client.configure(config);
140    }
141
142    /// Returns the reference to the Sender channel
143    pub fn sender_tx(&self) -> &Sender<(Message, Ack)> {
144        &self.inner.sender_channel.sender
145    }
146
147    /// Returns the reference to the Receiver channel
148    pub fn receiver_rx(&self) -> &Receiver<Message> {
149        &self.inner.receiver_channel.receiver
150    }
151
152    /// Returns true if websocket is connected, false otherwise
153    pub fn is_connected(&self) -> bool {
154        self.inner.client.is_connected()
155    }
156
157    /// Connects the websocket to the destination URL.
158    /// Optionally accepts `block_until_connected` argument
159    /// that will block the async execution until the websocket
160    /// is connected.
161    ///
162    /// Once invoked, connection task will run in the background
163    /// and will attempt to repeatedly reconnect if the websocket
164    /// connection is closed.
165    ///
166    /// To suspend reconnection, you have to call `disconnect()`
167    /// method explicitly.
168    ///
169    pub async fn connect(&self, options: ConnectOptions) -> ConnectResult<Error> {
170        self.inner.client.connect(options).await
171    }
172
173    /// Disconnects the websocket from the destination server.
174    pub async fn disconnect(&self) -> Result<()> {
175        self.inner.client.disconnect().await
176    }
177
178    /// Trigger WebSocket to reconnect.  This method
179    /// closes the underlying WebSocket connection
180    /// causing the WebSocket implementation to
181    /// re-initiate connection.
182    pub async fn reconnect(&self) -> Result<()> {
183        self.inner.client.close().await
184    }
185
186    /// Sends a message to the destination server. This function
187    /// will queue the message on the relay channel and return
188    /// successfully if the message has been queued.
189    /// This function enforces async yield in order to prevent
190    /// potential blockage of the executor if it is being executed
191    /// in tight loops.
192    pub async fn post(&self, message: Message) -> Result<&Self> {
193        if !self.inner.client.is_connected() {
194            return Err(Error::NotConnected);
195        }
196
197        let result = Ok(self
198            .inner
199            .sender_channel
200            .sender
201            .send((message, None))
202            .await?);
203        workflow_core::task::yield_now().await;
204        result.map(|_| self)
205    }
206
207    /// Sends a message to the destination server. This function
208    /// will block until until the message was relayed to the
209    /// underlying websocket implementation.
210    pub async fn send(&self, message: Message) -> std::result::Result<&Self, Arc<Error>> {
211        if !self.inner.client.is_connected() {
212            return Err(Arc::new(Error::NotConnected));
213        }
214
215        let (ack_sender, ack_receiver) = oneshot();
216        self.inner
217            .sender_channel
218            .send((message, Some(ack_sender)))
219            .await
220            .map_err(|err| Arc::new(err.into()))?;
221
222        ack_receiver
223            .recv()
224            .await
225            .map_err(|_| Arc::new(Error::DispatchChannelAck))?
226            .map(|_| self)
227    }
228
229    /// Receives message from the websocket. Blocks until a message is
230    /// received from the underlying websocket connection.
231    pub async fn recv(&self) -> Result<Message> {
232        Ok(self.inner.receiver_channel.receiver.recv().await?)
233    }
234
235    /// Triggers a disconnection on the underlying WebSocket.
236    /// This is intended for debug purposes only.
237    /// Can be used to test application reconnection logic.
238    pub fn trigger_abort(&self) -> Result<()> {
239        self.inner.client.trigger_abort()
240    }
241}