use std::{process::exit, time::Instant};
/// Gateway API version; `DiscordWebSocket::connect` places it in the `v` query parameter.
const GATEWAY_VERSION: i8 = 10;
/// Base WebSocket URL of the gateway; `DiscordWebSocket::connect` appends the query string to it.
const GATEWAY_BASE_URL: &str = "wss://gateway.discord.gg/";
use crate::{
client::Client,
gateway::response::{DiscordOpCode, DiscordReceiveEvent, ReceiveEvent},
};
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::from_str;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
/// Uninhabited enum: it declares no variants, so no value of this type can be built.
///
/// NOTE(review): presumably reserved for typed incoming gateway payloads; nothing in the
/// visible code refers to it — confirm before relying on it.
#[derive(Debug, Deserialize, Serialize)]
pub enum WebSocketReceiveData {}
/// Minimal view of an incoming gateway message that captures only its `op` field.
///
/// `DiscordWebSocket::recv` deserializes every message into this struct to read the
/// opcode, then converts it with `DiscordOpCode::from`.
#[derive(Serialize, Deserialize, Debug)]
pub struct ReceiveMessageOpcode {
/// Numeric opcode of the message.
op: i8,
}
/// Connection properties sent as the `properties` field of an Identify payload.
///
/// Both `send_identify` implementations in this file fill it with the fixed values
/// `"linux"`, `"rustycord"` and `"rustycord"`.
#[derive(Debug, Serialize)]
pub struct IdentifyProperties {
/// Operating system name reported to the gateway.
os: String,
/// Library name reported as the browser.
browser: String,
/// Library name reported as the device.
device: String,
}
/// Kinds of activity, each with an explicit integer discriminant.
///
/// NOTE(review): the derived `Serialize` writes unit variants by *name* (e.g. `"Game"`),
/// not by discriminant. `Activity::type` is declared as a plain `u8`, so the visible code
/// does not use this enum when building payloads — confirm the intended wire format before
/// wiring it in.
#[derive(Debug, Serialize, Clone)]
pub enum ActivityType {
Game = 0,
Streaming = 1,
Listening = 2,
Watching = 3,
Custom = 4,
Competing = 5,
}
/// A single activity entry inside a `PresenceUpdate`.
#[derive(Debug, Serialize, Clone)]
pub struct Activity {
/// Display name of the activity.
pub name: String,
/// Numeric activity kind; the default presence in `send_identify` uses `0`.
/// Presumably one of the `ActivityType` discriminants — confirm.
pub r#type: u8,
/// Optional URL associated with the activity.
pub url: Option<String>,
/// Optional state text associated with the activity.
pub state: Option<String>,
}
/// Presence information, sent either inside an Identify payload or as its own message.
#[derive(Debug, Serialize, Clone)]
pub struct PresenceUpdate {
/// Timestamp value; `presence_update` fills it with milliseconds since the UNIX epoch,
/// while the default presence in `send_identify` uses `0`.
pub since: u128,
/// Activities to report.
pub activities: Vec<Activity>,
/// Status string; the visible code only ever sets `"online"`.
pub status: String,
/// Whether the client is flagged as away from keyboard.
pub afk: bool,
}
/// Payload carried in the `d` field of an outgoing `WebSocketMessage`.
///
/// Because of `#[serde(untagged)]`, the variant name is not written: the inner value is
/// serialized directly (a number, an object with the Identify fields, or the
/// `PresenceUpdate` object).
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum WebSocketMessageData {
/// Bare integer payload; used by `send_heartbeat` and `resume`.
Heartbeat(u64),
/// Fields of an Identify message, as assembled by the `send_identify` methods.
Identify {
token: String,
intents: i32,
properties: IdentifyProperties,
compress: bool,
large_threshold: i32,
// `None` is serialized as `null`, since no `skip_serializing_if` is configured.
shard: Option<Vec<i32>>,
presence: Option<PresenceUpdate>,
},
/// Presence payload sent on its own; used by `presence_update`.
PresenceUpdate(PresenceUpdate),
}
/// Envelope for every outgoing gateway message, serialized to JSON by `send_json`.
#[derive(Debug, Serialize)]
pub struct WebSocketMessage {
/// Opcode; the visible code uses the associated constants on `DiscordWebSocket`.
op: u8,
/// Payload matching the opcode.
d: WebSocketMessageData,
}
/// Empty struct with no fields; not referenced by the visible code.
/// NOTE(review): presumably a placeholder for a future response type — confirm.
pub struct RustycordWebSocketResponse {}
/// Newtype around the tokio-tungstenite stream used to talk to the Discord gateway.
///
/// Created by `connect`; all reads go through `recv` and all writes through `send_json`.
pub struct DiscordWebSocket(WebSocketStream<MaybeTlsStream<TcpStream>>);
#[allow(dead_code)]
impl DiscordWebSocket {
// Numeric gateway opcodes, used as the `op` field of `WebSocketMessage`.
// Only HEARTBEAT, IDENTIFY, PRESENCE_UPDATE and RESUME are referenced by the methods
// visible in this file; the rest are declared for completeness.
const DISPATCH: u8 = 0;
const HEARTBEAT: u8 = 1;
const IDENTIFY: u8 = 2;
const PRESENCE_UPDATE: u8 = 3;
const VOICE_STATE_UPDATE: u8 = 4;
// Note: value 5 is intentionally absent from this list.
const RESUME: u8 = 6;
const RECONNECT: u8 = 7;
const REQUEST_GUILD_MEMBERS: u8 = 8;
const INVALID_SESSION: u8 = 9;
const HELLO: u8 = 10;
const HEARTBEAT_ACK: u8 = 11;
/// Opens a WebSocket connection to the Discord gateway.
///
/// The URL is assembled from `GATEWAY_BASE_URL` and `GATEWAY_VERSION` with JSON
/// encoding. `shard_id` is used only in log output.
///
/// If the connection attempt fails, the error is logged and the whole process
/// terminates with exit status 2; no error value is ever returned to the caller.
pub(crate) async fn connect(shard_id: usize) -> Self {
    let gateway_url = format!("{}?v={}&encoding=json", GATEWAY_BASE_URL, GATEWAY_VERSION);
    // The handshake response (second tuple element) is not needed.
    let (stream, _handshake) = connect_async(gateway_url).await.unwrap_or_else(|err| {
        log::error!("Shard {} -> Got Error Message {}", shard_id, err);
        exit(2)
    });
    log::info!("🔌 {} -> Connected to The Discord", shard_id);
    Self(stream)
}
/// Waits for the next gateway frame and parses it into a `ReceiveEvent`.
///
/// Text frames are used as-is; binary frames are decoded as (lossy) UTF-8.
/// On success the result is always `Ok((Some(event), true))`.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when:
/// - the underlying stream has ended or reports a transport error,
/// - the gateway sends a close frame,
/// - the frame is neither text nor binary, or its content is empty,
/// - the content cannot be parsed as a gateway message.
///
/// None of these conditions panic; the caller decides how to react.
pub async fn recv(&mut self) -> Result<(Option<ReceiveEvent>, bool), String> {
    log::trace!("🔄 Waiting for message from Discord gateway...");
    // `next()` yields `None` once the stream is exhausted and `Some(Err(_))` on
    // transport failures; both are reported to the caller instead of panicking.
    let frame = match self.0.next().await {
        Some(Ok(frame)) => frame,
        Some(Err(err)) => {
            return Err(format!("🔴 -> WebSocket error while receiving message: {}", err));
        }
        None => return Err("Connection closed: gateway stream ended".to_string()),
    };
    let message = match frame {
        Message::Text(text) => {
            log::trace!(
                "📥 Received text message from gateway (length: {} chars)",
                text.len()
            );
            text.to_string()
        }
        Message::Binary(data) => {
            log::debug!(
                "📥 Received binary message from gateway (length: {} bytes)",
                data.len()
            );
            String::from_utf8_lossy(&data).to_string()
        }
        Message::Close(close_frame) => {
            log::warn!("🔒 Received close frame from gateway: {:?}", close_frame);
            return Err("Connection closed by Discord".to_string());
        }
        _ => {
            log::debug!("📥 Received non-text message from gateway");
            String::new()
        }
    };
    if message.is_empty() {
        return Err("🔴 -> Error receiving message".to_string());
    }
    log::trace!("🔍 Parsing gateway message...");
    // The opcode is read through a minimal struct first, then the full event is parsed.
    let opcode = from_str::<ReceiveMessageOpcode>(message.as_str())
        .map_err(|err| format!("🔴 -> Failed to parse gateway opcode: {}", err))?;
    let raw_event = from_str::<DiscordReceiveEvent>(message.as_str())
        .map_err(|err| format!("🔴 -> Failed to parse gateway event: {}", err))?;
    log::debug!("📨 Gateway event: {:?} (op: {})", raw_event.t, opcode.op);
    let event = ReceiveEvent {
        t: raw_event.t,
        s: raw_event.s,
        op: DiscordOpCode::from(opcode.op),
        d: raw_event.d,
    };
    Ok((Some(event), true))
}
/// Serializes `message` to JSON and sends it as a text frame.
///
/// Returns `true` when the frame was handed to the socket successfully; on a send
/// error the error is logged and `false` is returned.
///
/// # Panics
///
/// Panics if `message` cannot be serialized to JSON.
pub async fn send_json(&mut self, message: WebSocketMessage) -> bool {
    let json = serde_json::to_string(&message).unwrap();
    let frame = Message::Text(json.into());
    if let Err(err) = self.0.send(frame).await {
        log::error!("Error sending message {}", err);
        return false;
    }
    true
}
pub async fn send_heartbeat(&mut self) -> bool {
log::debug!("💓 Sending heartbeat to Discord gateway");
let heartbeat = WebSocketMessage {
op: Self::HEARTBEAT,
d: WebSocketMessageData::Heartbeat(251),
};
let result = self.send_json(heartbeat).await;
if result {
log::trace!("💓 Heartbeat sent successfully");
} else {
log::error!("💔 Failed to send heartbeat");
}
result
}
/// Builds an Identify message, filling in defaults for missing arguments, and sends it.
///
/// Defaults applied here:
/// - `intents`: `1` (a warning is logged),
/// - `large_threshold`: `50`,
/// - `shard`: `[0, 1]`,
/// - `presence`: status `"online"` with a single "with Rust" activity.
///
/// The result of the underlying send is not reported to the caller.
pub async fn send_identify(
    &mut self,
    token: String,
    intents: Option<i32>,
    compress: bool,
    large_threshold: Option<i32>,
    shard: Option<Vec<i32>>,
    presence: Option<PresenceUpdate>,
) {
    let intents = if let Some(bits) = intents {
        log::debug!("🎯 Using provided intents: {}", bits);
        bits
    } else {
        log::warn!("No intents provided using default");
        1
    };
    log::debug!("🎭 Setting up presence and activity...");
    let presence = match presence {
        Some(provided) => provided,
        None => {
            let default_activity = Activity {
                name: "with Rust".to_string(),
                r#type: 0,
                url: Some("https://iamdhakrey.dev".to_string()),
                state: Some("Enjoying Rust".to_string()),
            };
            PresenceUpdate {
                since: 0,
                activities: vec![default_activity],
                status: "online".to_string(),
                afk: false,
            }
        }
    };
    let shard = match shard {
        Some(provided) => provided,
        None => vec![0, 1],
    };
    log::debug!("🗂️ Using shard configuration: {:?}", shard);
    let properties = IdentifyProperties {
        os: "linux".to_string(),
        browser: "rustycord".to_string(),
        device: "rustycord".to_string(),
    };
    let payload = WebSocketMessageData::Identify {
        token,
        intents,
        properties,
        compress,
        large_threshold: large_threshold.unwrap_or(50),
        shard: Some(shard),
        presence: Some(presence),
    };
    let identify = WebSocketMessage {
        op: DiscordWebSocket::IDENTIFY,
        d: payload,
    };
    log::info!("🔑 -> Sending Identification Message");
    self.send_json(identify).await;
}
/// Sends a presence update (opcode `PRESENCE_UPDATE`) built from `presence`.
///
/// The caller's `activities`, `status` and `afk` values are forwarded unchanged.
/// `since` is always replaced with the current time in milliseconds since the UNIX
/// epoch; if the system clock reports a time before the epoch, `0` is used instead
/// of panicking.
///
/// The result of the underlying send is not reported to the caller.
async fn presence_update(&mut self, presence: PresenceUpdate) {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);
    let update = WebSocketMessage {
        op: Self::PRESENCE_UPDATE,
        // Only `since` is overridden; every other field comes from the caller.
        d: WebSocketMessageData::PresenceUpdate(PresenceUpdate {
            since: now,
            ..presence
        }),
    };
    self.send_json(update).await;
}
/// Sends a message with opcode `RESUME`.
///
/// NOTE(review): the payload is the bare integer `251` (the same shape as a heartbeat).
/// The Discord gateway documentation describes Resume as carrying a token, session id
/// and sequence number, so this looks like a stub — confirm before using it.
async fn resume(&mut self) {
    let payload = WebSocketMessageData::Heartbeat(251);
    self.send_json(WebSocketMessage {
        op: Self::RESUME,
        d: payload,
    })
    .await;
}
}
/// Per-shard gateway state: the socket plus heartbeat bookkeeping and identity data.
pub struct Manager {
/// Gateway connection owned by this shard.
pub ws: DiscordWebSocket,
/// When `handle_heartbeat` last sent a heartbeat; `None` until the first one.
last_heartbeat: Option<Instant>,
/// When `handle_event` last saw a `HeartbeatAck`; `None` until the first one.
last_heartbeat_ack: Option<Instant>,
/// Initialized to `Some(0)` by `new`; the visible code only checks it for `None`.
/// NOTE(review): `i8` looks too narrow for an interval expressed in milliseconds — confirm.
heartbeat_interval: Option<i8>,
/// Bot token supplied to `new`.
pub token: String,
/// Intents value supplied to `new`.
pub intents: i32,
/// Identifier of this shard; used for connecting and in log messages.
shard_id: usize,
/// Total number of shards, as supplied to `new`.
pub total_shards: usize,
/// Client used for event dispatch; `None` until `set_client` is called.
pub client: Option<Client>,
}
impl Manager {
/// Connects to the gateway for `shard_id` and returns a manager with fresh state.
///
/// Both heartbeat timestamps start as `None`, the heartbeat interval starts as
/// `Some(0)`, and no client is attached. Connection failures are handled inside
/// `DiscordWebSocket::connect`.
pub async fn new(token: String, intents: i32, shard_id: usize, total_shards: usize) -> Self {
    Self {
        ws: DiscordWebSocket::connect(shard_id).await,
        last_heartbeat: None,
        last_heartbeat_ack: None,
        heartbeat_interval: Some(0),
        token,
        intents,
        shard_id,
        total_shards,
        client: None,
    }
}
/// Attaches `client` to this manager, replacing any previously attached client.
pub fn set_client(&mut self, client: Client) {
    // The previous client, if any, is simply dropped.
    let _previous = self.client.replace(client);
}
/// Decides whether a heartbeat should be sent now.
///
/// Returns `true` when any of the heartbeat interval, the last acknowledgement time or
/// the last send time is unset. Otherwise it returns `false` only if the last heartbeat
/// was sent less than 5 whole seconds ago, and `true` in every other case.
///
/// NOTE(review): until the first acknowledgement is recorded this always answers `true`,
/// regardless of how recently a heartbeat was sent — confirm that this is intended.
pub async fn is_required_heartbeat(&mut self) -> bool {
    let last_sent = match (
        self.heartbeat_interval,
        self.last_heartbeat_ack,
        self.last_heartbeat,
    ) {
        (Some(_), Some(_), Some(sent)) => sent,
        _ => {
            log::debug!("Need to send heartbeat");
            return true;
        }
    };
    // Throttle: a heartbeat sent within the last 5 seconds suppresses a new one.
    let recently_sent = last_sent.elapsed().as_secs() < 5;
    if recently_sent {
        log::debug!("NO Need to send heartbeat");
    } else {
        log::debug!("Need to send heartbeat");
    }
    !recently_sent
}
/// Sends a heartbeat if `is_required_heartbeat` says one is due.
///
/// The send time is recorded whether or not the send itself succeeded.
pub async fn handle_heartbeat(&mut self) {
    if !self.is_required_heartbeat().await {
        return;
    }
    let _sent = self.ws.send_heartbeat().await;
    self.last_heartbeat = Some(Instant::now());
}
/// Receives one gateway event and processes it.
///
/// A `HeartbeatAck` updates the acknowledgement timestamp; every other event is passed
/// to `dispatch`. Receive errors and empty results are only logged.
pub async fn handle_event(&mut self) {
    let received = self.ws.recv().await;
    let event = match received {
        Ok((Some(event), _)) => event,
        Ok((None, _)) => {
            log::warn!("Shard {} received None event", self.shard_id);
            return;
        }
        Err(err) => {
            log::error!("Shard {} error receiving event: {:?}", self.shard_id, err);
            return;
        }
    };
    log::debug!(
        "Shard {} received event: {:?} op: {:?}",
        self.shard_id,
        event.t,
        event.op
    );
    if event.op == DiscordOpCode::HeartbeatAck {
        // Acknowledgements are bookkeeping only and are not forwarded to the client.
        self.last_heartbeat_ack = Some(Instant::now());
        return;
    }
    self.dispatch(event).await;
}
/// Forwards `event` to the attached client's event dispatcher.
///
/// Logs a warning when no client is attached and logs an error when the dispatcher
/// reports a failure; neither case is returned to the caller.
async fn dispatch(&self, event: ReceiveEvent) {
    let client = match self.client.as_ref() {
        Some(client) => client,
        None => {
            log::warn!("No client available for event dispatching");
            return;
        }
    };
    let outcome = client.event_dispatcher.dispatch_event(&event, client).await;
    if let Err(e) = outcome {
        log::error!("Error dispatching event: {:?}", e);
    }
}
/// Sends an Identify message for this shard using the arguments as given.
///
/// Unlike `DiscordWebSocket::send_identify`, only `large_threshold` receives a default
/// (`50`); `shard` and `presence` are forwarded as-is, including `None`. The `token` and
/// `intents` arguments are used rather than the values stored on the manager.
///
/// The result of the underlying send is not reported to the caller.
pub async fn send_identify(
    &mut self,
    token: String,
    intents: i32,
    compress: bool,
    large_threshold: Option<i32>,
    shard: Option<Vec<i32>>,
    presence: Option<PresenceUpdate>,
) {
    let properties = IdentifyProperties {
        os: "linux".to_string(),
        browser: "rustycord".to_string(),
        device: "rustycord".to_string(),
    };
    let payload = WebSocketMessageData::Identify {
        token,
        intents,
        properties,
        compress,
        large_threshold: large_threshold.unwrap_or(50),
        shard,
        presence,
    };
    let identify = WebSocketMessage {
        op: DiscordWebSocket::IDENTIFY,
        d: payload,
    };
    log::info!(
        "🔑 Shard {} - Sending Identification Message",
        self.shard_id
    );
    self.ws.send_json(identify).await;
}
}