use std::{
io,
io::{ErrorKind::BrokenPipe, Read, Write},
process,
sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
thread::Builder,
};
use anyhow::{Context, Result};
use log::info;
use crate::{common, http::Request};
/// Handle to the background segment worker thread.
///
/// URLs are handed to the thread through [`Worker::send`]; [`Worker::sync`]
/// waits for the signal the thread emits once the first segment has been
/// copied to the player.
#[derive(Debug)]
pub struct Worker {
    /// Sending half of the channel carrying segment URLs to the worker thread.
    url_tx: Sender<String>,
    /// Receiving half of the channel on which the worker thread reports that
    /// the first segment has been written.
    sync_rx: Receiver<()>,
}
impl Worker {
pub fn new(player_path: String, player_args: String) -> Result<Self> {
let (url_tx, url_rx): (Sender<String>, Receiver<String>) = channel();
let (sync_tx, sync_rx): (SyncSender<()>, Receiver<()>) = sync_channel(1);
Builder::new()
.name(String::from("Segment Worker"))
.spawn(move || {
if let Err(e) = Self::thread_main(&url_rx, &sync_tx, &player_path, &player_args) {
eprintln!("Error: {e}");
process::exit(1);
}
})
.context("Failed to spawn segment worker thread")?;
Ok(Self { url_tx, sync_rx })
}
pub fn send(&self, url: &str) -> Result<()> {
self.url_tx
.send(url.to_owned())
.context("Failed to send URL to segment worker thread")
}
pub fn sync(&self) -> Result<()> {
self.sync_rx
.recv()
.context("Failed to receive sync state from segment worker thread")
}
fn thread_main(
url_rx: &Receiver<String>,
sync_tx: &SyncSender<()>,
player_path: &str,
player_args: &str,
) -> Result<()> {
let mut pipe = common::spawn_player(player_path, player_args)?
.stdin
.take()
.context("Failed to open player stdin")?;
let mut request = match url_rx.recv() {
Ok(url) => {
let mut request = Request::get(&url)?;
copy_segment(&mut request.reader()?, &mut pipe)?;
request
}
_ => return Ok(()),
};
sync_tx
.send(())
.context("Failed to send sync state from segment worker thread")?;
loop {
match url_rx.recv() {
Ok(url) => {
request.set_url(&url)?;
copy_segment(&mut request.reader()?, &mut pipe)?;
}
_ => return Ok(()),
}
}
}
}
/// Copies everything from `reader` into `writer`.
///
/// A `BrokenPipe` error is logged at info level and ends the whole process
/// with status 0 instead of being returned.
///
/// # Errors
///
/// Returns any other I/O error produced by the copy.
#[inline]
fn copy_segment(reader: &mut impl Read, writer: &mut impl Write) -> Result<()> {
    match io::copy(reader, writer) {
        Ok(_) => Ok(()),
        Err(e) if e.kind() == BrokenPipe => {
            info!("Player closed, exiting...");
            process::exit(0)
        }
        Err(e) => Err(e.into()),
    }
}