use crate::gateway::events::GatewayEvent;
use crate::gateway::opcodes::GatewayOpCode;
use crate::utils::parse_stream_key;
use crate::voice::connection::{ConnectionError, VoiceConnection, VoiceEvent};
use serde_json::{json, Value};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
use tracing::debug;
#[derive(Debug, Error)]
pub enum StreamerError {
#[error("Not logged in")]
NotLoggedIn,
#[error("Not in voice channel")]
NotInVoice,
#[error("No session yet")]
NoSession,
#[error("Connection error: {0}")]
Connection(#[from] ConnectionError),
}
/// One outgoing gateway message: an opcode paired with its JSON data.
///
/// Values of this type travel over the unbounded channel held by [`Streamer`].
#[derive(Debug)]
pub struct GatewayPayload {
    /// Numeric opcode; `Streamer` fills it by casting a `GatewayOpCode` to `u8`.
    pub op: u8,
    /// JSON body that accompanies the opcode.
    pub d: Value,
}
/// Ties gateway events to a single, optional voice connection.
///
/// The streamer queues outgoing payloads on a channel and feeds incoming
/// gateway events into the stored [`VoiceConnection`], when one exists.
pub struct Streamer {
    /// Id of the local user; compared against the `user_id` of incoming
    /// voice-state updates and handed to every new `VoiceConnection`.
    user_id: String,
    /// Sending half of the channel that carries outgoing gateway payloads.
    gateway_tx: mpsc::UnboundedSender<GatewayPayload>,
    /// Current voice connection: `None` until `join_voice` is called and again
    /// after `leave_voice`.
    voice_connection: Option<Arc<Mutex<VoiceConnection>>>,
}
impl Streamer {
pub fn new(user_id: String, gateway_tx: mpsc::UnboundedSender<GatewayPayload>) -> Self {
Self {
user_id,
gateway_tx,
voice_connection: None,
}
}
pub fn join_voice(
&mut self,
guild_id: Option<String>,
channel_id: String,
) -> mpsc::UnboundedReceiver<VoiceEvent> {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let conn = VoiceConnection::new(
guild_id.clone(),
channel_id.clone(),
self.user_id.clone(),
event_tx,
);
self.voice_connection = Some(Arc::new(Mutex::new(conn)));
self.signal_video(false, guild_id, channel_id);
event_rx
}
pub async fn handle_event(&mut self, event: GatewayEvent) {
match event {
GatewayEvent::VoiceStateUpdate(d) => {
if d.user_id != self.user_id {
return;
}
debug!("Gateway: VOICE_STATE_UPDATE session={}", d.session_id);
if let Some(vc) = &self.voice_connection {
vc.lock().await.set_session(d.session_id);
}
}
GatewayEvent::VoiceServerUpdate(d) => {
let matches = self.voice_connection.as_ref().map_or(false, |_| true);
if !matches {
return;
}
debug!("Gateway: VOICE_SERVER_UPDATE endpoint={}", d.endpoint);
if let Some(vc) = &self.voice_connection {
let mut conn = vc.lock().await;
conn.set_tokens(d.endpoint, d.token);
if conn.can_start() {
drop(conn); let conn_arc = vc.clone();
tokio::spawn(async move {
let conn = {
let guard = conn_arc.lock().await;
debug!("VoiceConnection ready to start");
};
});
}
}
}
GatewayEvent::StreamCreate(d) => {
debug!("Gateway: STREAM_CREATE key={}", d.stream_key);
if let Ok(_parsed) = parse_stream_key(&d.stream_key) {
if let Some(vc) = &self.voice_connection {
let mut vc_guard = vc.lock().await;
let session_id = vc_guard.session_id().map(str::to_owned);
if let Some(sc) = vc_guard.stream_connection_mut() {
sc.server_id = Some(d.rtc_server_id);
sc.stream_key = Some(d.stream_key);
if let Some(sid) = session_id {
sc.inner.set_session(sid);
}
}
}
}
}
GatewayEvent::StreamServerUpdate(d) => {
debug!("Gateway: STREAM_SERVER_UPDATE key={}", d.stream_key);
if let Some(vc) = &self.voice_connection {
let mut vc_guard = vc.lock().await;
if let Some(sc) = vc_guard.stream_connection_mut() {
sc.inner.set_tokens(d.endpoint, d.token);
}
}
}
GatewayEvent::Unknown(_) => {}
}
}
pub fn create_stream(&mut self) -> Result<(), StreamerError> {
self.signal_stream()?;
Ok(())
}
pub fn stop_stream(&mut self) -> Result<(), StreamerError> {
self.signal_stop_stream()?;
Ok(())
}
pub fn leave_voice(&mut self) {
self.voice_connection = None;
self.signal_leave_voice();
}
fn send_opcode(&self, op: GatewayOpCode, d: Value) {
let _ = self.gateway_tx.send(GatewayPayload { op: op as u8, d });
}
fn signal_video(&self, video_enabled: bool, guild_id: Option<String>, channel_id: String) {
self.send_opcode(
GatewayOpCode::VoiceStateUpdate,
json!({
"guild_id": guild_id,
"channel_id": channel_id,
"self_mute": false,
"self_deaf": true,
"self_video": video_enabled,
}),
);
}
fn signal_stream(&self) -> Result<(), StreamerError> {
let _vc = self.voice_connection.as_ref().ok_or(StreamerError::NotInVoice)?;
Ok(())
}
fn signal_stop_stream(&self) -> Result<(), StreamerError> {
Ok(())
}
fn signal_leave_voice(&self) {
self.send_opcode(
GatewayOpCode::VoiceStateUpdate,
json!({
"guild_id": null,
"channel_id": null,
"self_mute": true,
"self_deaf": false,
"self_video": false,
}),
);
}
}