twitch-hls-client 0.3.2

Lightweight CLI client for watching low latency Twitch streams with ad blocking
//    Copyright (C) 2023 2bc4
//
//    This program is free software: you can redistribute it and/or modify
//    it under the terms of the GNU General Public License as published by
//    the Free Software Foundation, either version 3 of the License, or
//    (at your option) any later version.
//
//    This program is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//    GNU General Public License for more details.
//
//    You should have received a copy of the GNU General Public License
//    along with this program.  If not, see <https://www.gnu.org/licenses/>.

#![forbid(unsafe_code)]

use std::{
    process::{Child, ChildStdin, Command, ExitStatus, Stdio},
    thread,
    time::Instant,
};

use anyhow::{Context, Result};
use clap::{arg, command, value_parser, ArgAction};
use log::{debug, info, warn};
use simplelog::{
    format_description, ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode,
};

mod hls;
mod http;
mod segment_worker;
use hls::{MasterPlaylist, MediaPlaylist};
use segment_worker::Worker;

pub(crate) struct Player {
    process: Child,
}

impl Drop for Player {
    fn drop(&mut self) {
        if let Err(e) = self.process.kill() {
            warn!("Failed to kill player: {e}");
        }
    }
}

impl Player {
    pub fn spawn(path: &str, args: &str) -> Result<Self> {
        info!("Opening player: {} {}", path, args);
        Ok(Self {
            process: Command::new(path)
                .args(args.split_whitespace())
                .stdin(Stdio::piped())
                .spawn()
                .context("Failed to open player")?,
        })
    }

    pub fn stdin(&mut self) -> Result<ChildStdin> {
        self.process
            .stdin
            .take()
            .context("Failed to open player stdin")
    }

    pub fn wait(&mut self) -> Result<ExitStatus> {
        Ok(self.process.wait()?)
    }
}

struct Args {
    server: String,
    player_path: String,
    player_args: String,
    debug: bool,
    max_retries: u32,
    passthrough: bool,
    channel: String,
    quality: String,
}

impl Args {
    pub fn parse() -> Self {
        let matches = command!()
            .next_line_help(true)
            .args(&[
                arg!(-s --server <URL>
                    "Playlist proxy server to fetch the master playlist from.\n\
                     Can be multiple comma separated servers, will try each in order until successful.\n\
                     If URL path is \"[ttvlol]\" the playlist will be requested using the TTVLOL API.\n\
                     If URL includes \"[channel]\" it will be replaced with the channel argument at runtime."
                ).required(true),
                arg!(-p --player <PATH> "Path to the player that the stream will be piped to")
                    .required_unless_present("passthrough"),
                arg!(-a --"player-args" <ARGUMENTS> "Arguments to pass to the player")
                    .default_value("-")
                    .hide_default_value(true)
                    .allow_hyphen_values(true),
                arg!(-d --debug "Enable debug logging")
                    .action(ArgAction::SetTrue),
                arg!(--"max-retries" <COUNT> "Attempt to fetch the media playlist <COUNT> times before exiting")
                    .value_parser(value_parser!(u32))
                    .default_value("30"),
                arg!(--passthrough "Print the playlist URL to stdout and exit")
                    .action(ArgAction::SetTrue),
                arg!(<CHANNEL>
                     "Twitch channel to watch (can also be twitch.tv/channel for Streamlink compatibility)"
                ),
                arg!(<QUALITY>
                     "Stream quality/variant playlist to fetch (best, 1080p, 720p, 360p, 160p, audio_only)"
                ),
            ])
            .get_matches();

        Self {
            //these unwraps should never panic
            server: matches.get_one::<String>("server").unwrap().clone(),
            player_path: matches
                .get_one::<String>("player")
                .map_or_else(String::default, String::clone),
            player_args: matches.get_one::<String>("player-args").unwrap().clone(),
            debug: matches.get_flag("debug"),
            max_retries: *matches.get_one::<u32>("max-retries").unwrap(),
            passthrough: matches.get_flag("passthrough"),
            channel: matches.get_one::<String>("CHANNEL").unwrap().clone(),
            quality: matches.get_one::<String>("QUALITY").unwrap().clone(),
        }
    }
}

fn main() -> Result<()> {
    let args = Args::parse();
    if args.debug {
        TermLogger::init(
            LevelFilter::Debug,
            ConfigBuilder::new()
                .set_time_format_custom(format_description!(
                    "[hour]:[minute]:[second].[subsecond digits:5]"
                ))
                .set_time_offset_to_local()
                .unwrap() //isn't an error
                .build(),
            TerminalMode::Stderr,
            ColorChoice::Auto,
        )?;
    } else {
        TermLogger::init(
            LevelFilter::Info,
            ConfigBuilder::new()
                .set_time_level(LevelFilter::Off)
                .build(),
            TerminalMode::Stderr,
            ColorChoice::Auto,
        )?;
    }

    loop {
        let url = MasterPlaylist::new(&args.server, &args.channel, &args.quality)?.fetch()?;
        if args.passthrough {
            println!("{url}");
            return Ok(());
        }

        let mut playlist = match MediaPlaylist::new(&url) {
            Ok(playlist) => playlist,
            Err(e) => match e.downcast_ref::<hls::Error>() {
                Some(hls::Error::InvalidPrefetchUrl) => {
                    info!("Stream is not low latency, opening player with playlist URL");
                    let player_args = args
                        .player_args
                        .split_whitespace()
                        .map(|s| if s == "-" { url.clone() } else { s.to_owned() })
                        .collect::<Vec<String>>()
                        .join(" ");

                    let mut player = Player::spawn(&args.player_path, &player_args)?;
                    player.wait()?;

                    return Ok(());
                }
                Some(hls::Error::Advertisement | hls::Error::Discontinuity) => {
                    warn!("{e} on startup, resetting...");
                    continue;
                }
                _ => return Err(e),
            },
        };

        let worker = Worker::new(args.player_path.clone(), args.player_args.clone())?;
        worker.send(&playlist.prefetch_urls[0])?;
        worker.sync()?;

        let mut retry_count: u32 = 0;
        loop {
            let time = Instant::now();
            match playlist.reload() {
                Ok(_) => retry_count = 0,
                Err(e) => match e.downcast_ref::<hls::Error>() {
                    Some(hls::Error::Unchanged | hls::Error::InvalidPrefetchUrl) => {
                        retry_count += 1;
                        if retry_count == args.max_retries {
                            info!("Maximum retries on media playlist reached, exiting...");
                            return Ok(());
                        }

                        debug!("{e}, retrying...");
                        continue;
                    }
                    Some(hls::Error::Advertisement) => {
                        warn!("{e}, resetting...");
                        break;
                    }
                    Some(hls::Error::Discontinuity) => {
                        warn!("{e}, stream may be broken");
                    }
                    _ => return Err(e),
                },
            }

            worker.send(&playlist.prefetch_urls[1])?;

            if let Some(sleep_time) = playlist.duration.checked_sub(time.elapsed()) {
                thread::sleep(sleep_time);
            }
        }
    }
}