use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex, mpsc::Sender},
time::Duration,
};
use time::{OffsetDateTime, macros::format_description};
use tracing::{Level, span};
use radicle::prelude::RepoId;
use crate::{
adapter::Adapter,
cob::KnownJobCobs,
db::{Db, DbError},
logger,
msg::{PatchEvent, PushEvent, Request, RunId},
notif::NotificationSender,
queueproc::ChildInfo,
run::{Run, RunBuilder, Whence},
};
pub struct Broker {
max_run_time: Duration,
db: Db,
}
impl Broker {
#[allow(clippy::result_large_err)]
pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
logger::broker_db(db_filename);
Ok(Self {
max_run_time,
db: Db::new(db_filename)?,
})
}
pub fn max_run_time(&self) -> Duration {
self.max_run_time
}
pub fn db(&self) -> &Db {
&self.db
}
#[allow(clippy::result_large_err)]
pub fn execute_ci(
&self,
adapter: &Adapter,
trigger: &Request,
run_notification: &NotificationSender,
child_info: Sender<ChildInfo>,
known_job_cobs: Arc<Mutex<KnownJobCobs>>,
) -> Result<Run, BrokerError> {
let broker_run_id = RunId::default();
let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
let run = span.in_scope(|| {
self.execute_helper(
adapter,
broker_run_id,
trigger,
run_notification,
child_info,
known_job_cobs,
)
})?;
Ok(run)
}
fn execute_helper(
&self,
adapter: &Adapter,
broker_run_id: RunId,
trigger: &Request,
run_notification: &NotificationSender,
child_info: Sender<ChildInfo>,
known_job_cobs: Arc<Mutex<KnownJobCobs>>,
) -> Result<Run, BrokerError> {
logger::broker_start_run(trigger, &broker_run_id);
let (common, whence, oid) = match &trigger {
Request::Trigger {
common,
push:
Some(PushEvent {
pusher,
after,
branch,
..
}),
patch: None,
} => {
let who = pusher.to_string();
(
common,
Whence::branch(branch, *after, Some(who.as_str())),
*after,
)
}
Request::Trigger {
common,
push: None,
patch: Some(PatchEvent { patch, .. }),
} => {
let revision = patch
.revisions
.last()
.ok_or(BrokerError::NoRevisions)?
.clone();
let who = patch.author.to_string();
(
common,
Whence::patch(patch.id, patch.after, revision, Some(who.as_str())),
patch.after,
)
}
_ => panic!("neither a push nor a patch event"),
};
let mut run = RunBuilder::default()
.broker_run_id(broker_run_id)
.repo_id(common.repository.id)
.repo_name(&common.repository.name)
.whence(whence)
.timestamp(now()?)
.build();
self.db.push_run(&run)?;
if let Ok(mut known) = known_job_cobs.lock() {
if let Some(job_id) = known.create_job(trigger.repo(), oid) {
run.set_job_id(job_id);
}
}
if let Err(e) = adapter.run(
trigger,
&mut run,
&self.db,
run_notification,
self.max_run_time,
child_info,
known_job_cobs,
) {
logger::error("failed to run adapter or it failed to run CI", &e);
}
logger::broker_end_run(&run, run.broker_run_id());
self.db.update_run(&run)?;
Ok(run)
}
}
fn now() -> Result<String, time::error::Format> {
let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
OffsetDateTime::now_utc().format(fmt)
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error(transparent)]
Timeformat(#[from] time::error::Format),
#[error(transparent)]
RadicleProfile(#[from] radicle::profile::Error),
#[error("failed to spawn a CI adapter sub-process: {0}")]
SpawnAdapter(PathBuf, #[source] std::io::Error),
#[error("default adapter is not in list of adapters")]
UnknownDefaultAdapter(String),
#[error("could not determine what adapter to use for repository {0}")]
NoAdapter(RepoId),
#[error("tried to execute CI based on a message that is not a trigger one: {0:#?}")]
NotTrigger(Box<Request>),
#[error("failed to understand repository id {0:?}")]
BadRepoId(String, #[source] radicle::identity::IdError),
#[error("expected at least one revision in a patch event")]
NoRevisions,
#[error(transparent)]
Db(#[from] DbError),
}
#[cfg(test)]
mod test {
use super::*;
use std::{path::Path, sync::mpsc::channel, time::Duration};
use tempfile::tempdir;
use super::Broker;
use crate::{
msg::{RunId, RunResult},
notif::NotificationChannel,
run::RunState,
test::{TestResult, mock_adapter, trigger_request},
};
fn broker(filename: &Path) -> Result<Broker, Box<dyn std::error::Error>> {
Ok(Broker::new(filename, Duration::from_secs(1))?)
}
#[allow(clippy::unwrap_used)]
fn known() -> Arc<Mutex<KnownJobCobs>> {
Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
}
#[test]
fn executes_adapter() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
let adapter = mock_adapter(&bin, ADAPTER)?;
let tmp = tempdir()?;
let db = tmp.path().join("db.db");
let broker = broker(&db)?;
let trigger = trigger_request()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx, known());
assert!(x.is_ok());
let run = x?;
assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
assert_eq!(run.state(), RunState::Finished);
assert_eq!(run.result(), Some(&RunResult::Success));
Ok(())
}
#[test]
fn adapter_fails() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo woe be me 1>&2
exit 1
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
let adapter = mock_adapter(&bin, ADAPTER)?;
let tmp = tempdir()?;
let db = tmp.path().join("db.db");
let broker = broker(&db)?;
let trigger = trigger_request()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx, known());
assert!(x.is_ok());
let run = x?;
assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
assert_eq!(run.state(), RunState::Finished);
assert_eq!(run.result(), Some(&RunResult::Failure));
Ok(())
}
}