#![forbid(unsafe_code)]
use std::{
cmp::{Ord, Ordering},
io,
io::Write,
process::{Command, Stdio},
thread,
time::{Duration, Instant},
};
use anyhow::{bail, ensure, Context, Result};
use clap::Parser;
use is_terminal::IsTerminal;
use log::{debug, info, warn};
use simplelog::{ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode};
use ureq::AgentBuilder;
mod iothread;
mod playlist;
use iothread::IOThread;
use playlist::MediaPlaylist;
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36";
#[derive(Parser)]
#[command(version, next_line_help = true)]
struct Args {
#[arg(short, long, value_name = "URL", verbatim_doc_comment)]
server: String,
#[arg(short, long = "player", value_name = "PATH")]
player_path: Option<String>,
#[arg(
short = 'a',
long,
value_name = "ARGUMENTS",
allow_hyphen_values = true
)]
player_args: Option<String>,
#[arg(long)]
disable_reset_on_ad: bool,
#[arg(short, long)]
debug: bool,
channel: String,
quality: String,
}
fn spawn_player_or_stdout(
player_path: &Option<String>,
player_args: &str,
) -> Result<Box<dyn Write + Send>> {
if let Some(player_path) = player_path {
info!("Opening player: {} {}", player_path, player_args);
Ok(Box::new(
Command::new(player_path)
.args(player_args.split_whitespace())
.stdin(Stdio::piped())
.spawn()
.context("Failed to open player")?
.stdin
.take()
.context("Failed to open player stdin")?,
))
} else {
ensure!(
!io::stdout().is_terminal(),
"No player set and stdout is a terminal, exiting..."
);
info!("Writing to stdout");
Ok(Box::new(io::stdout()))
}
}
fn main() -> Result<()> {
let args = Args::parse();
if args.debug {
TermLogger::init(
LevelFilter::Debug,
ConfigBuilder::new()
.set_max_level(LevelFilter::Error)
.set_time_level(LevelFilter::Error)
.build(),
TerminalMode::Stderr,
ColorChoice::Auto,
)?;
} else {
TermLogger::init(
LevelFilter::Info,
ConfigBuilder::new()
.set_max_level(LevelFilter::Error)
.set_time_level(LevelFilter::Off)
.build(),
TerminalMode::Stderr,
ColorChoice::Never,
)?;
}
let player_args = args.player_args.unwrap_or_default();
let agent = AgentBuilder::new().user_agent(USER_AGENT).build();
loop {
let io_thread = IOThread::new(
&agent,
spawn_player_or_stdout(&args.player_path, &player_args)?,
)?;
let playlist = MediaPlaylist::new(&agent, &args.server, &args.channel, &args.quality)?;
playlist.catch_up()?;
let mut prev_sequence: u64 = 0;
loop {
let time = Instant::now();
let segment = match playlist.reload() {
Ok(reload) => {
if !args.disable_reset_on_ad && reload.ad {
warn!("Encountered an embedded ad segment, resetting");
break; } else if reload.discontinuity {
warn!("Encountered a discontinuity, stream may be broken");
}
debug!("Playlist reload took {:?}", time.elapsed());
reload.segment
}
Err(e) => match e.downcast_ref::<ureq::Error>() {
Some(ureq::Error::Status(code, _)) if *code == 404 => {
info!("Playlist not found. Stream likely ended, exiting...");
return Ok(());
}
_ => bail!(e),
},
};
match segment.sequence.cmp(&prev_sequence) {
Ordering::Greater => {
const SEGMENT_DURATION: Duration = Duration::from_secs(2);
io_thread.send_url(&segment.url)?;
debug!("Sequence: {} -> {}", prev_sequence, segment.sequence);
prev_sequence = segment.sequence;
let elapsed = time.elapsed();
if elapsed < SEGMENT_DURATION {
thread::sleep(SEGMENT_DURATION - elapsed);
} else {
warn!("Took longer than segment duration, stream may be broken");
}
}
Ordering::Less => bail!("Out of order media sequence"),
Ordering::Equal => debug!("Sequence {} is the same as previous", segment.sequence), }
}
}
}