1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
//! Opinionated orchestrator for services which communicate via IPC and are not expected to exit
//!
//! It allows to start and control processes, handling all the necessary boilerplate:
//! - Running within async runtime
//! - Uses tokio::process::Command with predefined params
//!   to execute commands
//! - Uses log with info+ levels to
//! - Uses ipc-channel to establish communication from and to processes
//!
//! # Panicing
//!
//! Orchestrator will panic if .connect() called outside of async runtime as it spawns handlers.
//!
//! # Example
//!
//! This example shows how Orchestrator allows to add custom handlers for process stdout:
//!
//! ```
//! use tokio::process::{Command, ChildStdout};
//! use ipc_orchestrator::Orchestrator;
//!
//! use std::sync::atomic::{AtomicBool, Ordering};
//! static CALLED: AtomicBool = AtomicBool::new(false);
//!
//! use tokio::io::{AsyncBufReadExt, BufReader};
//! async fn mock_log_handler(reader: ChildStdout, name: String) -> anyhow::Result<()> {
//!    let mut reader = BufReader::new(reader).lines();
//!    assert_eq!(reader.next_line().await?.unwrap(), "testbed");
//!    CALLED.store(true, Ordering::Relaxed);
//!    Ok(())
//! }
//!
//! // from within async runtime:
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//!     let mut orchestrator = Orchestrator::from_handlers(mock_log_handler).ipc(false);
//!     let mut cmd = Command::new("echo");
//!     cmd.arg("testbed");
//!     orchestrator.start("start", &mut cmd);
//!     let orchestra = orchestrator.connect().await.unwrap();
//!     // it supposes never existing processes
//!     // hence it will give error on when any process exit or stdout was closed
//!     orchestra.run().await.unwrap_err();
//!     assert!(CALLED.load(Ordering::Relaxed));
//! # });
//! ```

use crate::connected::ConnectedOrchestrator;
use crate::logger::default_log_handler;
use crate::should_not_complete;
use crate::{Bridge, Channel, Process};
use anyhow::{anyhow, Context};
use futures::future::{try_join_all, Fuse, Future, FutureExt, TryFuture, TryJoinAll};
use futures::{pin_mut, select};
use ipc_channel::ipc::IpcOneShotServer;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::pin::Pin;
use std::process::Stdio;
use tokio::process::ChildStdout;
use tokio::process::Command;

type BFR<R> = Pin<Box<dyn Future<Output = anyhow::Result<R>>>>;

/// Create default orchestrator
///
/// Default orchestrator comes with `default_log_handler`
///
/// Default log handler will read lines from process stdout
/// and log them with info level adding process name
pub fn orchestrator() -> Orchestrator<impl Future<Output = anyhow::Result<()>>> {
    Orchestrator::from_handlers(default_log_handler)
}

/// Orchestrator which is in progress of starting up
pub struct Orchestrator<LF: TryFuture> {
    pub processes: HashMap<String, Process>,
    loggers: Vec<LF>,
    bridges: Vec<BFR<Bridge>>,
    ipc: bool,
    rust_backtrace: bool,
    logger: fn(ChildStdout, String) -> LF,
}

impl<LF: TryFuture> Orchestrator<LF> {
    /// Create orchestrator with provided log handler
    ///
    /// Log handler is a function: `fn(ChildStdout, String) -> impl TryFuture`
    /// Provided future should process ChildStdout until eof,
    /// returning with anyhow::Result<()>
    pub fn from_handlers(logger: fn(ChildStdout, String) -> LF) -> Self {
        Self {
            processes: HashMap::new(),
            loggers: Vec::new(),
            bridges: Vec::new(),
            ipc: false,
            rust_backtrace: false,
            logger,
        }
    }
}

impl<LF> Orchestrator<LF>
where
    LF: Future<Output = anyhow::Result<()>>,
{
    /// Start provided command with communication channel
    /// As opinionated executor for all the processes Orchestrator provides following setup:
    /// 1. Start IpcOneShotServer and provide server name to process via
    /// commandline argument `--orchestrator-ch`
    /// 2. cmd.kill_on_drop(true) - process will exit if orchestrator's handle is dropped
    /// 3. cmd.stdout(Stdio::piped()) - stdout will be logged as info!(target: &name, ...)
    pub fn start(&mut self, name: &str, cmd: &mut Command) -> anyhow::Result<()> {
        if self.processes.contains_key(name) {
            return Err(anyhow::anyhow!("process named `{}` already started", name));
        }

        let (server, server_name) =
            IpcOneShotServer::new().context("Failed to start IpcOneShotServer")?;

        cmd.kill_on_drop(true).stdout(Stdio::piped());
        if self.ipc {
            cmd.env("IPC_SERVER", server_name);
        }
        if self.rust_backtrace {
            cmd.env("RUST_BACKTRACE", "1");
        }

        debug!(target: "orchestrator", "Starting {} {:?}", name, cmd);

        let mut child = cmd.spawn()?;

        // Redirect command output to stdout - quick and dirty logging
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow!("child did not provide a handle to stdout"))?;
        self.loggers.push((self.logger)(stdout, name.to_owned()));

        self.processes.insert(
            name.to_owned(),
            Process {
                name: name.to_owned(),
                child,
            },
        );

        // Spawning Ipc Server to accept incoming channel from child process
        if self.ipc {
            self.bridges
                .push(Box::pin(ipc_handler(server, name.to_owned())));
        }

        Ok(())
    }

    /// Connect to processes IPC channels
    /// Resulting ConnectedOrchestrator can be used to further setup handlers
    /// over processes bridges
    pub async fn connect(self) -> anyhow::Result<ConnectedOrchestrator<Fuse<TryJoinAll<LF>>>> {
        let Orchestrator {
            mut processes,
            bridges,
            loggers,
            ..
        } = self;
        let processes: Vec<BFR<()>> = processes
            .drain()
            .map(|(_k, v)| v)
            .map(never_exit_process_handler)
            .collect();

        // Main future executor, had to implement due to customized pipeline
        // Wait for all bridges to connect to server and pass ipc handles
        let bridges = try_join_all(bridges).fuse();
        // Wait for all logs to complete or any of them to fail
        let mut loggers = Box::pin(try_join_all(loggers).fuse());
        //let i: u32 = loggers;
        // Wait for all processes to complete or any of them to fail
        let mut processes = Box::pin(try_join_all(processes).fuse());
        pin_mut!(bridges);

        let res = select!(
            res = bridges => match res {
                Ok(channels) => { Ok(channels) },
                Err(err) => { error!("failed to establish connection: {}", err); Err(err.into()) }
            },
            res = processes => should_not_complete!("processes", res),
            res = loggers => should_not_complete!("logs", res),
        );

        match res {
            Ok(channels) => Ok(ConnectedOrchestrator::new(channels, processes, loggers)),
            Err(err) => {
                error!(target: "orchestrator", "{}", &err);
                Err(err)
            }
        }
    }
}

impl<LF: TryFuture> Orchestrator<LF> {
    /// Setup IPC channel
    /// Will pass IpcOneShotServer name via `--orchestrator-ch`
    pub fn ipc(mut self, ipc: bool) -> Self {
        self.ipc = ipc;
        self
    }

    /// Start child process with RUST_BACKTRACE=1 env option
    pub fn rust_backtrace(mut self, backtrace: bool) -> Self {
        self.rust_backtrace = backtrace;
        self
    }
}

async fn ipc_handler(server: IpcOneShotServer<Channel>, name: String) -> anyhow::Result<Bridge> {
    let name1 = name.clone();
    let server = tokio::task::spawn_blocking(move || {
        server
            .accept()
            .unwrap_or_else(|err| todo!("failed to establish connection from {}: {}", name1, err))
    });
    let name = name.clone();
    server
        .map(|res| match res {
            Ok((_, channel)) => Ok(Bridge { channel, name }),
            Err(err) => Err(err.into()),
        })
        .await
}

fn never_exit_process_handler(p: Process) -> BFR<()> {
    let Process { child, name } = p;
    let name1 = name.clone();
    Box::pin(
        child
            .inspect(move |status| warn!(target: &name1, "exiting {:?}", status))
            .map(move |status| match status {
                Ok(n) => Err(anyhow!(
                    "process `{}` finish with {}, closing pipeline",
                    name,
                    n
                )),
                Err(err) => Err(err.into()),
            }),
    )
}