use anyhow::{anyhow, bail, Context as _, Error};
use buildkite_jobify::{jobifier::Jobifier, monitor::Monitor, scheduler::Scheduler};
use camino::Utf8PathBuf as PathBuf;
use clap::Parser;
use serde::Deserialize;
use tracing_subscriber::filter::LevelFilter;
#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
struct Config {
api_token: Option<String>,
namespace: Option<String>,
pipelines: Vec<String>,
cluster: Option<String>,
}
fn parse_level(s: &str) -> Result<LevelFilter, Error> {
s.parse::<LevelFilter>()
.with_context(|| format!("failed to parse level '{}'", s))
}
#[derive(Parser)]
#[clap(name = "jobify")]
struct Opts {
#[clap(short, long, action)]
config: Option<PathBuf>,
#[clap(short = 't', long, action)]
api_token: Option<String>,
#[clap(short, long, action)]
namespace: Option<String>,
#[clap(long, action)]
cluster: Option<String>,
#[structopt(
short = 'L',
long,
default_value = "info",
value_parser = parse_level,
long_help = "The log level for messages, only log messages at or above the level will be emitted.
Possible values:
* off
* error
* warn
* info (default)
* debug
* trace"
)]
log_level: LevelFilter,
#[clap(long, action)]
json: bool,
#[clap(name = "PIPELINE", action)]
pipelines: Vec<String>,
}
async fn real_main() -> Result<(), Error> {
let args = Opts::parse();
let mut env_filter = tracing_subscriber::EnvFilter::from_default_env();
env_filter = env_filter.add_directive(args.log_level.into());
let subscriber = tracing_subscriber::FmtSubscriber::builder().with_env_filter(env_filter);
if args.json {
tracing::subscriber::set_global_default(subscriber.json().finish())
.context("failed to set default subscriber")?;
} else {
tracing::subscriber::set_global_default(subscriber.finish())
.context("failed to set default subscriber")?;
};
let get_cfg = || {
let mut cfg: Config = match args.config {
Some(ref path) => {
let contents = std::fs::read_to_string(&path)
.map_err(|e| anyhow!("failed to read configuration from {path}: {e}"))?;
toml::from_str(&contents)
.map_err(|e| anyhow!("failed to deserialize configuration from {path}: {e}"))?
}
None => Config {
api_token: None,
namespace: None,
cluster: None,
pipelines: Vec::new(),
},
};
if let Some(token) = args.api_token {
cfg.api_token = Some(token);
}
if let Some(ns) = args.namespace {
cfg.namespace = Some(ns);
}
if let Some(cluster) = args.cluster {
cfg.cluster = Some(cluster);
}
if !args.pipelines.is_empty() {
cfg.pipelines = args.pipelines;
}
let api_token = match cfg.api_token {
Some(tok) => tok,
None => match std::env::var("BUILDKITE_API_TOKEN") {
Ok(tok) => tok,
Err(e) => {
bail!(
"failed to read BUILDKITE_API_TOKEN environment variable: {}",
e
);
}
},
};
if cfg.pipelines.is_empty() {
bail!("no pipelines were specified to monitor");
}
Ok((
api_token,
cfg.namespace.unwrap_or_else(|| "buildkite".to_owned()),
cfg.pipelines,
cfg.cluster,
))
};
let (token, namespace, pipelines, cluster) = get_cfg()?;
tracing::info!("namespace: {} cluster: {:?}", namespace, cluster);
let start: Result<_, Error> = async {
let monitor = Monitor::with_token(token.clone())
.await
.context("buildkite org monitor")?;
let jobifier = Jobifier::create(token, namespace, cluster).context("k8s jobifier")?;
let scheduler = Scheduler::new(monitor, jobifier);
for pipeline in pipelines {
scheduler.watch(&pipeline).await.context("pipeline watch")?;
}
Ok(scheduler)
}
.await;
let scheduler = start?;
scheduler.wait().await;
Ok(())
}
#[tokio::main]
async fn main() {
match real_main().await {
Ok(_) => {}
Err(e) => {
tracing::error!("{:#}", e);
#[allow(clippy::exit)]
std::process::exit(1);
}
}
}