use crate::connected::ConnectedOrchestrator;
use crate::logger::default_log_handler;
use crate::should_not_complete;
use crate::{Bridge, Channel, Process};
use anyhow::{anyhow, Context};
use futures::future::{try_join_all, Fuse, Future, FutureExt, TryFuture, TryJoinAll};
use futures::{pin_mut, select};
use ipc_channel::ipc::IpcOneShotServer;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::pin::Pin;
use std::process::Stdio;
use tokio::process::ChildStdout;
use tokio::process::Command;
type BFR<R> = Pin<Box<dyn Future<Output = anyhow::Result<R>>>>;
pub fn orchestrator() -> Orchestrator<impl Future<Output = anyhow::Result<()>>> {
Orchestrator::from_handlers(default_log_handler)
}
pub struct Orchestrator<LF: TryFuture> {
pub processes: HashMap<String, Process>,
loggers: Vec<LF>,
bridges: Vec<BFR<Bridge>>,
ipc: bool,
rust_backtrace: bool,
logger: fn(ChildStdout, String) -> LF,
}
impl<LF: TryFuture> Orchestrator<LF> {
pub fn from_handlers(logger: fn(ChildStdout, String) -> LF) -> Self {
Self {
processes: HashMap::new(),
loggers: Vec::new(),
bridges: Vec::new(),
ipc: false,
rust_backtrace: false,
logger,
}
}
}
impl<LF> Orchestrator<LF>
where
LF: Future<Output = anyhow::Result<()>>,
{
pub fn start(&mut self, name: &str, cmd: &mut Command) -> anyhow::Result<()> {
if self.processes.contains_key(name) {
return Err(anyhow::anyhow!("process named `{}` already started", name));
}
let (server, server_name) =
IpcOneShotServer::new().context("Failed to start IpcOneShotServer")?;
cmd.kill_on_drop(true).stdout(Stdio::piped());
if self.ipc {
cmd.env("IPC_SERVER", server_name);
}
if self.rust_backtrace {
cmd.env("RUST_BACKTRACE", "1");
}
debug!(target: "orchestrator", "Starting {} {:?}", name, cmd);
let mut child = cmd.spawn()?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("child did not provide a handle to stdout"))?;
self.loggers.push((self.logger)(stdout, name.to_owned()));
self.processes.insert(
name.to_owned(),
Process {
name: name.to_owned(),
child,
},
);
if self.ipc {
self.bridges
.push(Box::pin(ipc_handler(server, name.to_owned())));
}
Ok(())
}
pub async fn connect(self) -> anyhow::Result<ConnectedOrchestrator<Fuse<TryJoinAll<LF>>>> {
let Orchestrator {
mut processes,
bridges,
loggers,
..
} = self;
let processes: Vec<BFR<()>> = processes
.drain()
.map(|(_k, v)| v)
.map(never_exit_process_handler)
.collect();
let bridges = try_join_all(bridges).fuse();
let mut loggers = Box::pin(try_join_all(loggers).fuse());
let mut processes = Box::pin(try_join_all(processes).fuse());
pin_mut!(bridges);
let res = select!(
res = bridges => match res {
Ok(channels) => { Ok(channels) },
Err(err) => { error!("failed to establish connection: {}", err); Err(err.into()) }
},
res = processes => should_not_complete!("processes", res),
res = loggers => should_not_complete!("logs", res),
);
match res {
Ok(channels) => Ok(ConnectedOrchestrator::new(channels, processes, loggers)),
Err(err) => {
error!(target: "orchestrator", "{}", &err);
Err(err)
}
}
}
}
impl<LF: TryFuture> Orchestrator<LF> {
pub fn ipc(mut self, ipc: bool) -> Self {
self.ipc = ipc;
self
}
pub fn rust_backtrace(mut self, backtrace: bool) -> Self {
self.rust_backtrace = backtrace;
self
}
}
async fn ipc_handler(server: IpcOneShotServer<Channel>, name: String) -> anyhow::Result<Bridge> {
let name1 = name.clone();
let server = tokio::task::spawn_blocking(move || {
server
.accept()
.unwrap_or_else(|err| todo!("failed to establish connection from {}: {}", name1, err))
});
let name = name.clone();
server
.map(|res| match res {
Ok((_, channel)) => Ok(Bridge { channel, name }),
Err(err) => Err(err.into()),
})
.await
}
fn never_exit_process_handler(p: Process) -> BFR<()> {
let Process { child, name } = p;
let name1 = name.clone();
Box::pin(
child
.inspect(move |status| warn!(target: &name1, "exiting {:?}", status))
.map(move |status| match status {
Ok(n) => Err(anyhow!(
"process `{}` finish with {}, closing pipeline",
name,
n
)),
Err(err) => Err(err.into()),
}),
)
}