use crate::{context::ResumeInfo, prelude::{Arc, Ctx, Mutex}};
use anyhow::Result;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use reqwest_websocket::{websocket, Message, WebSocket as WS};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JSON};
use std::{collections::VecDeque, time::Duration};
/// Gateway websocket URL used by `Websocket::new` for fresh (non-resumed) connections.
/// The query string requests `encoding=json` and `v=9`.
static DISCORD_WS_URI: &str = "wss://gateway.discord.gg/?encoding=json&v=9";
/// A single gateway payload as it is sent to, or received from, the websocket.
///
/// The Rust field names are mapped onto one-letter JSON keys through serde
/// renames; only `op` keeps its name on the wire.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DiscordMessage {
/// Opcode identifying the kind of payload. The constructors in this file
/// produce 1 (heartbeat), 2 (identify) and 6 (resume).
pub op: u8,
/// Payload body, read from and written to the JSON key `d`.
#[serde(rename = "d")]
pub data: JSON,
/// Sequence number read from the JSON key `s`. It is skipped when
/// serializing, so outgoing payloads never carry it.
#[serde(rename = "s", skip_serializing)]
pub seq: Option<u64>,
/// Event name read from the JSON key `t`. It is skipped when serializing,
/// so outgoing payloads never carry it.
#[serde(rename = "t", skip_serializing)]
pub event: Option<String>,
}
impl DiscordMessage {
    /// Builds a heartbeat payload (opcode 1).
    ///
    /// The body is the supplied sequence number, or JSON `null` when `seq`
    /// is `None`. The `seq` and `event` fields of the result are left unset.
    pub fn new_heartbeat(seq: Option<u64>) -> Self {
        let data = seq.map_or(JSON::Null, |last| JSON::Number(last.into()));
        Self {
            op: 1,
            data,
            seq: None,
            event: None,
        }
    }

    /// Builds an identify payload (opcode 2) for `token`.
    ///
    /// Apart from the token, every value in the body is hard-coded: the
    /// client properties describe Firefox 126 on Windows, and the initial
    /// presence uses the status string `"unknown"`.
    pub fn new_identify(token: &str) -> Self {
        let properties = json!({
            "os": "Windows",
            "browser": "Firefox",
            "device": "",
            "system_locale": "en-US",
            "browser_user_agent": "Mozilla/5.0 (Windows NT 10.0; rv:126.0) Gecko/20100101 Firefox/126.0",
            "browser_version": "126.0",
            "os_version": "",
            "referrer": "",
            "referring_domain": "",
            "referrer_current": "",
            "referring_domain_current": "",
            "release_channel": "stable",
            "client_build_number": 327180,
            "client_event_source": JSON::Null,
            "design_id": 0
        });
        let presence = json!({
            "status": "unknown",
            "since": 0,
            "activities": [],
            "afk": false
        });
        let data = json!({
            "token": token,
            "capabilities": 30717,
            "properties": properties,
            "presence": presence,
            "compress": false,
            "client_state": { "guild_versions": {} }
        });
        Self {
            op: 2,
            data,
            seq: None,
            event: None,
        }
    }

    /// Builds a resume payload (opcode 6).
    ///
    /// The body carries the token, the session id taken from `info.id`, and
    /// the sequence number `seq`.
    pub fn new_resume(token: &str, seq: u64, info: &ResumeInfo) -> Self {
        let data = json!({
            "token": token,
            "session_id": info.id,
            "seq": seq,
        });
        Self {
            op: 6,
            data,
            seq: None,
            event: None,
        }
    }
}
/// Holds a connected websocket together with the two message buffers that
/// `StreamCtrl::start` turns into shared queues.
pub struct StreamCtrl {
/// The underlying websocket connection.
s: WS,
/// Initial buffer for messages read from the socket.
rx: VecDeque<Message>,
/// Initial buffer for messages waiting to be written to the socket.
tx: VecDeque<Message>,
}
impl StreamCtrl {
    /// Wraps a connected websocket and pre-allocates room for 64 messages
    /// in each of the two buffers.
    pub fn new(s: WS) -> Self {
        Self {
            s,
            rx: VecDeque::with_capacity(64),
            tx: VecDeque::with_capacity(64),
        }
    }

    /// Splits the socket and spawns two detached background tasks that
    /// shuttle messages between it and a pair of shared queues.
    ///
    /// Returns `(send_queue, receive_queue)`:
    /// * messages pushed onto `send_queue` are written to the socket by the
    ///   writer task, which polls the queue every 5 ms;
    /// * messages read from the socket are appended to `receive_queue` by
    ///   the reader task.
    ///
    /// Both tasks terminate on the first socket error; the reader also
    /// terminates at end-of-stream. Neither task is joined or restarted.
    pub async fn start(self) -> (Arc<Mutex<VecDeque<Message>>>, Arc<Mutex<VecDeque<Message>>>) {
        let send_queue = Arc::new(Mutex::new(self.tx));
        let recv_queue = Arc::new(Mutex::new(self.rx));
        let (mut tx, mut rx) = self.s.into_stream().split();

        let rxq = recv_queue.clone();
        tokio::task::spawn(async move {
            log::trace!("Starting websocket read loop");
            loop {
                let msg = match rx.next().await {
                    Some(Ok(m)) => m,
                    Some(Err(e)) => {
                        let mut rxq = rxq.lock().await;
                        // NOTE(review): `close` here appears to be the `SinkExt`
                        // method on the queue itself, which would make it a
                        // no-op for a `VecDeque` - confirm.
                        rxq.close().await.expect("Couldn't close sink");
                        // Sentinel so that a reader blocked on the queue wakes
                        // up after a read failure.
                        // NOTE(review): the sentinel has no `d` key; confirm
                        // how `DiscordMessage` parsing treats it.
                        rxq.push_back(Message::Text(String::from(r#"{"op":255}"#)));
                        log::error!("rxq: {e}");
                        return;
                    }
                    None => {
                        let mut rxq = rxq.lock().await;
                        rxq.close().await.expect("Couldn't close sink");
                        // NOTE(review): unlike the error path, no sentinel is
                        // queued here, so readers are not told the stream
                        // ended - confirm this is intended.
                        log::trace!("rxq closed because end-of-stream reached");
                        return;
                    }
                };
                log::trace!("<<\n{msg:?}");
                rxq.lock().await.push_back(msg);
            }
        });

        let txq = send_queue.clone();
        tokio::task::spawn(async move {
            log::trace!("Starting websocket write loop");
            loop {
                tokio::time::sleep(Duration::from_millis(5)).await;
                let mut txq = txq.lock().await;
                if txq.is_empty() {
                    continue;
                }
                while let Some(msg) = txq.pop_front() {
                    log::trace!(">>\n{msg:?}");
                    // A failed write is handled exactly like a failed flush:
                    // log it and stop the writer instead of panicking the task.
                    if let Err(e) = tx.feed(msg).await {
                        txq.close().await.expect("Couldn't close stream");
                        log::error!("{e:?}");
                        return;
                    }
                }
                if let Err(e) = tx.flush().await {
                    txq.close().await.expect("Couldn't close stream");
                    log::error!("{e:?}");
                    return;
                }
                // The queue guard is released here, at the end of the iteration.
            }
        });

        (send_queue, recv_queue)
    }
}
/// Shared, mutex-guarded FIFO of websocket messages; the same type as each
/// element of the pair returned by `StreamCtrl::start`.
type Queue = Arc<Mutex<VecDeque<Message>>>;
/// Queue-backed handle to a gateway connection.
pub struct Websocket {
/// `(outgoing, incoming)` queues: `send` pushes onto `q.0`, while `read`
/// and `try_read` pop from `q.1`.
pub q: (Queue, Queue),
/// Initialised to `false` by both constructors; nothing in this file sets
/// it afterwards. NOTE(review): presumably toggled by callers - confirm.
pub ready: bool,
/// Value of `heartbeat_interval` taken from the first message read by
/// `read_hello`; 0 until then.
/// NOTE(review): presumably milliseconds - confirm against the gateway docs.
pub heartbeat: u64,
/// Sequence number used when resuming; `None` for connections created
/// with `new`, `Some` for those created with `new_with`.
pub sequence: Option<u64>,
}
impl Websocket {
async fn connect(url: String) -> Result<WS> {
Ok(loop {
log::debug!("Trying websocket connection");
let attempt = websocket(url.clone()).await;
match attempt {
Ok(v) => { break v },
Err(e) => {
log::error!("Got error while trying to connect: {e}");
log::info!("Retrying in 5s");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
})
}
pub async fn new() -> Result<Self> {
let ws = Self::connect(String::from(DISCORD_WS_URI)).await?;
let (tx, rx) = StreamCtrl::new(ws)
.start()
.await;
Ok(Self {
ready: false,
heartbeat: 0,
sequence: None,
q: (tx, rx),
})
}
pub async fn new_with(resume_gateway: &String, sequence: u64) -> Result<Self> {
let ws = Self::connect(format!("{}?encoding=json&v=9", resume_gateway)).await?;
let (tx, rx) =
StreamCtrl::new(ws)
.start()
.await;
Ok(Self {
ready: false,
heartbeat: 0,
sequence: Some(sequence),
q: (tx, rx),
})
}
pub async fn send(&mut self, msg: DiscordMessage) -> Result<()> {
let msg = Self::serialize_message(msg)?;
let mut lock = self.q.0.lock().await;
lock.push_back(msg);
Ok(())
}
pub async fn read(&mut self) -> Result<DiscordMessage> {
loop {
let mut lock = self.q.1.lock().await;
if lock.is_empty() {
drop(lock);
tokio::time::sleep(Duration::from_millis(5)).await;
continue;
}
break Ok(Self::parse_message(lock.pop_front().unwrap())?);
}
}
pub async fn try_read(&mut self) -> Result<Option<DiscordMessage>> {
let mut lock = self.q.1.lock().await;
Ok(match lock.pop_front() {
Some(v) => Some(Self::parse_message(v)?),
None => None,
})
}
pub fn parse_message(msg: Message) -> Result<DiscordMessage> {
Ok(match msg {
Message::Text(t) => serde_json::from_str(t.as_str())?,
Message::Binary(b) => serde_json::from_slice(b.as_slice())?,
})
}
pub fn serialize_message(msg: DiscordMessage) -> Result<Message> {
Ok(Message::Text(serde_json::to_string(&msg)?))
}
async fn read_hello(&mut self) -> Result<()> {
let hello = self.read().await?;
self.heartbeat = hello.data["heartbeat_interval"]
.as_u64()
.expect("Invalid heartbeat interval");
self.send(DiscordMessage::new_heartbeat(None)).await?;
let _ = self.read().await?;
Ok(())
}
pub async fn login(&mut self, token: &str) -> Result<()> {
self.read_hello().await?;
log::debug!("Sending identify packet");
self.send(DiscordMessage::new_identify(token)).await?;
Ok(())
}
pub async fn resume(&mut self, ctx: Ctx) -> Result<()> {
self.read_hello().await?;
log::debug!("Sending resume packet");
let ctx = ctx.lock().await;
let info = ctx.resume_info.as_ref().unwrap();
self.send(DiscordMessage::new_resume(ctx.auth.as_ref().unwrap(), self.sequence.unwrap(), info)).await?;
Ok(())
}
}