// dlist_top/client.rs

1use std::sync::{Arc, RwLock};
2
3use async_trait::async_trait;
4use futures_channel::mpsc::{UnboundedSender};
5use futures_util::{future, pin_mut, StreamExt};
6use serde_json::json;
7use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
8use tracing::{trace, warn};
9use url::Url;
10
11use crate::types::entity::Entity;
12use crate::types::events::{RateData, VoteData};
13use crate::types::payload::{GatewayOp, Payload};
14
/// WebSocket URL that `Client::connect` parses and connects to.
const GATEWAY_URL: &str = "wss://gateway.dlist.top";
16
17
18#[async_trait]
19pub trait EventHandler: Send + Sync {
20    async fn on_ready(&self, entity: &Entity) {}
21    async fn on_disconnect(&self, reason: &str) {}
22    async fn on_vote(&self, data: VoteData) {}
23    async fn on_rate(&self, data: RateData) {}
24}
25
26error_chain! {
27    /*types {
28        MyError, MyErrorKind, MyResult;
29    }*/
30    foreign_links {
31        Websocket(tungstenite::Error) #[doc = "Websocket error"];
32        TrySendError(futures_channel::mpsc::TrySendError<Message>) #[doc = "Sending error"];
33        Serde(serde_json::Error) #[doc = "JSON error"];
34    }
35    errors {
36        CannotParsePayload(got: String, error: serde_json::Error) {
37            description("Cannot parse payload JSON")
38            display("Cannot parse payload. {:?} Got: {}", error, got)
39        }
40    }
41}
42
43pub struct Client<'a> {
44    token: &'a str,
45    handler: Arc<dyn EventHandler>,
46    write_tx: Option<Arc<UnboundedSender<Message>>>,
47    pub entity: Option<Entity>,
48}
49
50impl<'a> Client<'a> {
51    pub fn new(token: &'a str, handler: impl EventHandler + 'static) -> Self {
52        Client {
53            token,
54            handler: Arc::new(handler),
55            entity: None,
56            write_tx: None,
57        }
58    }
59
60    pub async fn connect(&mut self) {
61        let url = Url::parse(GATEWAY_URL).unwrap();
62
63        let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
64        let (write, read) = ws_stream.split();
65
66        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
67
68        let sender = StreamExt::map(write_rx, Ok).forward(write);
69
70        self.write_tx = Some(Arc::new(write_tx));
71
72        let self_lock = RwLock::new(self);
73
74        let reader = {
75            read.for_each(|message| async {
76                let mut that = self_lock.write().unwrap();
77                if let Err(why) = that.on_message(message).await {
78                    warn!("{}", why);
79                }
80            })
81        };
82
83        pin_mut!(sender, reader);
84        future::select(sender, reader).await;
85    }
86
87    fn send(&self, msg: Message) -> Result<()> {
88        self.write_tx.as_ref().unwrap().unbounded_send(msg)?;
89        Ok(())
90    }
91
92    async fn on_message(&mut self, message: tungstenite::Result<Message>) -> Result<()> {
93        let text = message?.into_text().unwrap();
94
95        if text == "Connection closed" { return Ok(()); }
96
97        let payload = serde_json::from_str::<Payload>(&text).map_err(|e| ErrorKind::CannotParsePayload(text, e))?;
98
99        match payload.op {
100            GatewayOp::Ready => {
101                let entity: Entity = serde_json::from_value(payload.data)?;
102                trace!("Ready. Connected to {:?}", entity);
103                self.entity = Some(entity);
104                self.handler.on_ready(&self.entity.as_ref().unwrap()).await;
105            }
106            GatewayOp::Hello => {
107                let msg = payload.data.as_str().unwrap_or("");
108                trace!("Connected with message: {}", msg);
109                let data = serde_json::to_string(&Payload {
110                    op: GatewayOp::Identify,
111                    data: json!({
112                                "token": self.token
113                            }),
114                    event: "".to_string(),
115                }).unwrap();
116
117                self.send(Message::Text(data))?;
118                trace!("Identify packet sent");
119            }
120            GatewayOp::Event => {
121                match payload.event.to_lowercase().as_str() {
122                    "vote" => {
123                        let data = serde_json::from_value(payload.data)?;
124                        self.handler.on_vote(data).await;
125                    }
126                    "rate" => {
127                        let data = serde_json::from_value(payload.data)?;
128                        self.handler.on_rate(data).await;
129                    }
130                    _ => {}
131                }
132            }
133            GatewayOp::Disconnect => {
134                let why = payload.data.as_str().unwrap_or("");
135                self.handler.on_disconnect(why).await;
136            }
137            _ => {}
138        };
139
140        Ok(())
141    }
142}