use tokio::process::Child;
use tokio_stream::wrappers::ReceiverStream;
use crate::common::*;
/// Shared handle used to launch background workers and to report their
/// failures.
///
/// The handle is `Clone`; every clone carries a clone of the same error
/// sender, so errors reported through any of them arrive on the channel whose
/// receiving half is owned by the future returned from `Context::create`.
#[derive(Debug, Clone)]
pub struct Context {
/// Sending half of the error channel. `spawn_worker` clones it into each
/// spawned task so that a failing worker can report its error.
error_sender: mpsc::Sender<Error>,
}
impl Context {
pub fn create() -> (Self, BoxFuture<()>) {
let (error_sender, receiver) = mpsc::channel(1);
let mut receiver = ReceiverStream::new(receiver);
let context = Context { error_sender };
let worker_future = async move {
match receiver.next().await {
None => Ok(()),
Some(err) => Err(err),
}
};
(context, worker_future.boxed())
}
pub fn spawn_worker<W>(&self, worker: W)
where
W: Future<Output = Result<()>> + Send + 'static,
{
let error_sender = self.error_sender.clone();
tokio::spawn(
async move {
if let Err(err) = worker.await {
debug!("reporting background worker error: {}", err);
if let Err(_err) = error_sender.send(err).await {
debug!("broken pipe reporting background worker error");
}
}
}
.instrument(debug_span!("worker"))
.boxed(),
);
}
pub fn spawn_process(&self, name: String, mut child: Child) {
let name_copy = name.clone();
let worker = async move {
match child.wait().await {
Ok(ref status) if status.success() => Ok(()),
Ok(status) => Err(format_err!("{} failed with {}", name, status)),
Err(err) => Err(format_err!("{} failed with error: {}", name, err)),
}
}
.instrument(debug_span!("process", name = ?name_copy));
self.spawn_worker(worker.boxed());
}
}