use cotton::prelude::*;
use cbradio::{BaseStation, Agent, Tags, Request, Reply, ReplyMessage, timestamp, project_dir, with_create_dir};
use std::process::{Command, Stdio};
use maybe_string::MaybeString;
use std::thread::{self, JoinHandle};
use std::os::unix::process::ExitStatusExt;
use cotton::chrono::format::{DelayedFormat, strftime::StrftimeItems};
use yansi::{Color, Style};
use serde::{Serialize, Deserialize};
use std::collections::BTreeMap;
use fs2::FileExt;
// Request the base station sends to agents; chosen as the subcommand of `station`.
// NOTE: plain `//` comments are used here on purpose, since structopt turns `///` doc
// comments into CLI help text.
#[derive(Debug, StructOpt)]
enum CliRequest {
// Converted to `Request::Ping` before it is sent.
Ping,
// Converted to `Request::Run(command)` before it is sent.
Run {
// Command name as typed by the user; the agent joins it onto its run directory.
command: String
},
}
// Mode of operation selected on the command line: agent (serves requests) or
// station (sends one request and collects replies).
// NOTE: plain `//` comments are used here on purpose, since structopt turns `///` doc
// comments into CLI help text.
#[derive(Debug, StructOpt)]
enum Action {
// Run as an agent that receives requests and replies to them in a loop.
Agent {
// Tags of this agent; passed to `Agent::new`.
#[structopt(long, short = "t")]
tags: Vec<String>,
// When set, every stdout/stderr line of a running command is also echoed to
// the agent's own stderr.
#[structopt(long, short = "l")]
log_command_output: bool,
// Directory that commands are resolved against and executed in; a command that
// resolves outside of it is rejected.
#[structopt(long, short = "r", default_value = ".")]
run_directory: PathBuf,
},
// Run as a base station: send one request and collect the agents' replies.
Station {
// Explicit discovery file path; when absent, `discovery.toml` in the project
// cache directory is used.
#[structopt(long, short = "d")]
discovery_file: Option<PathBuf>,
// Disables reading and writing of the discovery file altogether.
#[structopt(long, short = "D")]
no_discovery: bool,
// Makes a previously seen but now silent agent contribute to a non-zero exit status.
#[structopt(long, short = "m")]
fail_missing: bool,
// Tags selecting which agents the request is addressed to (one value per `-t`).
#[structopt(long, short = "t", number_of_values = 1)]
tags: Vec<String>,
// Passed to `BaseStation::request` as the initial wait (presumably seconds; TODO confirm).
#[structopt(long, short = "H", default_value = "2")]
hello_wait: usize,
// Reply timeout applied while at least one agent has started a command but not yet
// reported its status (presumably seconds; TODO confirm).
#[structopt(long, short = "R", default_value = "120")]
reply_wait: usize,
// Minimum time since session start to keep listening when no command is pending;
// compared against elapsed seconds.
#[structopt(long, short = "M", default_value = "4")]
minimum_wait: usize,
// The request to send.
#[structopt(subcommand)]
request: CliRequest,
}
}
/// Point in time expressed as milliseconds relative to the Unix epoch (UTC).
type Timestamp = i64;

/// Renders a millisecond Unix timestamp as `YYYY-MM-DD HH:MM:SS.mmm` in UTC.
///
/// The returned value is a lazy formatter; it produces the text when displayed.
fn format_timestamp(timestamp: Timestamp) -> DelayedFormat<StrftimeItems<'static>> {
    // Euclidean division keeps the millisecond remainder within 0..=999 even for
    // timestamps before the epoch. Truncating division would yield a negative
    // remainder, which wraps to a huge `u32` and overflows the nanosecond field.
    let seconds = timestamp.div_euclid(1000);
    let nanoseconds = timestamp.rem_euclid(1000) as u32 * 1_000_000;
    let ts = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds, nanoseconds), Utc);
    ts.format("%Y-%m-%d %H:%M:%S%.3f")
}
// Top-level command line options shared by both modes of operation.
// NOTE: plain `//` comments are used here on purpose, since structopt turns `///` doc
// comments into CLI help text.
#[derive(Debug, StructOpt)]
struct Cli {
// Logging options; handed to `init_logger` and consulted for `force_colors`.
#[structopt(flatten)]
logging: LoggingOpt,
// Connection string passed to `Agent::new` / `BaseStation::new`; may come from
// the REDIS_CONNECTION_STRING environment variable.
#[structopt(long,short = "r", env = "REDIS_CONNECTION_STRING")]
connection_string: String,
// Identity of this instance; may come from HOSTNAME. In agent mode it is also
// passed as the sole argument to every executed command.
#[structopt(long,short = "i", env = "HOSTNAME")]
identity: String,
// Channel to operate on; may come from CBRADIO_CHANNEL. Also used as the key
// into the discovery file's channel map.
#[structopt(long,short = "c", env = "CBRADIO_CHANNEL")]
channel: String,
// Key file used in agent mode (passed to `Agent::new`).
#[structopt(long, short = "b")]
base_station_public_key: Option<PathBuf>,
// Key file used in station mode (passed to `BaseStation::new`).
#[structopt(long, short = "B")]
base_station_secret_key: Option<PathBuf>,
// Key file used in station mode (passed to `BaseStation::new`).
#[structopt(long, short = "n")]
network_public_key: Option<PathBuf>,
// Key file used in agent mode (passed to `Agent::new`).
#[structopt(long, short = "N")]
network_secret_key: Option<PathBuf>,
// Selected mode of operation.
#[structopt(subcommand)]
action: Action,
}
fn process_output<R, F>(mut out: R, mut on_line: F) -> JoinHandle<()>
where
R: BufRead + Send + 'static,
F: FnMut(Result<MaybeString, io::Error>) -> PResult<()> + Send + 'static
{
thread::spawn(move || {
let out = &mut out;
in_context_of("process output collection thread", || {
for line in out.split(b'\n') {
on_line(line.map(MaybeString))?;
}
Ok(())
}).ok_or_log_error();
for line in out.split(b'\n') {
drop(line) }
})
}
/// Terminal styles applied to the individual parts of printed replies and results.
///
/// `main` fills these with colours when colouring is forced or stdout is a TTY,
/// and uses the `Default` value otherwise.
#[derive(Debug, Default, Clone)]
struct ReplyStyles {
/// Style of the bracketed timestamp.
timestamp: Style,
/// Style of the sending agent's identity.
from: Style,
/// Style of informational markers (`>>`, `<<`, `?>`) and the "Missing" label.
notice: Style,
/// Style of the stdout marker and the "OK" label.
stdout: Style,
/// Style of the stderr marker.
stderr: Style,
/// Style of error markers and failure labels.
error: Style,
}
/// Discovery-file record of one agent that has replied on a channel.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct DiscoveredAgent {
/// Session timestamp of the most recent session in which the agent replied.
last_seen: Timestamp,
/// Tags the agent reported in its reply.
tags: Tags,
}
/// Discovery-file record of all agents seen on one channel.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct DiscoveredChannel {
/// Known agents keyed by the identity they replied from.
agents: BTreeMap<String, DiscoveredAgent>,
}
/// In-memory form of the discovery file, read from and written back as TOML.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DiscoverFile {
/// Known channels keyed by channel name.
channels: BTreeMap<String, DiscoveredChannel>,
/// Open handle of the file this data was read from; not serialized.
/// `main` stores the exclusively locked handle here and later writes through it
/// (presumably this also keeps the lock held for the session; verify against fs2).
#[serde(skip)]
locked_file: Option<File>,
}
/// Final outcome recorded by the base station for a single agent.
#[derive(Debug, PartialEq, Eq)]
enum AgentResult {
/// The agent answered a ping, or its command exited with status code 0.
Ok,
/// The agent reported starting a command but no status or error followed.
TimeOut,
/// The agent replied with an error message.
AgentError(String),
/// The command exited with a non-zero code, was ended by a signal, or ended
/// for an unknown reason.
CommandError(String),
/// The agent is listed in the discovery file with matching tags but did not
/// reply in this session; carries its recorded `last_seen` timestamp.
Missing(Timestamp),
}
/// Program entry point: parses the command line and runs as an agent or a base station.
///
/// * Agent mode loops forever, answering `Ping` with `Pong` and executing `Run`
///   requests from the run directory while streaming the command's output back.
/// * Station mode sends one request, prints replies as they arrive, maintains the
///   discovery file (unless disabled) and finishes with a status that encodes
///   which kinds of problems were observed.
fn main() -> FinalResult {
let args = Cli::from_args();
init_logger(&args.logging, vec![module_path!()]);
// Use colours only when forced or when stdout is a terminal; otherwise default styles.
let reply_styles = if args.logging.force_colors || stdout_is_tty() {
ReplyStyles {
timestamp: Color::Green.style(),
from: Color::Blue.style(),
notice: Color::Yellow.style(),
stdout: Color::Cyan.style(),
stderr: Color::Magenta.style(),
error: Color::Red.style(),
}
} else {
Default::default()
};
match args.action {
Action::Agent { tags, run_directory, log_command_output } => {
// Canonical form is needed so the `starts_with` containment check below is meaningful.
let run_directory = run_directory.canonicalize().problem_while("canonicalizing run directory")?;
info!("Starting Agent with run directory: {:?}", run_directory);
let mut agent = Agent::new(&args.connection_string, args.identity.clone(), tags.into(), args.channel, args.base_station_public_key.as_deref(), args.network_secret_key.as_deref())?;
let identity = args.identity;
let run_directory = &run_directory;
// Serve requests forever; a failure while handling one request is logged and
// the loop moves on to the next request.
loop {
agent.rx(|request, mut response| {
match request {
Request::Ping => response.reply(Reply::Pong)?,
Request::Run(command) => {
// Resolve the requested command relative to the run directory.
match run_directory.join(&command).canonicalize() {
Ok(command_path) => {
// Refuse anything that resolves outside of the run directory.
if !command_path.starts_with(run_directory) {
response.reply(Reply::Error(format!("Bad command: {:?}", command)))?;
} else {
if !command_path.is_file() {
response.reply(Reply::Error(format!("No such command: {:?} ", command)))?;
} else {
info!("Running command: {:?}", command_path);
// Announce the start before spawning so the station knows a command is pending.
response.reply(Reply::Run { path: command_path.display().to_string() })?;
// Run in the run directory with no stdin, piped stdout/stderr and the
// agent identity as the only argument.
match Command::new(command_path)
.current_dir(&run_directory)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args(&[&identity])
.spawn() {
Ok(mut child) => {
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
// Forward every stdout line as `Reply::Stdout` from a dedicated thread.
let stdout = process_output(BufReader::new(stdout), {
let command = command.clone();
let mut response = response.clone();
let styles = reply_styles.clone();
move |result| {
match result.problem_while("reading stdout") {
Ok(line) => {
if log_command_output {
eprintln!("[{}] {} {}",
styles.timestamp.paint(format_timestamp(timestamp())), styles.stdout.paint("STDOUT:"), line);
}
response.reply(Reply::Stdout(line))
},
Err(err) => response.reply(Reply::Error(format!("Failed to process stdout: {:?}: {}", command, err))),
}
}
});
// Forward every stderr line as `Reply::Stderr` from a second thread.
let stderr = process_output(BufReader::new(stderr), {
let command = command.clone();
let mut response = response.clone();
let styles = reply_styles.clone();
move |result| {
match result.problem_while("reading stderr") {
Ok(line) => {
if log_command_output {
eprintln!("[{}] {} {}",
styles.timestamp.paint(format_timestamp(timestamp())), styles.stderr.paint("STDERR:"), line);
}
response.reply(Reply::Stderr(line))
},
Err(err) => response.reply(Reply::Error(format!("Failed to process stderr: {:?}: {}", command, err))),
}
}
});
// Both output threads are joined before the exit status is collected and reported.
stdout.join().ok().ok_or_problem("Joining stdout thread")?;
stderr.join().ok().ok_or_problem("Joining stderr thread")?;
let status = child.wait().problem_while("getting command status code")?;
response.reply(Reply::Status {
signal: status.signal(),
code: status.code(),
})?;
if let Some(code) = status.code() {
info!("Command finished with status code: {}", code);
} else if let Some(signal) = status.signal() {
info!("Command finished with signal: {}", signal);
} else {
info!("Command finished with unknown status");
}
}
Err(err) => response.reply(Reply::Error(format!("Failed to start command: {:?}: {}", command, err)))?,
}
}
}
}
// Path could not be canonicalized; reported the same way as a non-file path.
Err(_) => response.reply(Reply::Error(format!("No such command: {:?} ", command)))?,
};
}
};
Ok(())
}).ok_or_log_error();
}
}
Action::Station {
discovery_file,
no_discovery,
fail_missing, tags,
hello_wait,
reply_wait,
minimum_wait,
request
} => {
// Pick the discovery file: none when disabled, the explicit path when given,
// otherwise `discovery.toml` in the project cache directory.
let discovery_file = if no_discovery {
None
} else {
Some(discovery_file.map(Ok).unwrap_or_else(|| with_create_dir(project_dir().cache_dir().join("discovery.toml")))?)
};
// Open (creating if needed), lock exclusively and load the discovery data.
let mut discovery = discovery_file.as_deref().map(|discovery_file| {
let mut file = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(discovery_file)?;
// Try a non-blocking lock first so a warning can be shown before blocking.
if file.try_lock_exclusive().is_err() {
warn!("Discovery file is locked (is there another base station instance running?); waiting for lock...");
file.lock_exclusive()?;
}
in_context_of_with(|| format!("reading discovery file: {:?}", discovery_file), || {
debug!("Reading discovery file: {:?}", discovery_file);
let mut data = String::new();
file.read_to_string(&mut data)?;
// An empty (e.g. freshly created) file yields empty discovery data.
let mut discovery: DiscoverFile = if data.is_empty() {
Default::default()
} else {
toml::from_str(&data)?
};
// Keep the handle so the same file can be rewritten at the end of the session.
discovery.locked_file = Some(file);
Ok(discovery)
})
}).transpose()?;
let mut base_station = BaseStation::new(&args.connection_string, args.identity, args.channel.clone(), args.network_public_key.as_deref(), args.base_station_secret_key.as_deref())?;
// Marks this session; agents replying now get this value as `last_seen`.
let session_timestamp = timestamp();
let tags: Tags = tags.into();
let mut reply_iter = base_station.request(tags.clone(), hello_wait, match request {
CliRequest::Ping => Request::Ping,
CliRequest::Run { command } => Request::Run(command),
}
)?;
// Report how many known agents on this channel match the requested tags.
if let Some(discovery) = discovery.as_ref() {
let total = discovery.channels.get(&args.channel)
.into_iter().map(|channel| channel.agents.values()).flatten()
.filter(|agent| tags.agent_tags_match(&agent.tags)).count();
info!("Expecting {} agents to reply", total);
}
// Latest known outcome per agent identity; later replies overwrite earlier ones.
let mut agent_results = BTreeMap::new();
loop {
let replies = reply_iter.next();
// The loop ends when the iterator is exhausted or a read error occurs (the error is logged).
if let Some(replies) = replies.map(|result| result.problem_while("reading reply (late reply or wrong channel?)").ok_or_log_error()).flatten() {
for reply in replies {
print_reply(&reply, &reply_styles);
if let Some(discovery) = discovery.as_mut() {
// First reply of this session: agent is unknown or was last seen in an earlier session.
let first_reply = if let Some(agent) = discovery.channels.get(&args.channel).and_then(|channel| channel.agents.get(&reply.from)) {
agent.last_seen < session_timestamp
} else {
true
};
if first_reply {
let agent = DiscoveredAgent {
last_seen: session_timestamp,
tags: reply.tags,
};
let channel = discovery.channels.entry(args.channel.clone()).or_default();
if channel.agents.insert(reply.from.clone(), agent).is_none() {
warn!("Discovered new agent: {:?}", reply.from)
}
// Progress report: matching agents seen this session versus all matching agents known.
let agents = channel.agents.values().filter(|agent| tags.agent_tags_match(&agent.tags));
let total = agents.clone().count();
let seen = agents.clone().filter(|agent| agent.last_seen >= session_timestamp).count();
info!("Got a reply from {} of {} agents so far", seen, total);
}
}
// Translate the reply into the agent's outcome. `Run` records `TimeOut` as a
// placeholder that a later `Status` or `Error` reply from the same agent replaces.
match reply.reply {
Reply::Run { .. } => { agent_results.insert(reply.from,
AgentResult::TimeOut); },
Reply::Pong => { agent_results.insert(reply.from,
AgentResult::Ok); },
Reply::Status { code, signal } => { match (code, signal) {
(Some(0), _) => agent_results.insert(reply.from,
AgentResult::Ok),
(Some(status), _) => agent_results.insert(reply.from,
AgentResult::CommandError(format!("Command finished with non-zero status code: {}", status))),
(None, Some(signal)) => agent_results.insert(reply.from,
AgentResult::CommandError(format!("Command aborted with signal: {}", signal))),
(None, None) => agent_results.insert(reply.from,
AgentResult::CommandError(format!("Command aborted for unknown reason"))),
}; },
Reply::Error(err) => { agent_results.insert(reply.from, AgentResult::AgentError(err)); },
// Output lines (and any other reply kinds) do not affect the outcome.
_ => (),
};
}
} else {
break;
}
// While any command is still pending use the long reply timeout; otherwise wait
// only for what is left of the minimum wait (never less than 1).
if agent_results.values().any(|result| *result == AgentResult::TimeOut) {
reply_iter.set_timeout(reply_wait)?;
} else {
let sec_elapsed = ((timestamp() - session_timestamp) / 1000) as usize;
let timeout = std::cmp::max(minimum_wait.saturating_sub(sec_elapsed), 1);
reply_iter.set_timeout(timeout)?;
}
}
if let Some(discovery) = discovery {
// Known agents with matching tags that did not reply in this session are missing.
for (identity, agent) in discovery.channels.get(&args.channel).into_iter().map(|channel| channel.agents.iter()).flatten() {
if agent.last_seen != session_timestamp && tags.agent_tags_match(&agent.tags) {
agent_results.insert(identity.to_owned(), AgentResult::Missing(agent.last_seen));
}
}
// Rewrite the discovery file through the retained handle: truncate, then write TOML.
if let Some(mut file) = discovery.locked_file.as_ref() {
in_context_of_with(|| format!("writing discovery file: {:?}", discovery_file), || {
debug!("Writing discovery file: {:?}", discovery_file);
file.set_len(0)?;
file.write_all(&toml::to_vec(&discovery)?)?;
Ok(())
})?;
}
}
print_result(&agent_results, &reply_styles);
// Collapse per-agent outcomes into one flag per problem category.
let mut missing: bool = false;
let mut command_error: bool = false;
let mut timeout_error: bool = false;
let mut agent_error: bool = false;
for (_, result) in agent_results {
match result {
AgentResult::Ok => (),
AgentResult::Missing(_) => missing = true,
AgentResult::CommandError(_) => command_error = true,
AgentResult::TimeOut => timeout_error = true,
AgentResult::AgentError(_) => agent_error = true,
}
}
// Final status is a sum of flags: 2 = missing agent (only with `fail_missing`),
// 4 = command error, 8 = timeout, 16 = agent error.
let mut ret = 0;
let mut messages = Vec::new();
if fail_missing && missing {
ret += 2;
messages.push("missing agent");
}
if command_error {
ret += 4;
messages.push("command error");
}
if timeout_error {
ret += 8;
messages.push("timeout error");
}
if agent_error {
ret += 16;
messages.push("agent error");
}
if ret > 0 {
problem!("Finished with problems: {}", messages.join(", ")).fatal_with_status(ret)?;
}
}
}
Ok(())
}
fn print_reply(reply: &ReplyMessage, styles: &ReplyStyles) {
print!("[{}] {}: ", styles.timestamp.paint(format_timestamp(reply.timestamp)), styles.from.paint(&reply.from));
match &reply.reply {
Reply::Run { path } => println!("{} {}", styles.notice.paint(">>"), path),
Reply::Stdout(message) => println!("{} {}", styles.stdout.paint("O>"), message),
Reply::Stderr(message) => println!("{} {}", styles.stderr.paint("E>"), message),
Reply::Error(message) => println!("{} {}", styles.error.paint("!>"), message),
Reply::Status { code: Some(code), signal: None } => println!("{} Finished with status code: {}", styles.notice.paint("<<"), code),
Reply::Status { code: None, signal: Some(signal) } => println!("{} Finished with signal: {}", styles.error.paint("!!"), signal),
reply @ Reply::Pong => println!("{} {:?}", styles.notice.paint("<<"), reply),
reply => println!("{} {:?}", styles.notice.paint("?>"), reply),
}
}
fn print_result(agent_results: &BTreeMap<String, AgentResult>, styles: &ReplyStyles) {
for (agent, result) in agent_results {
match result {
AgentResult::Ok => info!("{}: {}", styles.from.paint(agent), styles.stdout.paint("OK")),
AgentResult::Missing(last_seen) => warn!("{}: {}: Previously seen agent did not reply; last seen: {}", styles.from.paint(agent), styles.notice.paint("Missing"), format_timestamp(*last_seen)),
AgentResult::TimeOut => warn!("{}: {}: Command started but did not finish in time", styles.from.paint(agent), styles.error.paint("Expired")),
AgentResult::CommandError(err) => warn!("{}: {}: Agent's command finished with error: {}", styles.from.paint(agent), styles.error.paint("KO"), err),
AgentResult::AgentError(err) => error!("{}: {}: Agent finished with error: {}", styles.from.paint(agent), styles.error.paint("Error"), err),
}
}
}