1use async_channel::{unbounded, Receiver, RecvError as ChannelRecvError, Sender, TryRecvError};
2use async_tungstenite::async_std::connect_async;
3use bevy::{
4 prelude::*,
5 tasks::{IoTaskPool, Task, TaskPool},
6};
7use futures_lite::future::race;
8use futures_util::{SinkExt, StreamExt};
9use tungstenite::Message;
10
11use crate::RecvError;
12
13pub struct WsClientPlugin;
14
15impl Plugin for WsClientPlugin {
16 fn build(&self, app: &mut App) {
17 app.insert_resource(WsClient);
18 }
19}
20
21#[derive(Resource)]
22pub struct WsClient;
23
24enum FutureRes {
25 Stream(Option<Message>),
26 Out(Message),
27}
28
29impl WsClient {
30 pub fn connect(
31 &self,
32 commands: &mut Commands,
33 init_url: impl ToString,
34 init_message: Option<Message>,
35 ) {
36 let url = init_url.to_string();
37 let (tx, out_rx) = unbounded::<Message>();
38 let (out_tx, rx) = unbounded::<Message>();
39
40 let task = IoTaskPool::get_or_init(TaskPool::new).spawn(async move {
41 match connect_async(url.to_string()).await {
42 Ok((mut stream, _)) => {
43 if let Some(msg) = init_message {
44 let _ = stream.send(msg).await;
45 }
46
47 loop {
48 match race(
49 async { out_rx.recv().await.map(|v| FutureRes::Out(v)) },
50 async {
51 stream
52 .next()
53 .await
54 .map(|v| FutureRes::Stream(v.ok()))
55 .ok_or(ChannelRecvError)
56 },
57 )
58 .await
59 {
60 Ok(FutureRes::Stream(Some(v))) => {
61 let _ = out_tx.send(v).await;
62 }
63 Ok(FutureRes::Out(v)) => {
64 let _ = stream.send(v).await;
65 }
66 _ => break,
67 }
68 }
69 }
70 Err(e) => error!("Websocket:{}", e),
71 }
72 });
73
74 commands.spawn(WsConnection { _io: task, tx, rx });
75 }
76}
77
78#[derive(Component)]
79pub struct WsConnection {
80 _io: Task<()>,
81 tx: Sender<Message>,
82 rx: Receiver<Message>,
83}
84
85impl From<TryRecvError> for RecvError {
86 fn from(e: TryRecvError) -> RecvError {
87 match e {
88 TryRecvError::Empty => RecvError::Empty,
89 TryRecvError::Closed => RecvError::Closed,
90 }
91 }
92}
93
94impl WsConnection {
95 pub fn recv(&self) -> Result<Message, RecvError> {
96 Ok(self.rx.try_recv()?)
97 }
98
99 pub fn send(&self, message: Message) -> bool {
100 self.tx.try_send(message).is_ok()
101 }
102
103 }