use anyhow::{bail, ensure, Context, Result};
use clap::Parser;
use crossbeam_channel::{unbounded, Receiver, Sender};
use is_terminal::IsTerminal;
use std::{
cmp::{Ord, Ordering},
io,
io::ErrorKind::BrokenPipe,
io::{Read, Write},
process, thread,
time::{Duration, Instant},
};
use ureq::{Agent, AgentBuilder, Error};
use url::Url;
// Browser-like User-Agent string handed to `AgentBuilder::user_agent` whenever
// this file builds an HTTP agent.
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36";
/// Everything needed to (re)request a channel's master playlist and pick one
/// quality variant out of it.
struct MasterPlaylist {
/// Fully built request URL: the server template with `[channel]` substituted
/// and the fixed query parameters appended.
server: Url,
/// Quality string searched for in `#EXT-X-MEDIA` lines; the literal `"best"`
/// selects the first such line.
quality: String,
/// Channel name, kept for the log line printed on every fetch.
channel: String,
}
impl MasterPlaylist {
    /// Builds the master playlist request for `channel` at `quality`.
    ///
    /// `server` is a URL template; every `[channel]` placeholder in it is
    /// replaced with `channel` before the fixed query parameters are appended.
    ///
    /// # Errors
    /// Returns "Invalid URL" if the substituted template does not parse.
    pub fn new(server: &str, channel: &str, quality: &str) -> Result<MasterPlaylist> {
        let query = [
            ("player", "twitchweb"),
            ("type", "any"),
            ("allow_source", "true"),
            ("allow_audio_only", "true"),
            ("allow_spectre", "false"),
            ("fast_bread", "true"),
        ];
        let base = server.replace("[channel]", channel);
        let server = Url::parse_with_params(&base, &query).context("Invalid URL")?;

        Ok(MasterPlaylist {
            server,
            quality: quality.to_owned(),
            channel: channel.to_owned(),
        })
    }

    /// Downloads the master playlist with `agent` and returns the media
    /// playlist URL of the configured quality.
    ///
    /// # Errors
    /// Fails if the request fails, the body cannot be read as a string, or no
    /// matching quality entry is found.
    pub fn fetch(&self, agent: &Agent) -> Result<String> {
        eprintln!("Fetching playlist for '{}'", self.channel);

        let response = agent
            .request_url("GET", &self.server)
            .set("referer", "https://player.twitch.tv")
            .set("origin", "https://player.twitch.tv")
            .call()
            .context("Error fetching master playlist")?;
        let body = response.into_string()?;

        Self::parse_quality_playlist(&self.quality, &body)
    }

    /// Finds the first `#EXT-X-MEDIA` line that mentions `quality` (or simply
    /// the first one when `quality` is `"best"`) and returns the line two
    /// below it.
    fn parse_quality_playlist(quality: &str, playlist: &str) -> Result<String> {
        let wants_best = quality == "best";

        let mut remaining = playlist.lines().skip_while(|line| {
            let is_media_tag = line.contains("#EXT-X-MEDIA");
            let quality_matches = line.contains(quality) || wants_best;
            !(is_media_tag && quality_matches)
        });

        // Index 0 is the matching tag line itself; the value wanted is at index 2.
        let url = remaining
            .nth(2)
            .context("Invalid quality or malformed master playlist")?;
        Ok(url.to_owned())
    }
}
/// State of the media (variant) playlist that is polled repeatedly for new
/// segments.
struct MediaPlaylist {
/// URL of the media playlist, as returned by `MasterPlaylist::fetch`.
url: String,
/// HTTP agent used for playlist requests.
agent: Agent,
/// Kept so `url` can be re-fetched when a discontinuity is seen.
master_playlist: MasterPlaylist,
/// Last line of the most recently loaded playlist with the
/// `#EXT-X-TWITCH-PREFETCH:` prefix removed; empty until the first reload.
prefetch_segment_url: String,
/// Sequence number parsed from the most recently loaded playlist; 0 until
/// the first reload.
media_sequence: u64,
}
impl MediaPlaylist {
pub fn new(server: &str, channel: &str, quality: &str) -> Result<MediaPlaylist> {
let channel = str::replace(channel, "twitch.tv/", "");
let master_playlist = MasterPlaylist::new(server, &channel, quality)?;
let agent = AgentBuilder::new().user_agent(USER_AGENT).build();
Ok(MediaPlaylist {
url: master_playlist.fetch(&agent)?,
agent,
master_playlist,
prefetch_segment_url: String::new(),
media_sequence: 0,
})
}
pub fn reload(&mut self) -> Result<()> {
let playlist = match self
.agent
.get(&self.url)
.set("referer", "https://player.twitch.tv")
.set("origin", "https://player.twitch.tv")
.call()
{
Ok(response) => response.into_string()?,
Err(Error::Status(code, _)) if code == 404 => {
eprintln!("404 on media playlist, stream likely ended. Exiting...");
process::exit(0);
}
Err(e) => bail!(e),
};
if playlist.contains("DISCONTINUITY") {
eprintln!(
"Encountered a discontinuity/possible ad segment, fetching a new master playlist"
);
self.url = self.master_playlist.fetch(&self.agent)?;
self.reload()?;
return Ok(());
}
self.media_sequence = Self::parse_media_sequence(&playlist)?;
self.prefetch_segment_url = playlist
.lines()
.last()
.context("Malformed media playlist")?
.replace("#EXT-X-TWITCH-PREFETCH:", "");
Ok(())
}
fn parse_media_sequence(playlist: &str) -> Result<u64> {
playlist
.lines()
.skip_while(|s| !s.contains("#EXT-X-MEDIA-SEQUENCE:"))
.nth(1)
.context("Malformed media playlist")?
.split(':')
.nth(1)
.context("Invalid media sequence")?
.parse()
.context("Error parsing media sequence")
}
}
/// Handle to the background reader/writer threads started by `IOThreads::new`.
struct IOThreads {
/// Channel on which segment URLs are sent to the reader thread.
reader_tx: Sender<String>,
}
impl IOThreads {
const SEGMENT_BUF_SIZE: usize = 10_000_000;
pub fn new(player: Option<String>, player_args: Option<String>) -> Result<IOThreads> {
let (reader_tx, reader_rx): (Sender<String>, Receiver<String>) = unbounded();
let (writer_tx, writer_rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = unbounded();
thread::Builder::new()
.name("Reader Thread".into())
.spawn(move || {
if let Err(e) = Self::reader_thread(reader_rx, writer_tx) {
eprintln!("{}", e);
process::exit(1);
}
})
.context("Error spawning reader thread")?;
thread::Builder::new()
.name("Writer Thread".into())
.spawn(move || {
if let Err(e) = Self::writer_thread(writer_rx, player, player_args) {
eprintln!("{}", e);
process::exit(1);
}
})
.context("Error spawning writer thread")?;
Ok(IOThreads { reader_tx })
}
pub fn send_url(&self, url: String) -> Result<()> {
self.reader_tx
.send(url)
.context("Error sending URL to reader thread")?;
Ok(())
}
fn reader_thread(reader_rx: Receiver<String>, writer_tx: Sender<Vec<u8>>) -> Result<()> {
let agent = AgentBuilder::new().user_agent(USER_AGENT).build();
loop {
let mut segment: Vec<u8> = Vec::with_capacity(Self::SEGMENT_BUF_SIZE);
let url = reader_rx
.recv()
.context("Error receiving URL in reader thread")?;
Self::fetch_segment(&mut segment, &agent, &url)?;
writer_tx
.send(segment)
.context("Error sending segment to writer thread")?;
}
}
fn writer_thread(
writer_rx: Receiver<Vec<u8>>,
player: Option<String>,
player_args: Option<String>,
) -> Result<()> {
let mut stdout: Box<dyn Write> = if let Some(player) = player {
let player_args = player_args.unwrap_or_default();
eprintln!("Opening player: {} {}", player, player_args);
Box::new(
process::Command::new(player)
.args(player_args.split_whitespace())
.stdin(process::Stdio::piped())
.spawn()
.context("Error opening player")?
.stdin
.take()
.context("Error opening player stdin")?,
)
} else {
ensure!(
!io::stdout().is_terminal(),
"No player set and stdout is a terminal, exiting..."
);
eprintln!("Writing to stdout");
Box::new(io::stdout().lock())
};
loop {
let segment = &writer_rx
.recv()
.context("Error receiving segment from reader thread")?;
match io::copy(&mut segment.as_slice(), &mut *stdout) {
Err(ref e) if e.kind() == BrokenPipe => {
bail!("Pipe closed, exiting...");
}
Err(e) => bail!(e),
_ => (),
};
}
}
fn fetch_segment(segment: &mut Vec<u8>, agent: &Agent, url: &str) -> Result<()> {
agent
.get(url)
.set("referer", "https://player.twitch.tv")
.set("origin", "https://player.twitch.tv")
.call()
.context("Error fetching segment")?
.into_reader()
.take(Self::SEGMENT_BUF_SIZE as u64)
.read_to_end(segment)
.context("Error reading segment")?;
Ok(())
}
}
// Command-line arguments, parsed by clap's derive API.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///`
// doc comments into help text, which would change the program's output.
#[derive(Parser)]
#[command(version, next_line_help = true)]
struct Args {
// Playlist server URL template; `[channel]` in it is replaced with the
// channel name.
#[arg(short, long, value_name = "URL")]
server: String,
// Player executable to spawn; when absent, segments are written to stdout.
#[arg(short, long, value_name = "PATH")]
player: Option<String>,
// Arguments for the player, split on whitespace before being passed on.
#[arg(
short = 'a',
long,
value_name = "ARGUMENTS",
allow_hyphen_values = true
)]
player_args: Option<String>,
// Channel name; any `twitch.tv/` substring is removed before use.
channel: String,
// Quality string looked up in the master playlist; `best` selects the first
// entry.
quality: String,
}
/// Parses the command line, starts the I/O threads and then polls the media
/// playlist forever, queueing the prefetch segment URL each time the media
/// sequence advances.
///
/// # Errors
/// Returns an error if setup fails, a playlist reload fails, the URL cannot
/// be handed to the reader thread, or the media sequence goes backwards.
fn main() -> Result<()> {
    let args = Args::parse();
    let mut media_playlist = MediaPlaylist::new(&args.server, &args.channel, &args.quality)?;
    let io_threads = IOThreads::new(args.player, args.player_args)?;

    // Target time between two successive segment requests.
    let segment_interval = Duration::from_secs(2);

    loop {
        let time = Instant::now();
        let last_media_sequence = media_playlist.media_sequence;
        media_playlist.reload()?;

        match media_playlist.media_sequence.cmp(&last_media_sequence) {
            Ordering::Greater => {
                io_threads.send_url(media_playlist.prefetch_segment_url.clone())?;
                // `Duration - Duration` panics on underflow, which happened
                // whenever the reload took longer than the interval.
                // Saturating at zero just skips the sleep in that case.
                thread::sleep(segment_interval.saturating_sub(time.elapsed()));
            }
            // No new segment yet: poll again straight away.
            Ordering::Equal => continue,
            Ordering::Less => bail!("Media sequence out of order"),
        }
    }
}