actor_discord/
connection.rs

1use crate::types::events::{
2    ChannelEvent, Event, GuildChannel, GuildCreate, MessageEvent, MessageObject,
3};
4use crate::types::gateway::{GatewayHello, GatewayIdentify, GatewayMessage, GatewayReply};
5use crate::{types::gateway, DiscordAPI, GatewayIntents};
6//use crate::{NAME, VERSION};
7use actix_http::ws::Frame;
8use anyhow::Result;
9use awc::ws::Message;
10//use awc::{ws, Client, ClientBuilder};
11use awc::Client;
12use futures::StreamExt;
13use futures_util::sink::SinkExt as _;
14//use futures_util::{sink::SinkExt as _, stream::StreamExt as _};
15use actix_broker::{Broker, SystemBroker};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19//use tokio::sync::mpsc;
20//use tokio::sync::watch;
21use tokio::time::Interval;
22use url::Url;
23const GATEWAY: &str = "gateway";
24pub struct DiscordBot<'a> {
25    pub api: &'a DiscordAPI,
26    pub client: Client,
27    pub web_socket: Url,
28    pub intents: u64,
29    pub duration: Duration,
30    pub sequence_number: Option<usize>,
31    pub interval: Interval,
32}
33impl<'a> DiscordBot<'a> {
34    pub async fn create(api: &'a DiscordAPI, intents: GatewayIntents) -> Result<DiscordBot<'a>> {
35        let mut config = rustls::ClientConfig::new();
36        config
37            .root_store
38            .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
39
40        let protos = vec![b"http/1.1".to_vec()];
41        config.set_protocols(&protos);
42
43        let rc_config = Arc::new(config);
44        let client = Client::builder()
45            .connector(awc::Connector::new().rustls(rc_config))
46            .finish();
47
48        //   let base_url: Url = Url::from_str(connect_addr)?.join(API_PREFIX)?;
49
50        //        log::error!("URL_{}", base_url.as_str());
51        let web_socket = api.get::<GatewayReply>(GATEWAY).await?;
52
53        let web_socket_url = Url::from_str(&web_socket.url)?;
54        let duration = Duration::from_secs(1);
55        Ok(DiscordBot {
56            client,
57            api,
58            web_socket: web_socket_url,
59            //  tx,
60            //  rx,
61            intents: intents.bits,
62            duration,
63            sequence_number: None,
64            interval: tokio::time::interval(duration),
65        })
66    }
67
68    async fn handle_ws_gateway_event(
69        &mut self,
70        event_name: &str,
71        gateway_message: serde_json::Value,
72    ) -> Result<(bool, Option<Message>)> {
73        match event_name {
74            "GUILD_CREATE" => {
75                let gc: GuildCreate = serde_json::from_value(gateway_message)?;
76                let event = Event::GuildCreate(gc);
77                log::debug!("Guild Create");
78                Broker::<SystemBroker>::issue_async(event);
79            }
80            "READY" => {
81                // TODO log session id for resumes
82                log::debug!("READY\n{}", gateway_message)
83            }
84            "MESSAGE_CREATE" | "MESSAGE_UPDATE" | "MESSAGE_DELETE" => {
85                // log::info!("{}\n{}", event_name, gateway_message);
86                let gc: MessageObject = serde_json::from_value(gateway_message)?;
87                let event = if event_name == "MESSAGE_CREATE" {
88                    MessageEvent::MessageCreate(gc)
89                } else if event_name == "MESSAGE_DELETE" {
90                    MessageEvent::MessageDelete(gc)
91                } else {
92                    MessageEvent::MessageUpdate(gc)
93                };
94
95                log::debug!("Message Create/update");
96                Broker::<SystemBroker>::issue_async(event);
97            }
98            "CHANNEL_UPDATE" | "CHANNEL_CREATE" | "CHANNEL_DELETE" => {
99                //  log::info!("{}\n{}", event_name, gateway_message);
100                let gc: GuildChannel = serde_json::from_value(gateway_message)?;
101                let event = if event_name == "CHANNEL_CREATE" {
102                    ChannelEvent::ChannelCreate(gc)
103                } else if event_name == "CHANNEL_DELETE" {
104                    ChannelEvent::ChannelDelete(gc)
105                } else {
106                    ChannelEvent::ChannelUpdate(gc)
107                };
108                Broker::<SystemBroker>::issue_async(event);
109            }
110
111            &_ => {
112                log::warn!("Unknown event {}\n{}", event_name, gateway_message)
113            }
114        }
115        Ok((true, None))
116    }
117    async fn handle_ws(
118        &mut self,
119        //  connection: &mut actix_codec::Framed<BoxedSocket, Codec>,
120        response: Frame,
121    ) -> Result<(bool, Option<Message>)> {
122        match response {
123            Frame::Text(txt) => {
124                let b: GatewayMessage = serde_json::from_str(&String::from_utf8_lossy(&txt))?;
125                if let Some(new_sequence) = b.s {
126                    self.sequence_number = Some(new_sequence);
127                }
128                match b.op {
129                    gateway::GATEWAY => {
130                        if let Some(gateway_event_name) = b.t {
131                            return self.handle_ws_gateway_event(&gateway_event_name, b.d).await;
132                        } else {
133                            log::warn!("Gateway No Event ?? {}", &String::from_utf8_lossy(&txt));
134                        }
135                    }
136                    gateway::HELLO => {
137                        let hello: GatewayHello = serde_json::from_value(b.d)?;
138                        log::info!("Heartbeat:{}ms", hello.heartbeat_interval);
139                        self.duration = Duration::from_millis(hello.heartbeat_interval);
140                        self.interval = tokio::time::interval(self.duration);
141                        let identify = serde_json::to_value(GatewayIdentify::create(
142                            &self.api.token,
143                            self.intents,
144                        ))?;
145                        let msg_json: String = serde_json::to_string(&GatewayMessage {
146                            op: gateway::IDENTIFY,
147                            d: identify,
148                            s: None,
149                            t: None,
150                        })?;
151                        log::info!("Identify");
152                        let message = Message::Text(msg_json.into());
153                        return Ok((true, Some(message)));
154                    }
155                    gateway::ACK => {
156                        log::debug!("ACKED {}", String::from_utf8_lossy(&txt));
157                    }
158                    gateway::INVALID_SESSION => {
159                        log::info!("INVALID session {}", b.d.as_bool().unwrap_or(false));
160                    }
161                    _ => {
162                        log::error!("Unknown Op Code: {}", b.op)
163                    }
164                }
165            }
166            Frame::Binary(_) => {}
167            Frame::Continuation(_) => {}
168            Frame::Ping(p) => {
169                log::info!("Ping");
170                let pong = Message::Pong(p);
171                return Ok((true, Some(pong)));
172                //connection.send(pong).await?;
173            }
174            Frame::Pong(_) => {}
175            Frame::Close(b) => {
176                match b {
177                    Some(close) => {
178                        log::warn!(
179                            "Socket Closed xx/{}",
180                            // close.code.into(),
181                            close.description.unwrap_or_default()
182                        )
183                    }
184                    None => {
185                        log::warn!("Socket Closed no-reason")
186                    }
187                };
188                return Ok((false, None));
189            }
190        }
191        Ok((true, None))
192    }
193    pub async fn start_websocket(&mut self) -> Result<()> {
194        let mut connect_ws = self.web_socket.clone();
195        connect_ws.set_query(Some("v=9&encoding=json&compress=false"));
196        log::info!("Starting Connect {}", connect_ws.as_str());
197
198        let (_resp, mut connection) = self.client.ws(connect_ws.as_str()).connect().await.unwrap();
199        // let mut interval = tokio::time::interval(duration);
200        Broker::<SystemBroker>::issue_async(Event::INIT);
201        loop {
202            log::debug!("Starting Select");
203            tokio::select! {
204                websocket = connection.next() => {
205                    log::debug!("WS has a message");
206                    let response = websocket.unwrap().unwrap();
207                    let (continu,message_send) = self.handle_ws(response).await?;
208                    if let Some(to_be_sent) = message_send {
209                         let _result =  connection.send(to_be_sent).await?;
210                    }
211                    if !continu {
212                        break
213                    }
214
215                }
216                _ =  self.interval.tick() => {
217                    let heartbeat = serde_json::to_value(self.sequence_number)?;
218                    let msg_json : String = serde_json::to_string( &GatewayMessage{ op:gateway::HEARTBEAT, d:heartbeat,s:None,t:None})?;
219                    log::debug!("Sending Heart-beart {}", msg_json);
220                    let message= Message::Text(msg_json.into());
221                    let _result = connection.send(message).await?;
222                }
223            }
224            log::debug!("end-of-loop");
225        }
226
227        Ok(())
228    }
229}