tdn_bevy/
ws.rs

1use async_channel::{unbounded, Receiver, RecvError as ChannelRecvError, Sender, TryRecvError};
2use async_tungstenite::async_std::connect_async;
3use bevy::{
4    prelude::*,
5    tasks::{IoTaskPool, Task, TaskPool},
6};
7use futures_lite::future::race;
8use futures_util::{SinkExt, StreamExt};
9use tungstenite::Message;
10
11use crate::RecvError;
12
13pub struct WsClientPlugin;
14
15impl Plugin for WsClientPlugin {
16    fn build(&self, app: &mut App) {
17        app.insert_resource(WsClient);
18    }
19}
20
21#[derive(Resource)]
22pub struct WsClient;
23
24enum FutureRes {
25    Stream(Option<Message>),
26    Out(Message),
27}
28
29impl WsClient {
30    pub fn connect(
31        &self,
32        commands: &mut Commands,
33        init_url: impl ToString,
34        init_message: Option<Message>,
35    ) {
36        let url = init_url.to_string();
37        let (tx, out_rx) = unbounded::<Message>();
38        let (out_tx, rx) = unbounded::<Message>();
39
40        let task = IoTaskPool::get_or_init(TaskPool::new).spawn(async move {
41            match connect_async(url.to_string()).await {
42                Ok((mut stream, _)) => {
43                    if let Some(msg) = init_message {
44                        let _ = stream.send(msg).await;
45                    }
46
47                    loop {
48                        match race(
49                            async { out_rx.recv().await.map(|v| FutureRes::Out(v)) },
50                            async {
51                                stream
52                                    .next()
53                                    .await
54                                    .map(|v| FutureRes::Stream(v.ok()))
55                                    .ok_or(ChannelRecvError)
56                            },
57                        )
58                        .await
59                        {
60                            Ok(FutureRes::Stream(Some(v))) => {
61                                let _ = out_tx.send(v).await;
62                            }
63                            Ok(FutureRes::Out(v)) => {
64                                let _ = stream.send(v).await;
65                            }
66                            _ => break,
67                        }
68                    }
69                }
70                Err(e) => error!("Websocket:{}", e),
71            }
72        });
73
74        commands.spawn(WsConnection { _io: task, tx, rx });
75    }
76}
77
78#[derive(Component)]
79pub struct WsConnection {
80    _io: Task<()>,
81    tx: Sender<Message>,
82    rx: Receiver<Message>,
83}
84
85impl From<TryRecvError> for RecvError {
86    fn from(e: TryRecvError) -> RecvError {
87        match e {
88            TryRecvError::Empty => RecvError::Empty,
89            TryRecvError::Closed => RecvError::Closed,
90        }
91    }
92}
93
94impl WsConnection {
95    pub fn recv(&self) -> Result<Message, RecvError> {
96        Ok(self.rx.try_recv()?)
97    }
98
99    pub fn send(&self, message: Message) -> bool {
100        self.tx.try_send(message).is_ok()
101    }
102
103    // pub fn jsonrpc_recv(&self) -> Result<(String, Value), TryRecvError> {
104    //     let res = self.recv()?;
105    //     // change res to jsonrpc
106    // }
107
108    // pub fn jsonrpc_send(&self, method: &str, params: Value, is_req: bool) -> bool {
109    //     // TODO change to jsonrpc
110    //     let msg = Message::text();
111    //     self.send(msg)
112    // }
113}