actor_discord/
connection.rs1use crate::types::events::{
2 ChannelEvent, Event, GuildChannel, GuildCreate, MessageEvent, MessageObject,
3};
4use crate::types::gateway::{GatewayHello, GatewayIdentify, GatewayMessage, GatewayReply};
5use crate::{types::gateway, DiscordAPI, GatewayIntents};
6use actix_http::ws::Frame;
8use anyhow::Result;
9use awc::ws::Message;
10use awc::Client;
12use futures::StreamExt;
13use futures_util::sink::SinkExt as _;
14use actix_broker::{Broker, SystemBroker};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::time::Interval;
22use url::Url;
23const GATEWAY: &str = "gateway";
24pub struct DiscordBot<'a> {
25 pub api: &'a DiscordAPI,
26 pub client: Client,
27 pub web_socket: Url,
28 pub intents: u64,
29 pub duration: Duration,
30 pub sequence_number: Option<usize>,
31 pub interval: Interval,
32}
33impl<'a> DiscordBot<'a> {
34 pub async fn create(api: &'a DiscordAPI, intents: GatewayIntents) -> Result<DiscordBot<'a>> {
35 let mut config = rustls::ClientConfig::new();
36 config
37 .root_store
38 .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
39
40 let protos = vec![b"http/1.1".to_vec()];
41 config.set_protocols(&protos);
42
43 let rc_config = Arc::new(config);
44 let client = Client::builder()
45 .connector(awc::Connector::new().rustls(rc_config))
46 .finish();
47
48 let web_socket = api.get::<GatewayReply>(GATEWAY).await?;
52
53 let web_socket_url = Url::from_str(&web_socket.url)?;
54 let duration = Duration::from_secs(1);
55 Ok(DiscordBot {
56 client,
57 api,
58 web_socket: web_socket_url,
59 intents: intents.bits,
62 duration,
63 sequence_number: None,
64 interval: tokio::time::interval(duration),
65 })
66 }
67
68 async fn handle_ws_gateway_event(
69 &mut self,
70 event_name: &str,
71 gateway_message: serde_json::Value,
72 ) -> Result<(bool, Option<Message>)> {
73 match event_name {
74 "GUILD_CREATE" => {
75 let gc: GuildCreate = serde_json::from_value(gateway_message)?;
76 let event = Event::GuildCreate(gc);
77 log::debug!("Guild Create");
78 Broker::<SystemBroker>::issue_async(event);
79 }
80 "READY" => {
81 log::debug!("READY\n{}", gateway_message)
83 }
84 "MESSAGE_CREATE" | "MESSAGE_UPDATE" | "MESSAGE_DELETE" => {
85 let gc: MessageObject = serde_json::from_value(gateway_message)?;
87 let event = if event_name == "MESSAGE_CREATE" {
88 MessageEvent::MessageCreate(gc)
89 } else if event_name == "MESSAGE_DELETE" {
90 MessageEvent::MessageDelete(gc)
91 } else {
92 MessageEvent::MessageUpdate(gc)
93 };
94
95 log::debug!("Message Create/update");
96 Broker::<SystemBroker>::issue_async(event);
97 }
98 "CHANNEL_UPDATE" | "CHANNEL_CREATE" | "CHANNEL_DELETE" => {
99 let gc: GuildChannel = serde_json::from_value(gateway_message)?;
101 let event = if event_name == "CHANNEL_CREATE" {
102 ChannelEvent::ChannelCreate(gc)
103 } else if event_name == "CHANNEL_DELETE" {
104 ChannelEvent::ChannelDelete(gc)
105 } else {
106 ChannelEvent::ChannelUpdate(gc)
107 };
108 Broker::<SystemBroker>::issue_async(event);
109 }
110
111 &_ => {
112 log::warn!("Unknown event {}\n{}", event_name, gateway_message)
113 }
114 }
115 Ok((true, None))
116 }
117 async fn handle_ws(
118 &mut self,
119 response: Frame,
121 ) -> Result<(bool, Option<Message>)> {
122 match response {
123 Frame::Text(txt) => {
124 let b: GatewayMessage = serde_json::from_str(&String::from_utf8_lossy(&txt))?;
125 if let Some(new_sequence) = b.s {
126 self.sequence_number = Some(new_sequence);
127 }
128 match b.op {
129 gateway::GATEWAY => {
130 if let Some(gateway_event_name) = b.t {
131 return self.handle_ws_gateway_event(&gateway_event_name, b.d).await;
132 } else {
133 log::warn!("Gateway No Event ?? {}", &String::from_utf8_lossy(&txt));
134 }
135 }
136 gateway::HELLO => {
137 let hello: GatewayHello = serde_json::from_value(b.d)?;
138 log::info!("Heartbeat:{}ms", hello.heartbeat_interval);
139 self.duration = Duration::from_millis(hello.heartbeat_interval);
140 self.interval = tokio::time::interval(self.duration);
141 let identify = serde_json::to_value(GatewayIdentify::create(
142 &self.api.token,
143 self.intents,
144 ))?;
145 let msg_json: String = serde_json::to_string(&GatewayMessage {
146 op: gateway::IDENTIFY,
147 d: identify,
148 s: None,
149 t: None,
150 })?;
151 log::info!("Identify");
152 let message = Message::Text(msg_json.into());
153 return Ok((true, Some(message)));
154 }
155 gateway::ACK => {
156 log::debug!("ACKED {}", String::from_utf8_lossy(&txt));
157 }
158 gateway::INVALID_SESSION => {
159 log::info!("INVALID session {}", b.d.as_bool().unwrap_or(false));
160 }
161 _ => {
162 log::error!("Unknown Op Code: {}", b.op)
163 }
164 }
165 }
166 Frame::Binary(_) => {}
167 Frame::Continuation(_) => {}
168 Frame::Ping(p) => {
169 log::info!("Ping");
170 let pong = Message::Pong(p);
171 return Ok((true, Some(pong)));
172 }
174 Frame::Pong(_) => {}
175 Frame::Close(b) => {
176 match b {
177 Some(close) => {
178 log::warn!(
179 "Socket Closed xx/{}",
180 close.description.unwrap_or_default()
182 )
183 }
184 None => {
185 log::warn!("Socket Closed no-reason")
186 }
187 };
188 return Ok((false, None));
189 }
190 }
191 Ok((true, None))
192 }
193 pub async fn start_websocket(&mut self) -> Result<()> {
194 let mut connect_ws = self.web_socket.clone();
195 connect_ws.set_query(Some("v=9&encoding=json&compress=false"));
196 log::info!("Starting Connect {}", connect_ws.as_str());
197
198 let (_resp, mut connection) = self.client.ws(connect_ws.as_str()).connect().await.unwrap();
199 Broker::<SystemBroker>::issue_async(Event::INIT);
201 loop {
202 log::debug!("Starting Select");
203 tokio::select! {
204 websocket = connection.next() => {
205 log::debug!("WS has a message");
206 let response = websocket.unwrap().unwrap();
207 let (continu,message_send) = self.handle_ws(response).await?;
208 if let Some(to_be_sent) = message_send {
209 let _result = connection.send(to_be_sent).await?;
210 }
211 if !continu {
212 break
213 }
214
215 }
216 _ = self.interval.tick() => {
217 let heartbeat = serde_json::to_value(self.sequence_number)?;
218 let msg_json : String = serde_json::to_string( &GatewayMessage{ op:gateway::HEARTBEAT, d:heartbeat,s:None,t:None})?;
219 log::debug!("Sending Heart-beart {}", msg_json);
220 let message= Message::Text(msg_json.into());
221 let _result = connection.send(message).await?;
222 }
223 }
224 log::debug!("end-of-loop");
225 }
226
227 Ok(())
228 }
229}