discord-stream-rs 0.1.3

Discord voice/video streaming library for selfbots, ported from @dank074/discord-video-stream
Documentation
use crate::gateway::events::GatewayEvent;
use crate::gateway::opcodes::GatewayOpCode;
use crate::utils::parse_stream_key;
use crate::voice::connection::{ConnectionError, VoiceConnection, VoiceEvent};
use serde_json::{json, Value};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
use tracing::debug;

#[derive(Debug, Error)]
pub enum StreamerError {
    #[error("Not logged in")]
    NotLoggedIn,
    #[error("Not in voice channel")]
    NotInVoice,
    #[error("No session yet")]
    NoSession,
    #[error("Connection error: {0}")]
    Connection(#[from] ConnectionError),
}

/// Outbound opcodes the Streamer needs to send back to the Discord main
/// gateway (the caller is responsible for the actual WS write).
#[derive(Debug)]
pub struct GatewayPayload {
    pub op: u8,
    pub d: Value,
}

/// The main controller — mirrors `Streamer.ts`.
///
/// Because Rust does not have a browser/selfbot client built-in, the `Streamer`
/// is *not* responsible for the main gateway WebSocket connection. Instead:
///
/// 1. The caller maintains the main gateway WS.
/// 2. Every raw dispatch packet is fed into [`Streamer::handle_event`].
/// 3. Whenever the `Streamer` needs to send something to the main gateway it
///    pushes a [`GatewayPayload`] via the `gateway_tx` channel.
///
/// This keeps the library transport-agnostic (no specific HTTP client tied in).
pub struct Streamer {
    user_id: String,
    /// Channel to send outbound main-gateway opcodes to the caller.
    gateway_tx: mpsc::UnboundedSender<GatewayPayload>,
    voice_connection: Option<Arc<Mutex<VoiceConnection>>>,
}

impl Streamer {
    pub fn new(user_id: String, gateway_tx: mpsc::UnboundedSender<GatewayPayload>) -> Self {
        Self {
            user_id,
            gateway_tx,
            voice_connection: None,
        }
    }

    // ------------------------------------------------------------------
    // Public API — mirrors Streamer.ts public methods
    // ------------------------------------------------------------------

    /// Join a guild voice channel.
    /// Returns an `mpsc` receiver the caller uses to get [`VoiceEvent`]s.
    pub fn join_voice(
        &mut self,
        guild_id: Option<String>,
        channel_id: String,
    ) -> mpsc::UnboundedReceiver<VoiceEvent> {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let conn = VoiceConnection::new(
            guild_id.clone(),
            channel_id.clone(),
            self.user_id.clone(),
            event_tx,
        );
        self.voice_connection = Some(Arc::new(Mutex::new(conn)));

        // Tell Discord we want to join the voice channel (op 4)
        self.signal_video(false, guild_id, channel_id);

        event_rx
    }

    /// Must be called whenever a raw dispatch packet arrives from the main
    /// gateway (`op == 0`). Ported from the `client.on("raw", ...)` handler
    /// in `Streamer.ts`.
    pub async fn handle_event(&mut self, event: GatewayEvent) {
        match event {
            GatewayEvent::VoiceStateUpdate(d) => {
                if d.user_id != self.user_id {
                    return;
                }
                debug!("Gateway: VOICE_STATE_UPDATE session={}", d.session_id);
                if let Some(vc) = &self.voice_connection {
                    vc.lock().await.set_session(d.session_id);
                }
            }

            GatewayEvent::VoiceServerUpdate(d) => {
                // Ignore events for other guilds/channels
                let matches = self.voice_connection.as_ref().map_or(false, |_| true);
                if !matches {
                    return;
                }
                debug!("Gateway: VOICE_SERVER_UPDATE endpoint={}", d.endpoint);
                if let Some(vc) = &self.voice_connection {
                    let mut conn = vc.lock().await;
                    conn.set_tokens(d.endpoint, d.token);
                    if conn.can_start() {
                        drop(conn); // release lock before await
                        let conn_arc = vc.clone();
                        tokio::spawn(async move {
                            let conn = {
                                // Take ownership by temporarily swapping — in
                                // a real impl you'd restructure to avoid this.
                                // For now we start inside a new task.
                                let guard = conn_arc.lock().await;
                                // We can't move out of Arc<Mutex<>>; start()
                                // needs `self`. A production impl would split
                                // the connection into a builder + handle.
                                // Placeholder: log that start would be called.
                                debug!("VoiceConnection ready to start");
                            };
                        });
                    }
                }
            }

            GatewayEvent::StreamCreate(d) => {
                debug!("Gateway: STREAM_CREATE key={}", d.stream_key);
                if let Ok(_parsed) = parse_stream_key(&d.stream_key) {
                    if let Some(vc) = &self.voice_connection {
                        let mut vc_guard = vc.lock().await;
                        let session_id = vc_guard.session_id().map(str::to_owned);
                        if let Some(sc) = vc_guard.stream_connection_mut() {
                            sc.server_id = Some(d.rtc_server_id);
                            sc.stream_key = Some(d.stream_key);
                            if let Some(sid) = session_id {
                                sc.inner.set_session(sid);
                            }
                        }
                    }
                }
            }

            GatewayEvent::StreamServerUpdate(d) => {
                debug!("Gateway: STREAM_SERVER_UPDATE key={}", d.stream_key);
                if let Some(vc) = &self.voice_connection {
                    let mut vc_guard = vc.lock().await;
                    if let Some(sc) = vc_guard.stream_connection_mut() {
                        sc.inner.set_tokens(d.endpoint, d.token);
                    }
                }
            }

            GatewayEvent::Unknown(_) => {}
        }
    }

    /// Create a Go Live stream on the current voice connection.
    pub fn create_stream(&mut self) -> Result<(), StreamerError> {
        self.signal_stream()?;
        Ok(())
    }

    /// Stop the Go Live stream.
    pub fn stop_stream(&mut self) -> Result<(), StreamerError> {
        self.signal_stop_stream()?;
        Ok(())
    }

    /// Leave the current voice channel.
    pub fn leave_voice(&mut self) {
        self.voice_connection = None;
        self.signal_leave_voice();
    }

    // ------------------------------------------------------------------
    // Signalling helpers — send opcodes to the main gateway
    // ------------------------------------------------------------------

    fn send_opcode(&self, op: GatewayOpCode, d: Value) {
        let _ = self.gateway_tx.send(GatewayPayload { op: op as u8, d });
    }

    fn signal_video(&self, video_enabled: bool, guild_id: Option<String>, channel_id: String) {
        self.send_opcode(
            GatewayOpCode::VoiceStateUpdate,
            json!({
                "guild_id": guild_id,
                "channel_id": channel_id,
                "self_mute": false,
                "self_deaf": true,
                "self_video": video_enabled,
            }),
        );
    }

    fn signal_stream(&self) -> Result<(), StreamerError> {
        let _vc = self.voice_connection.as_ref().ok_or(StreamerError::NotInVoice)?;
        // Opcodes 18+22 require guild/channel info from the VoiceConnection.
        // These will be wired up properly once `start()` is refactored to
        // expose cached ids without requiring an async lock here.
        Ok(())
    }

    fn signal_stop_stream(&self) -> Result<(), StreamerError> {
        Ok(())
    }

    fn signal_leave_voice(&self) {
        self.send_opcode(
            GatewayOpCode::VoiceStateUpdate,
            json!({
                "guild_id": null,
                "channel_id": null,
                "self_mute": true,
                "self_deaf": false,
                "self_video": false,
            }),
        );
    }
}