#![allow(clippy::enum_variant_names)]
use graphql_client::{GraphQLQuery, Response};
use reqwest::Client;
use serde::{de::DeserializeOwned, Serialize};
use std::{collections::BTreeMap, fmt};
use tracing::{error, info, warn};
use uuid::Uuid;
// Marker type for the GetPipelineById GraphQL query. The derive generates a
// matching `get_pipeline_by_id` module (Variables + ResponseData types) at
// build time from the schema/query files referenced below.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "buildkite/schema.json",
    query_path = "buildkite/jobs.gql",
    response_derives = "Debug"
)]
struct GetPipelineById;
// Marker type for the GetBuilds GraphQL query; the derive generates the
// `get_builds` module from the same schema/query files.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "buildkite/schema.json",
    query_path = "buildkite/jobs.gql",
    response_derives = "Debug"
)]
struct GetBuilds;
// Marker type for the GetRunningBuilds GraphQL query; the derive generates
// the `get_running_builds` module from the same schema/query files.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "buildkite/schema.json",
    query_path = "buildkite/jobs.gql",
    response_derives = "Debug"
)]
struct GetRunningBuilds;
/// All error conditions surfaced by this Buildkite client.
#[derive(Debug)]
pub enum BkErr {
    // The API token could not be encoded as an HTTP header value.
    InvalidHttpHeader(http::header::InvalidHeaderValue),
    // The server answered with a non-success HTTP status.
    Http(reqwest::Error),
    // The HTTP request itself failed (transport-level error).
    Reqwest(reqwest::Error),
    // The response body could not be (de)serialized.
    Json(serde_json::error::Error),
    // No organization matched the given name (per Display: "unknown org").
    UnknownOrg(String),
    // The organization id was malformed.
    InvalidOrgId(String),
    // No pipeline matched the given identifier.
    UnknownPipeline(String),
    // Threadpool setup/IO failure (per Display: "threadpool error").
    Threadpool(std::io::Error),
    // The GraphQL layer reported one or more errors in the response.
    Request(GraphQLErrors),
    // Well-formed HTTP but unexpected content (e.g. wrong content-type).
    InvalidResponse(String),
}
impl std::error::Error for BkErr {
    /// Exposes the wrapped lower-level error for variants that carry one;
    /// variants holding only contextual strings have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidHttpHeader(inner) => Some(inner),
            Self::Http(inner) => Some(inner),
            Self::Reqwest(inner) => Some(inner),
            Self::Json(inner) => Some(inner),
            Self::Threadpool(inner) => Some(inner),
            // String-only and GraphQL variants wrap no std error.
            Self::UnknownOrg(_)
            | Self::InvalidOrgId(_)
            | Self::UnknownPipeline(_)
            | Self::Request(_)
            | Self::InvalidResponse(_) => None,
        }
    }
}
impl fmt::Display for BkErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render the variant-specific message first, then emit it once.
        let msg = match self {
            Self::InvalidHttpHeader(e) => format!("invalid header value: {}", e),
            Self::Http(e) => format!("http error: {}", e),
            Self::Reqwest(e) => format!("http request error: {}", e),
            Self::Json(e) => format!("json error: {}", e),
            Self::UnknownOrg(org) => format!("unknown org '{}'", org),
            Self::InvalidOrgId(org) => format!("invalid org id '{}'", org),
            Self::UnknownPipeline(p) => format!("unknown pipeline '{}'", p),
            Self::Threadpool(e) => format!("threadpool error: {}", e),
            Self::Request(api) => format!("API failure: {}", api),
            Self::InvalidResponse(res) => format!("invalid response {}", res),
        };
        f.write_str(&msg)
    }
}
impl From<reqwest::Error> for BkErr {
fn from(e: reqwest::Error) -> Self {
Self::Reqwest(e)
}
}
/// The list of errors a GraphQL response carried alongside (or instead of)
/// its data payload.
#[derive(Debug)]
pub struct GraphQLErrors {
    errors: Vec<graphql_client::Error>,
}
impl fmt::Display for GraphQLErrors {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `Vec`'s Debug impl produces exactly the `[a, b, ...]` list form
        // that `f.debug_list().entries(..).finish()` would, so reuse it.
        write!(f, "{:?}", self.errors)
    }
}
/// POSTs a GraphQL request to the Buildkite API and deserializes the
/// response.
///
/// `hash` is the xxHash of a previously seen response body for this query,
/// if any. When the fresh body hashes to the same value, the JSON parse is
/// skipped and `(None, new_hash)` is returned, signalling "no change".
/// Otherwise the parsed data and the new body hash are returned.
///
/// Errors: `BkErr::Reqwest` on transport failure, `BkErr::Http` on a
/// non-success status, `BkErr::InvalidResponse` on an unexpected
/// content-type, `BkErr::Json` on a malformed body, and `BkErr::Request`
/// when the GraphQL layer reports errors.
async fn send_request<
    'a,
    Req: Serialize,
    Res: DeserializeOwned + std::marker::Unpin + std::fmt::Debug,
>(
    client: &'a Client,
    req: &'a Req,
    hash: Option<u64>,
) -> Result<(Option<Res>, Option<u64>), BkErr> {
    use std::hash::Hasher;
    let res = client.post(crate::BK_API_URL).json(req).send().await?;
    // Transport succeeded; surface HTTP-status failures as a distinct variant.
    let res = res.error_for_status().map_err(BkErr::Http)?;
    // Require exactly this content-type rather than parsing arbitrary
    // payloads blindly. NOTE(review): this exact-match rejects any other
    // casing/charset spelling — confirm the API always sends this form.
    let content_type = res.headers().get(http::header::CONTENT_TYPE);
    if content_type
        != Some(&http::header::HeaderValue::from_static(
            "application/json; charset=utf-8",
        ))
    {
        return Err(BkErr::InvalidResponse(format!(
            "invalid content-type: {:?}",
            content_type
        )));
    }
    let json_body = res.bytes().await?;
    // Hash the raw bytes so an unchanged payload can be detected without
    // deserializing it.
    let new_hash = {
        let mut hasher = twox_hash::XxHash::default();
        hasher.write(&json_body);
        Some(hasher.finish())
    };
    if hash.is_some() && new_hash == hash {
        // Body identical to last time: skip parsing entirely.
        return Ok((None, new_hash));
    }
    let res: Response<Res> = serde_json::from_slice(&json_body).map_err(|e| {
        // Log the offending body so malformed responses are debuggable.
        error!(
            "json error for response: {:?}",
            String::from_utf8(json_body.to_vec())
        );
        BkErr::Json(e)
    })?;
    // GraphQL can report errors alongside an HTTP 200; treat them as failure.
    if let Some(errs) = res.errors {
        return Err(BkErr::Request(GraphQLErrors { errors: errs }));
    }
    Ok((res.data, new_hash))
}
// Parses a UUID string, logging and — note — executing `return None` from
// the *enclosing function or closure* on failure. Only usable inside bodies
// that return an `Option` (here: the `filter_map` closures in `get_builds`).
macro_rules! parse_uuid {
    ($uuid_str:expr) => {
        match Uuid::parse_str($uuid_str) {
            Ok(u) => u,
            Err(err) => {
                error!("failed to parse Buildkite UUID '{}': {}", $uuid_str, err);
                return None;
            }
        }
    };
}
/// Lifecycle states of a Buildkite build, mirrored from the API's enum.
/// `Unknown` absorbs any variant this client does not recognize (the
/// generated enum's `Other(_)` case).
#[derive(Debug, PartialEq, Eq)]
pub enum BuildStates {
    Skipped,
    Creating,
    Scheduled,
    Running,
    Passed,
    Failed,
    Failing,
    Canceling,
    Canceled,
    Blocked,
    NotRun,
    Unknown,
}
impl From<get_builds::BuildStates> for BuildStates {
    /// Maps the generated API enum onto the local one; unrecognized
    /// variants collapse to `Unknown`.
    fn from(bs: get_builds::BuildStates) -> Self {
        use get_builds::BuildStates as BS;
        match bs {
            BS::SKIPPED => Self::Skipped,
            BS::CREATING => Self::Creating,
            BS::SCHEDULED => Self::Scheduled,
            BS::RUNNING => Self::Running,
            BS::PASSED => Self::Passed,
            BS::FAILED => Self::Failed,
            BS::FAILING => Self::Failing,
            BS::CANCELING => Self::Canceling,
            BS::CANCELED => Self::Canceled,
            BS::BLOCKED => Self::Blocked,
            BS::NOT_RUN => Self::NotRun,
            BS::Other(_) => Self::Unknown,
        }
    }
}
/// Lifecycle states of a Buildkite job, mirrored from the API's enum.
/// `Unknown` absorbs any variant this client does not recognize (the
/// generated enum's `Other(_)` case).
#[derive(Debug, PartialEq, Eq)]
pub enum JobStates {
    Pending,
    Waiting,
    WaitingFailed,
    Blocked,
    BlockedFailed,
    Unblocked,
    UnblockedFailed,
    Limiting,
    Limited,
    Scheduled,
    Assigned,
    Accepted,
    Running,
    Finished,
    Canceling,
    Canceled,
    TimingOut,
    TimedOut,
    Skipped,
    Broken,
    Expired,
    Unknown,
}
impl From<get_builds::JobStates> for JobStates {
    /// Maps the generated API enum onto the local one; unrecognized
    /// variants collapse to `Unknown`.
    fn from(js: get_builds::JobStates) -> Self {
        use get_builds::JobStates as JS;
        match js {
            JS::PENDING => Self::Pending,
            JS::WAITING => Self::Waiting,
            JS::WAITING_FAILED => Self::WaitingFailed,
            JS::BLOCKED => Self::Blocked,
            JS::BLOCKED_FAILED => Self::BlockedFailed,
            JS::UNBLOCKED => Self::Unblocked,
            JS::UNBLOCKED_FAILED => Self::UnblockedFailed,
            JS::LIMITING => Self::Limiting,
            JS::LIMITED => Self::Limited,
            JS::SCHEDULED => Self::Scheduled,
            JS::ASSIGNED => Self::Assigned,
            JS::ACCEPTED => Self::Accepted,
            JS::RUNNING => Self::Running,
            JS::FINISHED => Self::Finished,
            JS::CANCELING => Self::Canceling,
            JS::CANCELED => Self::Canceled,
            JS::TIMING_OUT => Self::TimingOut,
            JS::TIMED_OUT => Self::TimedOut,
            JS::SKIPPED => Self::Skipped,
            JS::BROKEN => Self::Broken,
            JS::EXPIRED => Self::Expired,
            JS::Other(_) => Self::Unknown,
        }
    }
}
/// A single command job within a build.
#[derive(Debug)]
pub struct Job {
    // Unique identifier of the job.
    pub uuid: Uuid,
    // Display label; "<unlabeled>" is substituted when the API omits one.
    pub label: String,
    // UUID of the build this job belongs to.
    pub build_uuid: Uuid,
    // Exit code of the command, once finished and parseable as an i32.
    pub exit_status: Option<i32>,
    // Current job state reported by the API.
    pub state: JobStates,
    // Raw comma-joined agent query rules; access via `iter_query_rules`.
    query_rules: String,
    // Name of the agent that ran the job, if assigned.
    pub agent: Option<String>,
}
/// Splits a comma-separated `key=value` agent query-rule string into
/// `(key, value)` pairs. An empty value or the `*` wildcard is reported as
/// `None`; entries without a `=` separator are skipped entirely.
fn iter_query_rules(query_rules: &str) -> impl Iterator<Item = (&str, Option<&str>)> {
    query_rules.split(',').filter_map(|rule| {
        // No '=' at all -> not a key/value rule -> drop it.
        let (key, value) = rule.split_once('=')?;
        if value.is_empty() || value == "*" {
            Some((key, None))
        } else {
            Some((key, Some(value)))
        }
    })
}
impl Job {
    /// Iterates this job's agent query rules as `(key, value)` pairs; see
    /// the module-level `iter_query_rules` for the parsing rules.
    pub fn iter_query_rules(&self) -> impl Iterator<Item = (&str, Option<&str>)> {
        iter_query_rules(&self.query_rules)
    }
}
/// A Buildkite build together with its command jobs.
#[derive(Debug)]
pub struct Build {
    // Unique identifier of the build.
    pub uuid: Uuid,
    // Metadata key/value pairs: values inherited from a triggering build,
    // overlaid with the build's own metadata (own values win).
    pub metadata: BTreeMap<String, String>,
    // Optional commit message.
    pub message: Option<String>,
    // Commit hash the build ran against.
    pub commit: String,
    // Current build state reported by the API.
    pub state: BuildStates,
    // The build's command jobs (builds with none are filtered out upstream).
    pub jobs: Vec<Job>,
}
// Builds are compared and ordered solely by UUID; all other fields are
// ignored for equality and ordering.
impl std::cmp::PartialOrd for Build {
    fn partial_cmp(&self, other: &Build) -> Option<std::cmp::Ordering> {
        // Delegate to `Ord` so the two impls can never disagree
        // (clippy::non_canonical_partial_ord_impl).
        Some(self.cmp(other))
    }
}
impl std::cmp::Ord for Build {
    fn cmp(&self, other: &Build) -> std::cmp::Ordering {
        self.uuid.cmp(&other.uuid)
    }
}
impl std::cmp::PartialEq for Build {
    fn eq(&self, other: &Build) -> bool {
        self.uuid == other.uuid
    }
}
impl std::cmp::Eq for Build {}
/// One snapshot of a pipeline's builds, as sent on the watch channel.
pub struct Builds {
    // The builds present in this snapshot, sorted by UUID.
    pub builds: Vec<Build>,
    // Human-readable name of the pipeline these builds belong to.
    pub pipeline: String,
}
/// Entry point for watching Buildkite pipelines; owns the authenticated
/// HTTP client used for all API calls.
pub struct Monitor {
    client: Client,
}
/// Minimal record of a build we are already tracking: just enough to
/// re-query its commit on the next poll.
#[derive(Clone)]
struct KnownBuild {
    uuid: Uuid,
    commit: String,
}
// KnownBuilds compare and order by UUID only; the commit is payload.
impl std::cmp::PartialOrd for KnownBuild {
    fn partial_cmp(&self, other: &KnownBuild) -> Option<std::cmp::Ordering> {
        // Delegate to `Ord` so ordering and equality stay consistent
        // (clippy::non_canonical_partial_ord_impl).
        Some(self.cmp(other))
    }
}
impl std::cmp::Ord for KnownBuild {
    fn cmp(&self, other: &KnownBuild) -> std::cmp::Ordering {
        self.uuid.cmp(&other.uuid)
    }
}
impl std::cmp::PartialEq for KnownBuild {
    fn eq(&self, other: &KnownBuild) -> bool {
        self.uuid == other.uuid
    }
}
impl std::cmp::Eq for KnownBuild {}
impl Monitor {
    /// Creates a `Monitor` whose HTTP client attaches the given Buildkite
    /// API token as a `Bearer` authorization header on every request.
    ///
    /// Fails with `BkErr::InvalidHttpHeader` if the token contains bytes
    /// that are not valid in an HTTP header value.
    pub async fn with_token(token: String) -> Result<Monitor, BkErr> {
        use http::{header::AUTHORIZATION, HeaderMap};
        let mut headers = HeaderMap::new();
        headers.insert(
            AUTHORIZATION,
            format!("Bearer {}", token)
                .parse::<http::header::HeaderValue>()
                .map_err(BkErr::InvalidHttpHeader)?,
        );
        let client = reqwest::ClientBuilder::new()
            .default_headers(headers)
            .build()?;
        Ok(Self { client })
    }
    /// Returns a clone of the configured HTTP client.
    #[inline]
    pub fn client(&self) -> Client {
        self.client.clone()
    }
    /// Resolves `pipeline_identifier` to a pipeline, then spawns a
    /// background task that polls the pipeline and sends each changed
    /// builds snapshot on the returned channel. The poll interval backs off
    /// exponentially on failure (capped at 60s) and resets to 1s on
    /// success; the task stops when sending on the channel fails.
    ///
    /// Errors with `BkErr::UnknownPipeline` when the identifier does not
    /// name a pipeline.
    pub async fn watch<'a>(
        &'a self,
        pipeline_identifier: &'a str,
    ) -> Result<futures::channel::mpsc::Receiver<Builds>, BkErr> {
        let req_body = GetPipelineById::build_query(get_pipeline_by_id::Variables {
            id: pipeline_identifier.to_owned(),
        });
        let (res_body, _) =
            send_request::<_, get_pipeline_by_id::ResponseData>(&self.client, &req_body, None)
                .await?;
        // The node lookup is polymorphic; anything other than a Pipeline
        // node means the identifier did not name one.
        let pipeline = res_body
            .and_then(|root| root.node)
            .and_then(|n| {
                if let get_pipeline_by_id::GetPipelineByIdNode::Pipeline(pipeline) = n {
                    Some(pipeline)
                } else {
                    None
                }
            })
            .ok_or_else(|| BkErr::UnknownPipeline(pipeline_identifier.to_owned()))?;
        let get_pipeline_by_id::GetPipelineByIdNodeOnPipeline {
            name: pipeline_name,
            id: pipeline_id,
            description: pipeline_description,
            ..
        } = pipeline;
        let (mut tx, rx) = futures::channel::mpsc::channel(100);
        let client = self.client.clone();
        let poll = async move {
            info!(
                "watching pipeline {}({}) '{}'",
                pipeline_name,
                pipeline_id,
                pipeline_description.as_deref().unwrap_or("<none>")
            );
            let mut failure_count = 0u32;
            // Hash of the last parsed builds response; lets `get_builds`
            // report "nothing changed" cheaply.
            let mut query_hash = None;
            let mut known_builds = Vec::new();
            // Poll interval in seconds; doubled on failure, capped below.
            let mut sleep = 1;
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(sleep)).await;
                let builds =
                    match get_builds(&client, &pipeline_id, &mut known_builds, &mut query_hash)
                        .await
                    {
                        Ok(builds) => {
                            if failure_count > 0 {
                                info!("successfully sent query after {} failures", failure_count);
                                failure_count = 0;
                                sleep = 1;
                            }
                            match builds {
                                Some(builds) => Builds {
                                    builds,
                                    pipeline: pipeline_name.clone(),
                                },
                                // `None` = response unchanged or no builds
                                // with jobs; nothing to forward.
                                None => continue,
                            }
                        }
                        Err(err) => {
                            failure_count += 1;
                            // Exponential backoff, capped at one minute.
                            sleep *= 2;
                            sleep = std::cmp::min(sleep, 60);
                            warn!(
                                "failed to send query {} time(s) in a row: {}",
                                failure_count, err
                            );
                            continue;
                        }
                    };
                // NOTE(review): `try_send` also errors when the channel is
                // merely full (100 pending snapshots), not only when the
                // receiver is dropped — a slow consumer would end the watch
                // permanently. Confirm whether `is_disconnected()` should
                // be checked here instead of any error.
                if tx.try_send(builds).is_err() {
                    break;
                }
            }
            info!(
                "stopped watching pipeline {}({})",
                pipeline_name, pipeline_id,
            );
        };
        tokio::spawn(poll);
        Ok(rx)
    }
}
/// Fetches the current set of interesting builds for `pipeline_id`.
///
/// Two queries are issued: a running-builds query (never hash-cached)
/// whose commits, combined with the commits of builds remembered from the
/// previous poll (`known_builds`), select which builds to fetch in full.
/// The full query is hash-compared against `query_hash`; `Ok(None)` means
/// either the response was byte-identical to last time or no builds with
/// command jobs were found.
///
/// On a changed response, `query_hash` is updated and `known_builds` is
/// replaced with the builds that are still running.
async fn get_builds(
    client: &Client,
    pipeline_id: &str,
    known_builds: &mut Vec<KnownBuild>,
    query_hash: &mut Option<u64>,
) -> Result<Option<Vec<Build>>, BkErr> {
    // First query: which builds are currently running?
    let req_body = GetRunningBuilds::build_query(get_running_builds::Variables {
        id: pipeline_id.to_owned(),
    });
    let (res_body, _hash) =
        send_request::<_, get_running_builds::ResponseData>(client, &req_body, None).await?;
    let builds_to_check = res_body
        .and_then(|root| root.node)
        .and_then(|node| match node {
            get_running_builds::GetRunningBuildsNode::Pipeline(pipeline) => pipeline.builds,
            _ => None,
        })
        .and_then(|in_builds| in_builds.edges)
        .map(|builds| {
            builds
                .into_iter()
                .filter_map(|n| n.and_then(|n| n.node))
                // `parse_uuid!` executes `return None` from this closure on
                // a malformed UUID — that is why this is a `filter_map`
                // even though the body otherwise always returns `Some`.
                .filter_map(|build| {
                    Some(KnownBuild {
                        uuid: parse_uuid!(&build.uuid),
                        commit: build.commit,
                    })
                })
                .collect::<Vec<_>>()
        });
    // Full query targets every commit we were already watching plus every
    // commit currently running.
    let commits: Vec<_> = known_builds
        .iter()
        .map(|kb| kb.commit.clone())
        .chain(
            builds_to_check
                .unwrap_or_default()
                .into_iter()
                .map(|kb| kb.commit),
        )
        .collect();
    let req_body = GetBuilds::build_query(get_builds::Variables {
        id: pipeline_id.to_owned(),
        commits: Some(commits),
    });
    let (res_body, hash) =
        send_request::<_, get_builds::ResponseData>(client, &req_body, *query_hash).await?;
    // Unchanged response body: nothing new to report.
    if *query_hash == hash {
        return Ok(None);
    }
    *query_hash = hash;
    let builds = res_body
        .and_then(|root| {
            if let Some(node) = root.node {
                match node {
                    get_builds::GetBuildsNode::Pipeline(pipeline) => pipeline.builds,
                    _ => None,
                }
            } else {
                None
            }
        })
        .and_then(|in_builds| in_builds.edges)
        .and_then(|builds| {
            let mut builds: Vec<_> = builds
                .into_iter()
                .filter_map(|n| n.and_then(|n| n.node))
                .filter_map(|build| {
                    let uuid = parse_uuid!(&build.uuid);
                    // Collect only the build's command jobs; other job node
                    // kinds fall through to `None` and are skipped.
                    let jobs: Option<Vec<_>> = build.jobs.and_then(|jn| jn.edges)
                        .map(|jobs| {
                            jobs.into_iter()
                                .filter_map(|e| e.and_then(|n| n.node))
                                .filter_map(|job| {
                                    match job {
                                        get_builds::GetBuildsNodeOnPipelineBuildsEdgesNodeJobsEdgesNode::JobTypeCommand(cmd) => {
                                            Some(Job {
                                                uuid: parse_uuid!(&cmd.uuid),
                                                build_uuid: uuid,
                                                label: cmd.label.unwrap_or_else(|| "<unlabeled>".to_owned()),
                                                state: cmd.state.into(),
                                                query_rules: cmd.agent_query_rules.as_ref().map(|v| v.join(",")).unwrap_or_default(),
                                                exit_status: cmd.exit_status.and_then(|es| es.parse::<i32>().ok()),
                                                agent: cmd.agent.map(|a| a.name),
                                            })
                                        }
                                        _ => None
                                    }
                                }).collect()
                        });
                    // Builds with no command jobs are dropped entirely;
                    // these `return None`s exit the `filter_map` closure.
                    let jobs = match jobs {
                        Some(ref j) if j.is_empty() => return None,
                        None => return None,
                        Some(j) => j,
                    };
                    // Merge metadata: values inherited from a triggering
                    // build first, then the build's own metadata — the
                    // later `extend` means own values win on key clashes.
                    let metadata = {
                        let mut meta = BTreeMap::new();
                        if let Some(md) = build.triggered_from.and_then(|tf| tf.build.and_then(|b| b.meta_data.and_then(|md| md.edges))) {
                            meta.extend(md.into_iter().filter_map(|e| {
                                e.and_then(|e| e.node.map(|kv| (kv.key, kv.value)))
                            }));
                        }
                        if let Some(md) = build.meta_data.and_then(|md| md.edges) {
                            meta.extend(md.into_iter().filter_map(|e| {
                                e.and_then(|e| e.node.map(|kv| (kv.key, kv.value)))
                            }));
                        }
                        meta
                    };
                    Some(Build {
                        jobs,
                        metadata,
                        message: build.message,
                        commit: build.commit,
                        uuid,
                        state: build.state.into(),
                    })
                })
                .collect();
            // Stable output ordering: Build's `Ord` compares by UUID.
            builds.sort();
            if !builds.is_empty() {
                Some(builds)
            } else {
                None
            }
        });
    // Remember only still-running builds so the next poll re-queries their
    // commits even after they leave the running-builds list.
    known_builds.clear();
    if let Some(ref builds) = builds {
        known_builds.extend(builds.iter().filter_map(|b| match b.state {
            BuildStates::Running => Some(KnownBuild {
                uuid: b.uuid,
                commit: b.commit.clone(),
            }),
            _ => None,
        }));
    }
    Ok(builds)
}
#[cfg(test)]
mod test {
    // Plain key=value pairs come through unchanged, in order.
    #[test]
    fn iters_agent_rules() {
        let mut iter = super::iter_query_rules("queue=something,os=linux,blah=thing");
        assert_eq!(iter.next(), Some(("queue", Some("something"))));
        assert_eq!(iter.next(), Some(("os", Some("linux"))));
        assert_eq!(iter.next(), Some(("blah", Some("thing"))));
        assert_eq!(iter.next(), None);
    }
    // An empty rule string yields nothing.
    #[test]
    fn iters_empty_agent_rules() {
        let mut iter = super::iter_query_rules("");
        assert_eq!(iter.next(), None);
    }
    // Added coverage: the `*` wildcard and an empty value both normalize to
    // `None`, and entries without a `=` separator are skipped entirely.
    #[test]
    fn normalizes_wildcard_and_empty_values() {
        let mut iter = super::iter_query_rules("queue=*,os=,solo");
        assert_eq!(iter.next(), Some(("queue", None)));
        assert_eq!(iter.next(), Some(("os", None)));
        assert_eq!(iter.next(), None);
    }
    // Added coverage: only the first `=` splits; the rest stays in the value.
    #[test]
    fn splits_on_first_equals_only() {
        let mut iter = super::iter_query_rules("meta=a=b");
        assert_eq!(iter.next(), Some(("meta", Some("a=b"))));
        assert_eq!(iter.next(), None);
    }
}