1use std::sync::{Arc, RwLock};
2
3use async_trait::async_trait;
4use futures_channel::mpsc::{UnboundedSender};
5use futures_util::{future, pin_mut, StreamExt};
6use serde_json::json;
7use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
8use tracing::{trace, warn};
9use url::Url;
10
11use crate::types::entity::Entity;
12use crate::types::events::{RateData, VoteData};
13use crate::types::payload::{GatewayOp, Payload};
14
/// WebSocket endpoint of the gateway; parsed and dialed by `Client::connect`.
const GATEWAY_URL: &str = "wss://gateway.dlist.top";
16
17
18#[async_trait]
19pub trait EventHandler: Send + Sync {
20 async fn on_ready(&self, entity: &Entity) {}
21 async fn on_disconnect(&self, reason: &str) {}
22 async fn on_vote(&self, data: VoteData) {}
23 async fn on_rate(&self, data: RateData) {}
24}
25
26error_chain! {
27 foreign_links {
31 Websocket(tungstenite::Error) #[doc = "Websocket error"];
32 TrySendError(futures_channel::mpsc::TrySendError<Message>) #[doc = "Sending error"];
33 Serde(serde_json::Error) #[doc = "JSON error"];
34 }
35 errors {
36 CannotParsePayload(got: String, error: serde_json::Error) {
37 description("Cannot parse payload JSON")
38 display("Cannot parse payload. {:?} Got: {}", error, got)
39 }
40 }
41}
42
43pub struct Client<'a> {
44 token: &'a str,
45 handler: Arc<dyn EventHandler>,
46 write_tx: Option<Arc<UnboundedSender<Message>>>,
47 pub entity: Option<Entity>,
48}
49
50impl<'a> Client<'a> {
51 pub fn new(token: &'a str, handler: impl EventHandler + 'static) -> Self {
52 Client {
53 token,
54 handler: Arc::new(handler),
55 entity: None,
56 write_tx: None,
57 }
58 }
59
60 pub async fn connect(&mut self) {
61 let url = Url::parse(GATEWAY_URL).unwrap();
62
63 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
64 let (write, read) = ws_stream.split();
65
66 let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
67
68 let sender = StreamExt::map(write_rx, Ok).forward(write);
69
70 self.write_tx = Some(Arc::new(write_tx));
71
72 let self_lock = RwLock::new(self);
73
74 let reader = {
75 read.for_each(|message| async {
76 let mut that = self_lock.write().unwrap();
77 if let Err(why) = that.on_message(message).await {
78 warn!("{}", why);
79 }
80 })
81 };
82
83 pin_mut!(sender, reader);
84 future::select(sender, reader).await;
85 }
86
87 fn send(&self, msg: Message) -> Result<()> {
88 self.write_tx.as_ref().unwrap().unbounded_send(msg)?;
89 Ok(())
90 }
91
92 async fn on_message(&mut self, message: tungstenite::Result<Message>) -> Result<()> {
93 let text = message?.into_text().unwrap();
94
95 if text == "Connection closed" { return Ok(()); }
96
97 let payload = serde_json::from_str::<Payload>(&text).map_err(|e| ErrorKind::CannotParsePayload(text, e))?;
98
99 match payload.op {
100 GatewayOp::Ready => {
101 let entity: Entity = serde_json::from_value(payload.data)?;
102 trace!("Ready. Connected to {:?}", entity);
103 self.entity = Some(entity);
104 self.handler.on_ready(&self.entity.as_ref().unwrap()).await;
105 }
106 GatewayOp::Hello => {
107 let msg = payload.data.as_str().unwrap_or("");
108 trace!("Connected with message: {}", msg);
109 let data = serde_json::to_string(&Payload {
110 op: GatewayOp::Identify,
111 data: json!({
112 "token": self.token
113 }),
114 event: "".to_string(),
115 }).unwrap();
116
117 self.send(Message::Text(data))?;
118 trace!("Identify packet sent");
119 }
120 GatewayOp::Event => {
121 match payload.event.to_lowercase().as_str() {
122 "vote" => {
123 let data = serde_json::from_value(payload.data)?;
124 self.handler.on_vote(data).await;
125 }
126 "rate" => {
127 let data = serde_json::from_value(payload.data)?;
128 self.handler.on_rate(data).await;
129 }
130 _ => {}
131 }
132 }
133 GatewayOp::Disconnect => {
134 let why = payload.data.as_str().unwrap_or("");
135 self.handler.on_disconnect(why).await;
136 }
137 _ => {}
138 };
139
140 Ok(())
141 }
142}