wasmer_bus_ws/api/mod.rs
1use std::sync::Arc;
2use wasmer_bus::macros::*;
3
4use crate::model::SendResult;
5use crate::model::SocketState;
6
7#[wasmer_bus(format = "bincode")]
8pub trait SocketBuilder {
9 async fn connect(
10 &self,
11 url: String,
12 state_change: impl Fn(SocketState),
13 receive: impl Fn(Vec<u8>),
14 ) -> Arc<dyn WebSocket>;
15}
16
17#[wasmer_bus(format = "bincode")]
18pub trait WebSocket {
19 async fn send(&self, data: Vec<u8>) -> SendResult;
20}
21
22/*
23#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
24pub struct SocketBuilderConnectStateChangeCallback(pub SocketState);
25#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
26pub struct SocketBuilderConnectReceiveCallback(pub Vec<u8>);
27#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
28pub struct SocketBuilderConnectRequest {
29 pub url: String,
30}
31#[wasmer_bus::async_trait]
32pub trait SocketBuilder
33where
34 Self: std::fmt::Debug + Send + Sync,
35{
36 async fn connect(
37 &self,
38 url: String,
39 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
40 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
41 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
42 fn blocking_connect(
43 &self,
44 url: String,
45 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
46 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
47 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
48 fn as_client(&self) -> Option<SocketBuilderClient>;
49}
50#[wasmer_bus::async_trait]
51pub trait SocketBuilderSimplified
52where
53 Self: std::fmt::Debug + Send + Sync,
54{
55 async fn connect(
56 &self,
57 url: String,
58 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
59 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
60 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError>;
61}
62#[wasmer_bus::async_trait]
63impl<T> SocketBuilder for T
64where
65 T: SocketBuilderSimplified,
66{
67 async fn connect(
68 &self,
69 url: String,
70 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
71 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
72 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
73 SocketBuilderSimplified::connect(self, url, state_change, receive).await
74 }
75 fn blocking_connect(
76 &self,
77 url: String,
78 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
79 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
80 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
81 wasmer_bus::task::block_on(SocketBuilderSimplified::connect(
82 self,
83 url,
84 state_change,
85 receive,
86 ))
87 }
88 fn as_client(&self) -> Option<SocketBuilderClient> {
89 None
90 }
91}
92#[derive(Debug, Clone)]
93pub struct SocketBuilderService {}
94impl SocketBuilderService {
95 #[allow(dead_code)]
96 pub(crate) fn attach(
97 wasm_me: std::sync::Arc<dyn SocketBuilder>,
98 call_handle: wasmer_bus::abi::CallSmartHandle,
99 ) {
100 {
101 let wasm_me = wasm_me.clone();
102 let call_handle = call_handle.clone();
103 wasmer_bus::task::respond_to(
104 call_handle,
105 wasmer_bus::abi::SerializationFormat::Bincode,
106 #[allow(unused_variables)]
107 move |wasm_handle: wasmer_bus::abi::CallHandle,
108 wasm_req: SocketBuilderConnectRequest| {
109 let wasm_me = wasm_me.clone();
110 let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
111 let url = wasm_req.url;
112 async move {
113 let state_change = {
114 let wasm_handle = wasm_handle.clone();
115 Box::new(move |response: SocketState| {
116 let response = SocketBuilderConnectStateChangeCallback(response);
117 let _ = wasmer_bus::abi::subcall(
118 wasm_handle.clone(),
119 wasmer_bus::abi::SerializationFormat::Bincode,
120 response,
121 )
122 .invoke();
123 })
124 };
125 let receive = {
126 let wasm_handle = wasm_handle.clone();
127 Box::new(move |response: Vec<u8>| {
128 let response = SocketBuilderConnectReceiveCallback(response);
129 let _ = wasmer_bus::abi::subcall(
130 wasm_handle.clone(),
131 wasmer_bus::abi::SerializationFormat::Bincode,
132 response,
133 )
134 .invoke();
135 })
136 };
137 let svc = wasm_me.connect(url, state_change, receive).await?;
138 WebSocketService::attach(svc, wasm_handle);
139 Ok(())
140 }
141 },
142 true,
143 );
144 }
145 }
146 pub fn listen(wasm_me: std::sync::Arc<dyn SocketBuilder>) {
147 {
148 let wasm_me = wasm_me.clone();
149 wasmer_bus::task::listen(
150 wasmer_bus::abi::SerializationFormat::Bincode,
151 #[allow(unused_variables)]
152 move |wasm_handle: wasmer_bus::abi::CallHandle,
153 wasm_req: SocketBuilderConnectRequest| {
154 let wasm_me = wasm_me.clone();
155 let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
156 let url = wasm_req.url;
157 async move {
158 let state_change = {
159 let wasm_handle = wasm_handle.clone();
160 Box::new(move |response: SocketState| {
161 let response = SocketBuilderConnectStateChangeCallback(response);
162 let _ = wasmer_bus::abi::subcall(
163 wasm_handle.clone(),
164 wasmer_bus::abi::SerializationFormat::Bincode,
165 response,
166 )
167 .invoke();
168 })
169 };
170 let receive = {
171 let wasm_handle = wasm_handle.clone();
172 Box::new(move |response: Vec<u8>| {
173 let response = SocketBuilderConnectReceiveCallback(response);
174 let _ = wasmer_bus::abi::subcall(
175 wasm_handle.clone(),
176 wasmer_bus::abi::SerializationFormat::Bincode,
177 response,
178 )
179 .invoke();
180 })
181 };
182 let svc = wasm_me.connect(url, state_change, receive).await?;
183 WebSocketService::attach(svc, wasm_handle);
184 Ok(())
185 }
186 },
187 );
188 }
189 }
190 pub fn serve() {
191 wasmer_bus::task::serve();
192 }
193}
194#[derive(Debug, Clone)]
195pub struct SocketBuilderClient {
196 ctx: wasmer_bus::abi::CallContext,
197 task: Option<wasmer_bus::abi::Call>,
198 join: Option<wasmer_bus::abi::CallJoin<()>>,
199}
200impl SocketBuilderClient {
201 pub fn new(wapm: &str) -> Self {
202 Self {
203 ctx: wasmer_bus::abi::CallContext::NewBusCall {
204 wapm: wapm.to_string().into(),
205 instance: None,
206 },
207 task: None,
208 join: None,
209 }
210 }
211 pub fn new_with_instance(wapm: &str, instance: &str, access_token: &str) -> Self {
212 Self {
213 ctx: wasmer_bus::abi::CallContext::NewBusCall {
214 wapm: wapm.to_string().into(),
215 instance: Some(wasmer_bus::abi::CallInstance::new(instance, access_token)),
216 },
217 task: None,
218 join: None,
219 }
220 }
221 pub fn attach(handle: wasmer_bus::abi::CallSmartHandle) -> Self {
222 Self {
223 ctx: wasmer_bus::abi::CallContext::SubCall { parent: handle },
224 task: None,
225 join: None,
226 }
227 }
228 pub fn wait(self) -> Result<(), wasmer_bus::abi::BusError> {
229 if let Some(join) = self.join {
230 join.wait()?;
231 }
232 if let Some(task) = self.task {
233 task.join()?.wait()?;
234 }
235 Ok(())
236 }
237 pub fn try_wait(&mut self) -> Result<Option<()>, wasmer_bus::abi::BusError> {
238 if let Some(task) = self.task.take() {
239 self.join.replace(task.join()?);
240 }
241 if let Some(join) = self.join.as_mut() {
242 join.try_wait()
243 } else {
244 Ok(None)
245 }
246 }
247 pub async fn connect(
248 &self,
249 url: String,
250 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
251 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
252 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
253 let request = SocketBuilderConnectRequest { url };
254 let handle = wasmer_bus::abi::call(
255 self.ctx.clone(),
256 wasmer_bus::abi::SerializationFormat::Bincode,
257 request,
258 )
259 .callback(move |req: SocketBuilderConnectStateChangeCallback| state_change(req.0))
260 .callback(move |req: SocketBuilderConnectReceiveCallback| receive(req.0))
261 .detach()?;
262 Ok(Arc::new(WebSocketClient::attach(handle)))
263 }
264 pub fn blocking_connect(
265 &self,
266 url: String,
267 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
268 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
269 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
270 wasmer_bus::task::block_on(self.connect(url, state_change, receive))
271 }
272}
273impl std::future::Future for SocketBuilderClient {
274 type Output = Result<(), wasmer_bus::abi::BusError>;
275 fn poll(
276 mut self: std::pin::Pin<&mut Self>,
277 cx: &mut std::task::Context<'_>,
278 ) -> std::task::Poll<Self::Output> {
279 if let Some(task) = self.task.take() {
280 self.join.replace(task.join()?);
281 }
282 if let Some(join) = self.join.as_mut() {
283 let join = std::pin::Pin::new(join);
284 return join.poll(cx);
285 } else {
286 std::task::Poll::Ready(Ok(()))
287 }
288 }
289}
290#[wasmer_bus::async_trait]
291impl SocketBuilder for SocketBuilderClient {
292 async fn connect(
293 &self,
294 url: String,
295 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
296 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
297 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
298 SocketBuilderClient::connect(self, url, state_change, receive).await
299 }
300 fn blocking_connect(
301 &self,
302 url: String,
303 state_change: Box<dyn Fn(SocketState) + Send + Sync + 'static>,
304 receive: Box<dyn Fn(Vec<u8>) + Send + Sync + 'static>,
305 ) -> std::result::Result<std::sync::Arc<dyn WebSocket>, wasmer_bus::abi::BusError> {
306 SocketBuilderClient::blocking_connect(self, url, state_change, receive)
307 }
308 fn as_client(&self) -> Option<SocketBuilderClient> {
309 Some(self.clone())
310 }
311}
312
313#[derive(Debug, Clone, serde :: Serialize, serde :: Deserialize)]
314pub struct WebSocketSendRequest {
315 pub data: Vec<u8>,
316}
317#[wasmer_bus::async_trait]
318pub trait WebSocket
319where
320 Self: std::fmt::Debug + Send + Sync,
321{
322 async fn send(&self, data: Vec<u8>)
323 -> std::result::Result<SendResult, wasmer_bus::abi::BusError>;
324 fn blocking_send(
325 &self,
326 data: Vec<u8>,
327 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError>;
328 fn as_client(&self) -> Option<WebSocketClient>;
329}
330#[wasmer_bus::async_trait]
331pub trait WebSocketSimplified
332where
333 Self: std::fmt::Debug + Send + Sync,
334{
335 async fn send(&self, data: Vec<u8>) -> SendResult;
336}
337#[wasmer_bus::async_trait]
338impl<T> WebSocket for T
339where
340 T: WebSocketSimplified,
341{
342 async fn send(
343 &self,
344 data: Vec<u8>,
345 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
346 Ok(WebSocketSimplified::send(self, data).await)
347 }
348 fn blocking_send(
349 &self,
350 data: Vec<u8>,
351 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
352 Ok(wasmer_bus::task::block_on(WebSocketSimplified::send(
353 self, data,
354 )))
355 }
356 fn as_client(&self) -> Option<WebSocketClient> {
357 None
358 }
359}
360#[derive(Debug, Clone)]
361pub struct WebSocketService {}
362impl WebSocketService {
363 #[allow(dead_code)]
364 pub(crate) fn attach(
365 wasm_me: std::sync::Arc<dyn WebSocket>,
366 call_handle: wasmer_bus::abi::CallSmartHandle,
367 ) {
368 {
369 let wasm_me = wasm_me.clone();
370 let call_handle = call_handle.clone();
371 wasmer_bus::task::respond_to(
372 call_handle,
373 wasmer_bus::abi::SerializationFormat::Bincode,
374 #[allow(unused_variables)]
375 move |wasm_handle: wasmer_bus::abi::CallHandle, wasm_req: WebSocketSendRequest| {
376 let wasm_me = wasm_me.clone();
377 let wasm_handle = wasmer_bus::abi::CallSmartHandle::new(wasm_handle);
378 let data = wasm_req.data;
379 async move { wasm_me.send(data).await }
380 },
381 );
382 }
383 }
384 pub fn listen(wasm_me: std::sync::Arc<dyn WebSocket>) {
385 {
386 let wasm_me = wasm_me.clone();
387 wasmer_bus::task::listen(
388 wasmer_bus::abi::SerializationFormat::Bincode,
389 #[allow(unused_variables)]
390 move |_wasm_handle: wasmer_bus::abi::CallHandle, wasm_req: WebSocketSendRequest| {
391 let wasm_me = wasm_me.clone();
392 let data = wasm_req.data;
393 async move { wasm_me.send(data).await }
394 },
395 );
396 }
397 }
398 pub fn serve() {
399 wasmer_bus::task::serve();
400 }
401}
402#[derive(Debug, Clone)]
403pub struct WebSocketClient {
404 ctx: wasmer_bus::abi::CallContext,
405 task: Option<wasmer_bus::abi::Call>,
406 join: Option<wasmer_bus::abi::CallJoin<()>>,
407}
408impl WebSocketClient {
409 pub fn new(wapm: &str) -> Self {
410 Self {
411 ctx: wasmer_bus::abi::CallContext::NewBusCall {
412 wapm: wapm.to_string().into(),
413 instance: None,
414 },
415 task: None,
416 join: None,
417 }
418 }
419 pub fn new_with_instance(wapm: &str, instance: &str, access_token: &str) -> Self {
420 Self {
421 ctx: wasmer_bus::abi::CallContext::NewBusCall {
422 wapm: wapm.to_string().into(),
423 instance: Some(wasmer_bus::abi::CallInstance::new(instance, access_token)),
424 },
425 task: None,
426 join: None,
427 }
428 }
429 pub fn attach(handle: wasmer_bus::abi::CallSmartHandle) -> Self {
430 Self {
431 ctx: wasmer_bus::abi::CallContext::SubCall { parent: handle },
432 task: None,
433 join: None,
434 }
435 }
436 pub fn wait(self) -> Result<(), wasmer_bus::abi::BusError> {
437 if let Some(join) = self.join {
438 join.wait()?;
439 }
440 if let Some(task) = self.task {
441 task.join()?.wait()?;
442 }
443 Ok(())
444 }
445 pub fn try_wait(&mut self) -> Result<Option<()>, wasmer_bus::abi::BusError> {
446 if let Some(task) = self.task.take() {
447 self.join.replace(task.join()?);
448 }
449 if let Some(join) = self.join.as_mut() {
450 join.try_wait()
451 } else {
452 Ok(None)
453 }
454 }
455 pub async fn send(
456 &self,
457 data: Vec<u8>,
458 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
459 let request = WebSocketSendRequest { data };
460 wasmer_bus::abi::call(
461 self.ctx.clone(),
462 wasmer_bus::abi::SerializationFormat::Bincode,
463 request,
464 )
465 .invoke()
466 .join()?
467 .await
468 }
469 pub fn blocking_send(
470 &self,
471 data: Vec<u8>,
472 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
473 wasmer_bus::task::block_on(self.send(data))
474 }
475}
476impl std::future::Future for WebSocketClient {
477 type Output = Result<(), wasmer_bus::abi::BusError>;
478 fn poll(
479 mut self: std::pin::Pin<&mut Self>,
480 cx: &mut std::task::Context<'_>,
481 ) -> std::task::Poll<Self::Output> {
482 if let Some(task) = self.task.take() {
483 self.join.replace(task.join()?);
484 }
485 if let Some(join) = self.join.as_mut() {
486 let join = std::pin::Pin::new(join);
487 return join.poll(cx);
488 } else {
489 std::task::Poll::Ready(Ok(()))
490 }
491 }
492}
493#[wasmer_bus::async_trait]
494impl WebSocket for WebSocketClient {
495 async fn send(
496 &self,
497 data: Vec<u8>,
498 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
499 WebSocketClient::send(self, data).await
500 }
501 fn blocking_send(
502 &self,
503 data: Vec<u8>,
504 ) -> std::result::Result<SendResult, wasmer_bus::abi::BusError> {
505 WebSocketClient::blocking_send(self, data)
506 }
507 fn as_client(&self) -> Option<WebSocketClient> {
508 Some(self.clone())
509 }
510}
511*/