twitch-hls-client 0.1.2

WIP CLI Twitch HLS Client
//    Copyright (C) 2023 2bc4
//
//    This program is free software: you can redistribute it and/or modify
//    it under the terms of the GNU General Public License as published by
//    the Free Software Foundation, either version 3 of the License, or
//    (at your option) any later version.
//
//    This program is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//    GNU General Public License for more details.
//
//    You should have received a copy of the GNU General Public License
//    along with this program.  If not, see <https://www.gnu.org/licenses/>.

use anyhow::{bail, ensure, Context, Result};
use clap::Parser;
use crossbeam_channel::{unbounded, Receiver, Sender};
use is_terminal::IsTerminal;
use std::{
    cmp::{Ord, Ordering},
    io,
    io::ErrorKind::BrokenPipe,
    io::{Read, Write},
    process, thread,
    time::{Duration, Instant},
};
use ureq::{Agent, AgentBuilder, Error};
use url::Url;

const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36";

struct MasterPlaylist {
    server: Url,
    quality: String,
    channel: String,
}

impl MasterPlaylist {
    pub fn new(server: &str, channel: &str, quality: &str) -> Result<MasterPlaylist> {
        Ok(MasterPlaylist {
            server: Url::parse_with_params(
                &str::replace(server, "[channel]", channel),
                &[
                    ("player", "twitchweb"),
                    ("type", "any"),
                    ("allow_source", "true"),
                    ("allow_audio_only", "true"),
                    ("allow_spectre", "false"),
                    ("fast_bread", "true"),
                ],
            )
            .context("Invalid URL")?,
            quality: quality.into(),
            channel: channel.into(),
        })
    }

    pub fn fetch(&self, agent: &Agent) -> Result<String> {
        eprintln!("Fetching playlist for '{}'", self.channel);
        Self::parse_quality_playlist(
            &self.quality,
            &agent
                .request_url("GET", &self.server)
                .set("referer", "https://player.twitch.tv")
                .set("origin", "https://player.twitch.tv")
                .call()
                .context("Error fetching master playlist")?
                .into_string()?,
        )
    }

    fn parse_quality_playlist(quality: &str, playlist: &str) -> Result<String> {
        Ok(playlist
            .lines()
            .skip_while(|s| {
                !(s.contains("#EXT-X-MEDIA") && (s.contains(quality) || quality == "best"))
            })
            .nth(2)
            .context("Invalid quality or malformed master playlist")?
            .to_owned())
    }
}

struct MediaPlaylist {
    url: String,
    agent: Agent,
    master_playlist: MasterPlaylist,
    prefetch_segment_url: String,
    media_sequence: u64,
}

impl MediaPlaylist {
    pub fn new(server: &str, channel: &str, quality: &str) -> Result<MediaPlaylist> {
        let channel = str::replace(channel, "twitch.tv/", "");

        let master_playlist = MasterPlaylist::new(server, &channel, quality)?;
        let agent = AgentBuilder::new().user_agent(USER_AGENT).build();

        Ok(MediaPlaylist {
            url: master_playlist.fetch(&agent)?,
            agent,
            master_playlist,
            prefetch_segment_url: String::new(),
            media_sequence: 0,
        })
    }

    pub fn reload(&mut self) -> Result<()> {
        let playlist = match self
            .agent
            .get(&self.url)
            .set("referer", "https://player.twitch.tv")
            .set("origin", "https://player.twitch.tv")
            .call()
        {
            Ok(response) => response.into_string()?,
            Err(Error::Status(code, _)) if code == 404 => {
                eprintln!("404 on media playlist, stream likely ended. Exiting...");
                process::exit(0);
            }
            Err(e) => bail!(e),
        };

        if playlist.contains("DISCONTINUITY") {
            eprintln!(
                "Encountered a discontinuity/possible ad segment, fetching a new master playlist"
            );

            self.url = self.master_playlist.fetch(&self.agent)?;
            self.reload()?;
            return Ok(());
        }

        self.media_sequence = Self::parse_media_sequence(&playlist)?;

        self.prefetch_segment_url = playlist
            .lines()
            .last()
            .context("Malformed media playlist")?
            .replace("#EXT-X-TWITCH-PREFETCH:", "");

        Ok(())
    }

    fn parse_media_sequence(playlist: &str) -> Result<u64> {
        playlist
            .lines()
            .skip_while(|s| !s.contains("#EXT-X-MEDIA-SEQUENCE:"))
            .nth(1)
            .context("Malformed media playlist")?
            .split(':')
            .nth(1)
            .context("Invalid media sequence")?
            .parse()
            .context("Error parsing media sequence")
    }
}

struct IOThreads {
    reader_tx: Sender<String>,
}

impl IOThreads {
    const SEGMENT_BUF_SIZE: usize = 10_000_000;

    pub fn new(player: Option<String>, player_args: Option<String>) -> Result<IOThreads> {
        let (reader_tx, reader_rx): (Sender<String>, Receiver<String>) = unbounded();
        let (writer_tx, writer_rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = unbounded();

        thread::Builder::new()
            .name("Reader Thread".into())
            .spawn(move || {
                if let Err(e) = Self::reader_thread(reader_rx, writer_tx) {
                    eprintln!("{}", e);
                    process::exit(1);
                }
            })
            .context("Error spawning reader thread")?;

        thread::Builder::new()
            .name("Writer Thread".into())
            .spawn(move || {
                if let Err(e) = Self::writer_thread(writer_rx, player, player_args) {
                    eprintln!("{}", e);
                    process::exit(1);
                }
            })
            .context("Error spawning writer thread")?;

        Ok(IOThreads { reader_tx })
    }

    pub fn send_url(&self, url: String) -> Result<()> {
        self.reader_tx
            .send(url)
            .context("Error sending URL to reader thread")?;
        Ok(())
    }

    fn reader_thread(reader_rx: Receiver<String>, writer_tx: Sender<Vec<u8>>) -> Result<()> {
        let agent = AgentBuilder::new().user_agent(USER_AGENT).build();

        loop {
            //maybe optimize this
            let mut segment: Vec<u8> = Vec::with_capacity(Self::SEGMENT_BUF_SIZE);
            let url = reader_rx
                .recv()
                .context("Error receiving URL in reader thread")?;

            Self::fetch_segment(&mut segment, &agent, &url)?;

            writer_tx
                .send(segment)
                .context("Error sending segment to writer thread")?;
        }
    }

    fn writer_thread(
        writer_rx: Receiver<Vec<u8>>,
        player: Option<String>,
        player_args: Option<String>,
    ) -> Result<()> {
        let mut stdout: Box<dyn Write> = if let Some(player) = player {
            let player_args = player_args.unwrap_or_default();
            eprintln!("Opening player: {} {}", player, player_args);

            Box::new(
                process::Command::new(player)
                    .args(player_args.split_whitespace())
                    .stdin(process::Stdio::piped())
                    .spawn()
                    .context("Error opening player")?
                    .stdin
                    .take()
                    .context("Error opening player stdin")?,
            )
        } else {
            ensure!(
                !io::stdout().is_terminal(),
                "No player set and stdout is a terminal, exiting..."
            );

            eprintln!("Writing to stdout");
            Box::new(io::stdout().lock())
        };

        loop {
            let segment = &writer_rx
                .recv()
                .context("Error receiving segment from reader thread")?;

            match io::copy(&mut segment.as_slice(), &mut *stdout) {
                Err(ref e) if e.kind() == BrokenPipe => {
                    bail!("Pipe closed, exiting...");
                }
                Err(e) => bail!(e),
                _ => (),
            };
        }
    }

    fn fetch_segment(segment: &mut Vec<u8>, agent: &Agent, url: &str) -> Result<()> {
        agent
            .get(url)
            .set("referer", "https://player.twitch.tv")
            .set("origin", "https://player.twitch.tv")
            .call()
            .context("Error fetching segment")?
            .into_reader()
            .take(Self::SEGMENT_BUF_SIZE as u64)
            .read_to_end(segment)
            .context("Error reading segment")?;

        Ok(())
    }
}

#[derive(Parser)]
#[command(version, next_line_help = true)]
struct Args {
    /// Server to fetch the master playlist from,
    /// if URL includes "[channel]" it will be replaced with the real channel at runtime
    #[arg(short, long, value_name = "URL")]
    server: String,

    /// Path to the player that the stream will be piped to,
    /// if not specified will write stream to stdout
    #[arg(short, long, value_name = "PATH")]
    player: Option<String>,

    /// Arguments to pass to the player
    #[arg(
        short = 'a',
        long,
        value_name = "ARGUMENTS",
        allow_hyphen_values = true
    )]
    player_args: Option<String>,

    /// Twitch channel to watch, can be "twitch.tv/CHANNEL_NAME" or just "CHANNEL_NAME"
    channel: String,

    /// Stream quality/variant playlist to open (best, 1080p, 720p, 360p, 160p, etc.)
    quality: String,
}

fn main() -> Result<()> {
    let args = Args::parse();

    let mut media_playlist = MediaPlaylist::new(&args.server, &args.channel, &args.quality)?;
    let io_threads = IOThreads::new(args.player, args.player_args)?;

    loop {
        let time = Instant::now();
        let last_media_sequence = media_playlist.media_sequence;

        media_playlist.reload()?;
        match media_playlist.media_sequence.cmp(&last_media_sequence) {
            Ordering::Greater => {
                io_threads.send_url(media_playlist.prefetch_segment_url.clone())?;
                thread::sleep(Duration::from_secs(2) - time.elapsed()); //sleep until next segment
            }
            Ordering::Equal => continue,
            Ordering::Less => bail!("Media sequence out of order"),
        }
    }
}