use std::{
sync::mpsc::{self, Receiver, Sender, SyncSender},
thread::{self, JoinHandle},
};
use anyhow::{ensure, Context, Result};
use log::debug;
use crate::{
http::{Agent, Url},
output::CombinedWriter,
};
struct ChannelMessage {
url: Url,
should_sync: bool,
}
pub struct Worker {
handle: Option<JoinHandle<Result<()>>>,
url_tx: Sender<ChannelMessage>,
sync_rx: Receiver<()>,
}
impl Worker {
pub fn spawn(writer: CombinedWriter, header_url: Option<Url>, agent: Agent) -> Result<Self> {
let (url_tx, url_rx): (Sender<ChannelMessage>, Receiver<ChannelMessage>) = mpsc::channel();
let (sync_tx, sync_rx): (SyncSender<()>, Receiver<()>) = mpsc::sync_channel(1);
let handle = thread::Builder::new()
.name("worker".to_owned())
.spawn(move || -> Result<()> {
debug!("Starting");
let mut request = {
let Ok(initial_msg) = url_rx.recv() else {
debug!("Exiting before initial url");
return Ok(());
};
let request = if let Some(header_url) = header_url {
let mut request = agent.writer(writer, header_url)?;
request.call(initial_msg.url)?;
request
} else {
agent.writer(writer, initial_msg.url)?
};
if initial_msg.should_sync {
sync_tx.send(())?;
}
request
};
loop {
let Ok(msg) = url_rx.recv() else {
debug!("Exiting");
return Ok(());
};
request.call(msg.url)?;
if msg.should_sync {
sync_tx.send(())?;
}
}
})
.context("Failed to spawn worker")?;
Ok(Self {
handle: Some(handle),
url_tx,
sync_rx,
})
}
pub fn sync_url(&mut self, url: Url) -> Result<()> {
self.send(url, true)
}
pub fn url(&mut self, url: Url) -> Result<()> {
self.send(url, false)
}
fn send(&mut self, url: Url, should_sync: bool) -> Result<()> {
self.join_if_dead()?;
self.url_tx.send(ChannelMessage { url, should_sync })?;
if should_sync {
self.sync_rx.recv().or_else(|_| self.join_if_dead())?;
}
Ok(())
}
fn join_if_dead(&mut self) -> Result<()> {
if self
.handle
.as_ref()
.context("Worker handle None")?
.is_finished()
{
let result = self
.handle
.take()
.context("Handle None while joining worker")?
.join()
.expect("Worker panicked");
ensure!(result.is_err(), "Worker died");
return result;
}
Ok(())
}
}