wasmer_bus_ws/api/
mod.rs

1use std::sync::Arc;
2use wasmer_bus::macros::*;
3
4use crate::model::SendResult;
5use crate::model::SocketState;
6
// Interface for creating socket connections, declared through the
// `wasmer_bus` attribute macro with `format = "bincode"`.
// NOTE(review): the commented-out code further down in this file declares
// client/service types with the same names and `Result<_, BusError>` returns;
// it looks like what this macro generates for the trait — confirm against the
// macro itself before relying on that.
7#[wasmer_bus(format = "bincode")]
8pub trait SocketBuilder {
    // Takes the target `url` plus two callbacks and resolves to a shared
    // `WebSocket` handle (`Arc<dyn WebSocket>`).
    //
    // Parameters:
    // - `url`: address handed to the implementation as an owned `String`.
    // - `state_change`: callback receiving a `SocketState`; presumably invoked
    //   when the connection state changes — confirm with the implementor.
    // - `receive`: callback receiving a `Vec<u8>`; presumably invoked with
    //   each incoming payload — confirm with the implementor.
    //
    // The callbacks are written as `impl Fn(..)` here; how the macro rewrites
    // them is not visible in this declaration.
9    async fn connect(
10        &self,
11        url: String,
12        state_change: impl Fn(SocketState),
13        receive: impl Fn(Vec<u8>),
14    ) -> Arc<dyn WebSocket>;
15}
16
// Interface for a socket handle, declared through the `wasmer_bus` attribute
// macro with `format = "bincode"`. `SocketBuilder::connect` returns this
// trait behind an `Arc<dyn WebSocket>`.
17#[wasmer_bus(format = "bincode")]
18pub trait WebSocket {
    // Takes ownership of the bytes in `data` and resolves to a `SendResult`.
    // Presumably transmits `data` over the socket — confirm with the
    // implementor. `SendResult` is imported from `crate::model`; its contents
    // are not visible in this file.
19    async fn send(&self, data: Vec<u8>) -> SendResult;
20}
21
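22/*
NOTE(review): everything inside this block comment is commented out and is never compiled.
It looks like a saved copy of the code that `#[wasmer_bus]` generates for the `SocketBuilder`
and `WebSocket` traits (request/callback structs, `*Simplified` traits, `*Service` and `*Client`
types) - confirm against the current macro output before treating it as accurate.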
23#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
24pub struct SocketBuilderConnectStateChangeCallback(pub SocketState);
25#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
26pub struct SocketBuilderConnectReceiveCallback(pub Vec<u8>);
27#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
28pub struct SocketBuilderConnectRequest {
29    pub url: String,
30}
31#[wasmer_bus::async_trait]
32pub trait SocketBuilder
33where
34    Self: std::fmt::Debug + Send + Sync,
35{
36    async fn connect(
37        &self,
38        url: String,
39        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
40        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
41    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
42    fn blocking_connect(
43        &self,
44        url: String,
45        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
46        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
47    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
48    fn as_client(&self) -> Option<SocketBuilderClient>;
49}
50#[wasmer_bus::async_trait]
51pub trait SocketBuilderSimplified
52where
53    Self: std::fmt::Debug + Send + Sync,
54{
55    async fn connect(
56        &self,
57        url: String,
58        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
59        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
60    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
61}
62#[wasmer_bus::async_trait]
63impl<T> SocketBuilder for T
64where
65    T: SocketBuilderSimplified,
66{
67    async fn connect(
68        &self,
69        url: String,
70        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
71        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
72    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
73        SocketBuilderSimplified::connect(self, url, state_change, receive).await
74    }
75    fn blocking_connect(
76        &self,
77        url: String,
78        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
79        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
80    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
81        wasmer_bus::task::block_on(SocketBuilderSimplified::connect(
82            self,
83            url,
84            state_change,
85            receive,
86        ))
87    }
88    fn as_client(&self) -> Option<SocketBuilderClient> {
89        None
90    }
91}
92#[derive(Debug, Clone)]
93pub struct SocketBuilderService {}
94impl SocketBuilderService {
95    #[allow(dead_code)]
96    pub(crate) fn attach(
97        wasm_me: std::sync::Arc<dyn SocketBuilder>,
98        call_handle: wasmer_bus::abi::CallSmartHandle,
99    ) {
100        {
101            let wasm_me = wasm_me.clone();
102            let call_handle = call_handle.clone();
103            wasmer_bus::task::respond_to(
104                call_handle,
105                wasmer_bus::abi::SerializationFormat::Bincode,
106                #[allow(unused_variables)]
107                move |wasm_handle: wasmer_bus::abi::CallHandle,
108                      wasm_req: SocketBuilderConnectRequest| {
109                    let wasm_me = wasm_me.clone();
110                    let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
111                    let url = wasm_req.url;
112                    async move {
113                        let state_change = {
114                            let wasm_handle = wasm_handle.clone();
115                            Box::new(move |response: SocketState| {
116                                let response = SocketBuilderConnectStateChangeCallback(response);
117                                let _ = wasmer_bus::abi::subcall(
118                                    wasm_handle.clone(),
119                                    wasmer_bus::abi::SerializationFormat::Bincode,
120                                    response,
121                                )
122                                .invoke();
123                            })
124                        };
125                        let receive = {
126                            let wasm_handle = wasm_handle.clone();
127                            Box::new(move |response: Vec<u8>| {
128                                let response = SocketBuilderConnectReceiveCallback(response);
129                                let _ = wasmer_bus::abi::subcall(
130                                    wasm_handle.clone(),
131                                    wasmer_bus::abi::SerializationFormat::Bincode,
132                                    response,
133                                )
134                                .invoke();
135                            })
136                        };
137                        let svc = wasm_me.connect(url, state_change, receive).await?;
138                        WebSocketService::attach(svc, wasm_handle);
139                        Ok(())
140                    }
141                },
142                true,
143            );
144        }
145    }
146    pub fn listen(wasm_me: std::sync::Arc<dyn SocketBuilder>) {
147        {
148            let wasm_me = wasm_me.clone();
149            wasmer_bus::task::listen(
150                wasmer_bus::abi::SerializationFormat::Bincode,
151                #[allow(unused_variables)]
152                move |wasm_handle: wasmer_bus::abi::CallHandle,
153                      wasm_req: SocketBuilderConnectRequest| {
154                    let wasm_me = wasm_me.clone();
155                    let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
156                    let url = wasm_req.url;
157                    async move {
158                        let state_change = {
159                            let wasm_handle = wasm_handle.clone();
160                            Box::new(move |response: SocketState| {
161                                let response = SocketBuilderConnectStateChangeCallback(response);
162                                let _ = wasmer_bus::abi::subcall(
163                                    wasm_handle.clone(),
164                                    wasmer_bus::abi::SerializationFormat::Bincode,
165                                    response,
166                                )
167                                .invoke();
168                            })
169                        };
170                        let receive = {
171                            let wasm_handle = wasm_handle.clone();
172                            Box::new(move |response: Vec<u8>| {
173                                let response = SocketBuilderConnectReceiveCallback(response);
174                                let _ = wasmer_bus::abi::subcall(
175                                    wasm_handle.clone(),
176                                    wasmer_bus::abi::SerializationFormat::Bincode,
177                                    response,
178                                )
179                                .invoke();
180                            })
181                        };
182                        let svc = wasm_me.connect(url, state_change, receive).await?;
183                        WebSocketService::attach(svc, wasm_handle);
184                        Ok(())
185                    }
186                },
187            );
188        }
189    }
190    pub fn serve() {
191        wasmer_bus::task::serve();
192    }
193}
194#[derive(Debug, Clone)]
195pub struct SocketBuilderClient {
196    ctx: wasmer_bus::abi::CallContext,
197    task: Option<wasmer_bus::abi::Call>,
198    join: Option<wasmer_bus::abi::CallJoin<()>>,
199}
200impl SocketBuilderClient {
201    pub fn new(wapm: &str) -> Self {
202        Self {
203            ctx: wasmer_bus::abi::CallContext::NewBusCall {
204                wapm: wapm.to_string().into(),
205                instance: None,
206            },
207            task: None,
208            join: None,
209        }
210    }
211    pub fn new_with_instance(wapm: &str, instance: &str, access_token: &str) -> Self {
212        Self {
213            ctx: wasmer_bus::abi::CallContext::NewBusCall {
214                wapm: wapm.to_string().into(),
215                instance: Some(wasmer_bus::abi::CallInstance::new(instance, access_token)),
216            },
217            task: None,
218            join: None,
219        }
220    }
221    pub fn attach(handle: wasmer_bus::abi::CallSmartHandle) -> Self {
222        Self {
223            ctx: wasmer_bus::abi::CallContext::SubCall { parent: handle },
224            task: None,
225            join: None,
226        }
227    }
228    pub fn wait(self) -> Result<(), wasmer_bus::abi::BusError> {
229        if let Some(join) = self.join {
230            join.wait()?;
231        }
232        if let Some(task) = self.task {
233            task.join()?.wait()?;
234        }
235        Ok(())
236    }
237    pub fn try_wait(&mut self) -> Result<Option<()>, wasmer_bus::abi::BusError> {
238        if let Some(task) = self.task.take() {
239            self.join.replace(task.join()?);
240        }
241        if let Some(join) = self.join.as_mut() {
242            join.try_wait()
243        } else {
244            Ok(None)
245        }
246    }
247    pub async fn connect(
248        &self,
249        url: String,
250        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
251        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
252    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
253        let request = SocketBuilderConnectRequest { url };
254        let handle = wasmer_bus::abi::call(
255            self.ctx.clone(),
256            wasmer_bus::abi::SerializationFormat::Bincode,
257            request,
258        )
259        .callback(move |req: SocketBuilderConnectStateChangeCallback| state_change(req.0))
260        .callback(move |req: SocketBuilderConnectReceiveCallback| receive(req.0))
261        .detach()?;
262        Ok(Arc::new(WebSocketClient::attach(handle)))
263    }
264    pub fn blocking_connect(
265        &self,
266        url: String,
267        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
268        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
269    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
270        wasmer_bus::task::block_on(self.connect(url, state_change, receive))
271    }
272}
273impl std::future::Future for SocketBuilderClient {
274    type Output = Result<(), wasmer_bus::abi::BusError>;
275    fn poll(
276        mut self: std::pin::Pin<&mut Self>,
277        cx: &mut std::task::Context<'_>,
278    ) -> std::task::Poll<Self::Output> {
279        if let Some(task) = self.task.take() {
280            self.join.replace(task.join()?);
281        }
282        if let Some(join) = self.join.as_mut() {
283            let join = std::pin::Pin::new(join);
284            return join.poll(cx);
285        } else {
286            std::task::Poll::Ready(Ok(()))
287        }
288    }
289}
290#[wasmer_bus::async_trait]
291impl SocketBuilder for SocketBuilderClient {
292    async fn connect(
293        &self,
294        url: String,
295        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
296        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
297    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
298        SocketBuilderClient::connect(self, url, state_change, receive).await
299    }
300    fn blocking_connect(
301        &self,
302        url: String,
303        state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
304        receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
305    ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
306        SocketBuilderClient::blocking_connect(self, url, state_change, receive)
307    }
308    fn as_client(&self) -> Option<SocketBuilderClient> {
309        Some(self.clone())
310    }
311}
312
313#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
314pub struct WebSocketSendRequest {
315    pub data: Vec<u8>,
316}
317#[wasmer_bus::async_trait]
318pub trait WebSocket
319where
320    Self: std::fmt::Debug + Send + Sync,
321{
322    async fn send(&self, data: Vec<u8>)
323        -> std::result::Result<SendResult, wasmer_bus::abi::BusError>;
324    fn blocking_send(
325        &self,
326        data: Vec<u8>,
327    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError>;
328    fn as_client(&self) -> Option<WebSocketClient>;
329}
330#[wasmer_bus::async_trait]
331pub trait WebSocketSimplified
332where
333    Self: std::fmt::Debug + Send + Sync,
334{
335    async fn send(&self, data: Vec<u8>) -> SendResult;
336}
337#[wasmer_bus::async_trait]
338impl<T> WebSocket for T
339where
340    T: WebSocketSimplified,
341{
342    async fn send(
343        &self,
344        data: Vec<u8>,
345    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
346        Ok(WebSocketSimplified::send(self, data).await)
347    }
348    fn blocking_send(
349        &self,
350        data: Vec<u8>,
351    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
352        Ok(wasmer_bus::task::block_on(WebSocketSimplified::send(
353            self, data,
354        )))
355    }
356    fn as_client(&self) -> Option<WebSocketClient> {
357        None
358    }
359}
360#[derive(Debug, Clone)]
361pub struct WebSocketService {}
362impl WebSocketService {
363    #[allow(dead_code)]
364    pub(crate) fn attach(
365        wasm_me: std::sync::Arc<dyn WebSocket>,
366        call_handle: wasmer_bus::abi::CallSmartHandle,
367    ) {
368        {
369            let wasm_me = wasm_me.clone();
370            let call_handle = call_handle.clone();
371            wasmer_bus::task::respond_to(
372                call_handle,
373                wasmer_bus::abi::SerializationFormat::Bincode,
374                #[allow(unused_variables)]
375                move |wasm_handle: wasmer_bus::abi::CallHandle, wasm_req: WebSocketSendRequest| {
376                    let wasm_me = wasm_me.clone();
377                    let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
378                    let data = wasm_req.data;
379                    async move { wasm_me.send(data).await }
380                },
381            );
382        }
383    }
384    pub fn listen(wasm_me: std::sync::Arc<dyn WebSocket>) {
385        {
386            let wasm_me = wasm_me.clone();
387            wasmer_bus::task::listen(
388                wasmer_bus::abi::SerializationFormat::Bincode,
389                #[allow(unused_variables)]
390                move |_wasm_handle: wasmer_bus::abi::CallHandle, wasm_req: WebSocketSendRequest| {
391                    let wasm_me = wasm_me.clone();
392                    let data = wasm_req.data;
393                    async move { wasm_me.send(data).await }
394                },
395            );
396        }
397    }
398    pub fn serve() {
399        wasmer_bus::task::serve();
400    }
401}
402#[derive(Debug, Clone)]
403pub struct WebSocketClient {
404    ctx: wasmer_bus::abi::CallContext,
405    task: Option<wasmer_bus::abi::Call>,
406    join: Option<wasmer_bus::abi::CallJoin<()>>,
407}
408impl WebSocketClient {
409    pub fn new(wapm: &str) -> Self {
410        Self {
411            ctx: wasmer_bus::abi::CallContext::NewBusCall {
412                wapm: wapm.to_string().into(),
413                instance: None,
414            },
415            task: None,
416            join: None,
417        }
418    }
419    pub fn new_with_instance(wapm: &str, instance: &str, access_token: &str) -> Self {
420        Self {
421            ctx: wasmer_bus::abi::CallContext::NewBusCall {
422                wapm: wapm.to_string().into(),
423                instance: Some(wasmer_bus::abi::CallInstance::new(instance, access_token)),
424            },
425            task: None,
426            join: None,
427        }
428    }
429    pub fn attach(handle: wasmer_bus::abi::CallSmartHandle) -> Self {
430        Self {
431            ctx: wasmer_bus::abi::CallContext::SubCall { parent: handle },
432            task: None,
433            join: None,
434        }
435    }
436    pub fn wait(self) -> Result<(), wasmer_bus::abi::BusError> {
437        if let Some(join) = self.join {
438            join.wait()?;
439        }
440        if let Some(task) = self.task {
441            task.join()?.wait()?;
442        }
443        Ok(())
444    }
445    pub fn try_wait(&mut self) -> Result<Option<()>, wasmer_bus::abi::BusError> {
446        if let Some(task) = self.task.take() {
447            self.join.replace(task.join()?);
448        }
449        if let Some(join) = self.join.as_mut() {
450            join.try_wait()
451        } else {
452            Ok(None)
453        }
454    }
455    pub async fn send(
456        &self,
457        data: Vec<u8>,
458    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
459        let request = WebSocketSendRequest { data };
460        wasmer_bus::abi::call(
461            self.ctx.clone(),
462            wasmer_bus::abi::SerializationFormat::Bincode,
463            request,
464        )
465        .invoke()
466        .join()?
467        .await
468    }
469    pub fn blocking_send(
470        &self,
471        data: Vec<u8>,
472    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
473        wasmer_bus::task::block_on(self.send(data))
474    }
475}
476impl std::future::Future for WebSocketClient {
477    type Output = Result<(), wasmer_bus::abi::BusError>;
478    fn poll(
479        mut self: std::pin::Pin<&mut Self>,
480        cx: &mut std::task::Context<'_>,
481    ) -> std::task::Poll<Self::Output> {
482        if let Some(task) = self.task.take() {
483            self.join.replace(task.join()?);
484        }
485        if let Some(join) = self.join.as_mut() {
486            let join = std::pin::Pin::new(join);
487            return join.poll(cx);
488        } else {
489            std::task::Poll::Ready(Ok(()))
490        }
491    }
492}
493#[wasmer_bus::async_trait]
494impl WebSocket for WebSocketClient {
495    async fn send(
496        &self,
497        data: Vec<u8>,
498    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
499        WebSocketClient::send(self, data).await
500    }
501    fn blocking_send(
502        &self,
503        data: Vec<u8>,
504    ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
505        WebSocketClient::blocking_send(self, data)
506    }
507    fn as_client(&self) -> Option<WebSocketClient> {
508        Some(self.clone())
509    }
510}
511*/