#![forbid(unsafe_code)]
use std::{
process::{Child, ChildStdin, Command, ExitStatus, Stdio},
thread,
time::Instant,
};
use anyhow::{Context, Result};
use clap::{arg, command, value_parser, ArgAction};
use log::{debug, info, warn};
use simplelog::{
format_description, ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode,
};
mod hls;
mod http;
mod segment_worker;
use hls::{MasterPlaylist, MediaPlaylist};
use segment_worker::Worker;
/// Handle to the external media player process.
///
/// The wrapped child is killed when the handle is dropped (see the `Drop`
/// implementation for this type).
pub(crate) struct Player {
    /// Child process created by `Player::spawn`, with its stdin piped.
    process: Child,
}
impl Drop for Player {
    /// Terminates the player process when the handle goes out of scope.
    ///
    /// Errors are logged as warnings rather than propagated, because `drop`
    /// has no way to return them.
    fn drop(&mut self) {
        match self.process.kill() {
            Ok(()) => {
                // Reap the killed child. Without a wait the terminated process
                // can linger as a zombie for as long as this program runs,
                // which adds up when players are respawned repeatedly.
                if let Err(e) = self.process.wait() {
                    warn!("Failed to wait for player: {e}");
                }
            }
            // Do not wait here: if the kill did not go through, the child may
            // still be running and waiting could block indefinitely.
            Err(e) => warn!("Failed to kill player: {e}"),
        }
    }
}
impl Player {
pub fn spawn(path: &str, args: &str) -> Result<Self> {
info!("Opening player: {} {}", path, args);
Ok(Self {
process: Command::new(path)
.args(args.split_whitespace())
.stdin(Stdio::piped())
.spawn()
.context("Failed to open player")?,
})
}
pub fn stdin(&mut self) -> Result<ChildStdin> {
self.process
.stdin
.take()
.context("Failed to open player stdin")
}
pub fn wait(&mut self) -> Result<ExitStatus> {
Ok(self.process.wait()?)
}
}
/// Command line options, as extracted by `Args::parse`.
struct Args {
    /// Value of `-s/--server`: the playlist proxy server(s) used to fetch the
    /// master playlist.
    server: String,
    /// Value of `-p/--player`; an empty string when the option was omitted.
    player_path: String,
    /// Value of `-a/--player-args` (defaults to `-`).
    player_args: String,
    /// Whether `-d/--debug` was given.
    debug: bool,
    /// Value of `--max-retries` (defaults to 30).
    max_retries: u32,
    /// Whether `--passthrough` was given.
    passthrough: bool,
    /// Positional `CHANNEL` argument.
    channel: String,
    /// Positional `QUALITY` argument.
    quality: String,
}
impl Args {
    /// Builds the command line interface, parses the process arguments and
    /// copies the results into an owned `Args`.
    ///
    /// Parsing is delegated to clap's `get_matches`. The `unwrap` calls below
    /// only touch arguments that are declared required or carry a default
    /// value.
    pub fn parse() -> Self {
        let cli = command!()
            .next_line_help(true)
            .arg(
                arg!(-s --server <URL>
                    "Playlist proxy server to fetch the master playlist from.\n\
                    Can be multiple comma separated servers, will try each in order until successful.\n\
                    If URL path is \"[ttvlol]\" the playlist will be requested using the TTVLOL API.\n\
                    If URL includes \"[channel]\" it will be replaced with the channel argument at runtime."
                )
                .required(true),
            )
            .arg(
                arg!(-p --player <PATH> "Path to the player that the stream will be piped to")
                    .required_unless_present("passthrough"),
            )
            .arg(
                arg!(-a --"player-args" <ARGUMENTS> "Arguments to pass to the player")
                    .default_value("-")
                    .hide_default_value(true)
                    .allow_hyphen_values(true),
            )
            .arg(arg!(-d --debug "Enable debug logging").action(ArgAction::SetTrue))
            .arg(
                arg!(--"max-retries" <COUNT> "Attempt to fetch the media playlist <COUNT> times before exiting")
                    .value_parser(value_parser!(u32))
                    .default_value("30"),
            )
            .arg(
                arg!(--passthrough "Print the playlist URL to stdout and exit")
                    .action(ArgAction::SetTrue),
            )
            .arg(arg!(<CHANNEL>
                "Twitch channel to watch (can also be twitch.tv/channel for Streamlink compatibility)"
            ))
            .arg(arg!(<QUALITY>
                "Stream quality/variant playlist to fetch (best, 1080p, 720p, 360p, 160p, audio_only)"
            ));

        let matches = cli.get_matches();

        // Owned copy of a string-valued argument, `None` when it was not supplied.
        let text = |id: &str| matches.get_one::<String>(id).cloned();

        Self {
            server: text("server").unwrap(),
            // The player is optional in passthrough mode; fall back to an empty path.
            player_path: text("player").unwrap_or_default(),
            player_args: text("player-args").unwrap(),
            debug: matches.get_flag("debug"),
            max_retries: matches.get_one::<u32>("max-retries").copied().unwrap(),
            passthrough: matches.get_flag("passthrough"),
            channel: text("CHANNEL").unwrap(),
            quality: text("QUALITY").unwrap(),
        }
    }
}
fn main() -> Result<()> {
let args = Args::parse();
if args.debug {
TermLogger::init(
LevelFilter::Debug,
ConfigBuilder::new()
.set_time_format_custom(format_description!(
"[hour]:[minute]:[second].[subsecond digits:5]"
))
.set_time_offset_to_local()
.unwrap() .build(),
TerminalMode::Stderr,
ColorChoice::Auto,
)?;
} else {
TermLogger::init(
LevelFilter::Info,
ConfigBuilder::new()
.set_time_level(LevelFilter::Off)
.build(),
TerminalMode::Stderr,
ColorChoice::Auto,
)?;
}
loop {
let url = MasterPlaylist::new(&args.server, &args.channel, &args.quality)?.fetch()?;
if args.passthrough {
println!("{url}");
return Ok(());
}
let mut playlist = match MediaPlaylist::new(&url) {
Ok(playlist) => playlist,
Err(e) => match e.downcast_ref::<hls::Error>() {
Some(hls::Error::InvalidPrefetchUrl) => {
info!("Stream is not low latency, opening player with playlist URL");
let player_args = args
.player_args
.split_whitespace()
.map(|s| if s == "-" { url.clone() } else { s.to_owned() })
.collect::<Vec<String>>()
.join(" ");
let mut player = Player::spawn(&args.player_path, &player_args)?;
player.wait()?;
return Ok(());
}
Some(hls::Error::Advertisement | hls::Error::Discontinuity) => {
warn!("{e} on startup, resetting...");
continue;
}
_ => return Err(e),
},
};
let worker = Worker::new(args.player_path.clone(), args.player_args.clone())?;
worker.send(&playlist.prefetch_urls[0])?;
worker.sync()?;
let mut retry_count: u32 = 0;
loop {
let time = Instant::now();
match playlist.reload() {
Ok(_) => retry_count = 0,
Err(e) => match e.downcast_ref::<hls::Error>() {
Some(hls::Error::Unchanged | hls::Error::InvalidPrefetchUrl) => {
retry_count += 1;
if retry_count == args.max_retries {
info!("Maximum retries on media playlist reached, exiting...");
return Ok(());
}
debug!("{e}, retrying...");
continue;
}
Some(hls::Error::Advertisement) => {
warn!("{e}, resetting...");
break;
}
Some(hls::Error::Discontinuity) => {
warn!("{e}, stream may be broken");
}
_ => return Err(e),
},
}
worker.send(&playlist.prefetch_urls[1])?;
if let Some(sleep_time) = playlist.duration.checked_sub(time.elapsed()) {
thread::sleep(sleep_time);
}
}
}
}