workflow_websocket/client/
mod.rs1use cfg_if::cfg_if;
6
7mod wasm;
8pub use wasm::WebSocketInterface as _WSI;
9
10cfg_if! {
11 if #[cfg(target_arch = "wasm32")] {
12 use wasm::WebSocketInterface;
13 } else {
14 mod native;
15 use native::WebSocketInterface;
16 }
17}
18
19pub mod bindings;
20pub mod config;
21pub mod error;
22pub mod message;
23pub mod options;
24pub mod result;
25
26pub use config::WebSocketConfig;
27pub use error::Error;
28use futures::Future;
29pub use message::*;
30pub use options::{ConnectOptions, ConnectStrategy};
31pub use result::Result;
32
33use async_trait::async_trait;
34use std::pin::Pin;
35use std::sync::Arc;
36use workflow_core::channel::{oneshot, Channel, Receiver, Sender};
37pub type ConnectResult<E> = std::result::Result<Option<Receiver<Result<()>>>, E>;
38
39pub type HandshakeFn = Arc<
40 Box<dyn Send + Sync + Fn(&Sender<Message>, &Receiver<Message>) -> HandshakeFnReturn + 'static>,
41>;
42pub type HandshakeFnReturn = Pin<Box<(dyn Send + Sync + 'static + Future<Output = Result<()>>)>>;
43
44#[async_trait]
45pub trait Handshake: Send + Sync + 'static {
46 async fn handshake(&self, sender: &Sender<Message>, receiver: &Receiver<Message>)
47 -> Result<()>;
48}
49
50#[async_trait]
51pub trait Resolver: Send + Sync + 'static {
52 async fn resolve_url(&self) -> ResolverResult;
53}
54pub type ResolverResult = Result<String>;
55pub type WebSocketError = Error;
56
57struct Inner {
58 client: Arc<WebSocketInterface>,
59 sender_channel: Channel<(Message, Ack)>,
60 receiver_channel: Channel<Message>,
61}
62
63impl Inner {
64 pub fn new(
65 client: Arc<WebSocketInterface>,
66 sender_channel: Channel<(Message, Ack)>,
67 receiver_channel: Channel<Message>,
68 ) -> Self {
69 Self {
70 client,
71 sender_channel,
72 receiver_channel,
73 }
74 }
75}
76
77#[derive(Clone)]
81pub struct WebSocket {
82 inner: Arc<Inner>,
83}
84
85impl WebSocket {
86 pub fn new(url: Option<&str>, config: Option<WebSocketConfig>) -> Result<WebSocket> {
88 if let Some(url) = url {
89 if !url.starts_with("ws://") && !url.starts_with("wss://") {
90 return Err(Error::AddressSchema(url.to_string()));
91 }
92 }
93
94 let config = config.unwrap_or_default();
95
96 let receiver_channel = if let Some(cap) = config.receiver_channel_cap {
97 Channel::bounded(cap)
98 } else {
99 Channel::<Message>::unbounded()
100 };
101
102 let sender_channel = if let Some(cap) = config.sender_channel_cap {
103 Channel::bounded(cap)
104 } else {
105 Channel::<(Message, Ack)>::unbounded()
106 };
107
108 let client = Arc::new(WebSocketInterface::new(
109 url,
110 Some(config),
111 sender_channel.clone(),
112 receiver_channel.clone(),
113 )?);
114
115 let websocket = WebSocket {
116 inner: Arc::new(Inner::new(client, sender_channel, receiver_channel)),
117 };
118
119 Ok(websocket)
120 }
121
122 pub fn url(&self) -> Option<String> {
124 self.inner.client.current_url()
125 }
126
127 pub fn set_url(&self, url: &str) {
131 self.inner.client.set_default_url(url);
132 }
133
134 pub fn configure(&self, config: WebSocketConfig) {
139 self.inner.client.configure(config);
140 }
141
142 pub fn sender_tx(&self) -> &Sender<(Message, Ack)> {
144 &self.inner.sender_channel.sender
145 }
146
147 pub fn receiver_rx(&self) -> &Receiver<Message> {
149 &self.inner.receiver_channel.receiver
150 }
151
152 pub fn is_connected(&self) -> bool {
154 self.inner.client.is_connected()
155 }
156
157 pub async fn connect(&self, options: ConnectOptions) -> ConnectResult<Error> {
170 self.inner.client.connect(options).await
171 }
172
173 pub async fn disconnect(&self) -> Result<()> {
175 self.inner.client.disconnect().await
176 }
177
178 pub async fn reconnect(&self) -> Result<()> {
183 self.inner.client.close().await
184 }
185
186 pub async fn post(&self, message: Message) -> Result<&Self> {
193 if !self.inner.client.is_connected() {
194 return Err(Error::NotConnected);
195 }
196
197 let result = Ok(self
198 .inner
199 .sender_channel
200 .sender
201 .send((message, None))
202 .await?);
203 workflow_core::task::yield_now().await;
204 result.map(|_| self)
205 }
206
207 pub async fn send(&self, message: Message) -> std::result::Result<&Self, Arc<Error>> {
211 if !self.inner.client.is_connected() {
212 return Err(Arc::new(Error::NotConnected));
213 }
214
215 let (ack_sender, ack_receiver) = oneshot();
216 self.inner
217 .sender_channel
218 .send((message, Some(ack_sender)))
219 .await
220 .map_err(|err| Arc::new(err.into()))?;
221
222 ack_receiver
223 .recv()
224 .await
225 .map_err(|_| Arc::new(Error::DispatchChannelAck))?
226 .map(|_| self)
227 }
228
229 pub async fn recv(&self) -> Result<Message> {
232 Ok(self.inner.receiver_channel.receiver.recv().await?)
233 }
234
235 pub fn trigger_abort(&self) -> Result<()> {
239 self.inner.client.trigger_abort()
240 }
241}