use std::{
collections::HashMap,
ffi::OsStr,
os::unix::process::ExitStatusExt,
path::{Path, PathBuf},
process::{Command, ExitStatus},
sync::{Arc, Mutex, mpsc::Sender},
time::Duration,
};
use serde::Serialize;
use tempfile::{TempDir, tempdir};
use url::Url;
use crate::{
cob::{KnownJobCobs, failed, succeeded},
config::AdapterSpec,
db::{Db, DbError},
logger,
msg::{MessageError, Request, Response, RunResult},
notif::NotificationSender,
queueproc::ChildInfo,
run::{Run, RunState},
sensitive::Sensitive,
timeoutcmd::{RealtimeLines, TimeoutCommand, TimeoutError},
};
#[derive(Clone, Serialize)]
pub struct Adapters {
adapters: HashMap<String, Adapter>,
default_adapter: Option<String>,
}
impl Adapters {
pub fn new(
adapters: &HashMap<String, AdapterSpec>,
default_adapter: Option<&str>,
) -> Result<Self, AdapterError> {
if let Some(default) = default_adapter {
if !adapters.contains_key(default) {
return Err(AdapterError::NoDefaultAdapter);
}
}
Ok(Self {
adapters: HashMap::from_iter(
adapters
.iter()
.map(|(k, v)| (k.to_string(), Adapter::from(v))),
),
default_adapter: default_adapter.map(|s| s.into()),
})
}
pub fn default_adapter(&self) -> Option<&Adapter> {
if let Some(default) = &self.default_adapter {
self.adapters.get(default)
} else {
None
}
}
pub fn get(&self, name: &str) -> Option<&Adapter> {
self.adapters.get(name)
}
pub fn to_json(&self) -> Result<String, AdapterError> {
serde_json::to_string_pretty(self).map_err(AdapterError::AdaptersToJson)
}
}
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize)]
pub struct Adapter {
bin: PathBuf,
env: HashMap<String, String>,
config: HashMap<String, serde_norway::Value>,
config_env: Option<String>,
}
impl Adapter {
pub fn new(bin: &Path) -> Self {
Self {
bin: bin.into(),
env: HashMap::new(),
..Default::default()
}
}
pub fn with_environment(mut self, env: &HashMap<String, String>) -> Self {
for (key, value) in env.iter() {
self.env.insert(key.into(), value.into());
}
self
}
pub fn with_sensitive_environment(mut self, env: &HashMap<String, Sensitive>) -> Self {
for (key, value) in env.iter() {
self.env.insert(key.into(), value.as_str().into());
}
self
}
pub fn with_config(mut self, config: HashMap<String, serde_norway::Value>) -> Self {
self.config = config;
self
}
pub fn with_config_env(mut self, config_env: &str) -> Self {
self.config_env = Some(config_env.to_string());
self
}
fn write_adapter_config(&self, tmpdir: &TempDir) -> Result<PathBuf, AdapterError> {
let filename = tmpdir.path().join("adapter.yaml");
let yaml =
serde_norway::to_string(&self.config).map_err(AdapterError::AdapterConfigToYaml)?;
logger::adapter_temp_config(&filename, &yaml);
std::fs::write(&filename, yaml.as_bytes()).map_err(AdapterError::AdapterConfigWrite)?;
Ok(filename)
}
fn envs(&self) -> impl Iterator<Item = (&OsStr, &OsStr)> {
self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
}
#[allow(clippy::too_many_arguments)]
pub fn run(
&self,
trigger: &Request,
run: &mut Run,
db: &Db,
run_notification: &NotificationSender,
max_run_time: Duration,
child_info: Sender<ChildInfo>,
known_job_cobs: Arc<Mutex<KnownJobCobs>>,
) -> Result<(), AdapterError> {
run.set_state(RunState::Triggered);
db.update_run(run).map_err(AdapterError::UpdateRun)?;
let x = self.run_helper(
trigger,
run,
db,
run_notification,
max_run_time,
child_info,
known_job_cobs,
);
run.set_state(RunState::Finished);
db.update_run(run).map_err(AdapterError::UpdateRun)?;
if matches!(x, Err(AdapterError::FailedTimeout)) {
run.set_timed_out();
}
x
}
#[allow(clippy::too_many_arguments)]
fn run_helper(
&self,
trigger: &Request,
run: &mut Run,
db: &Db,
run_notification: &NotificationSender,
max_run_time: Duration,
child_pid: Sender<ChildInfo>,
known_job_cobs: Arc<Mutex<KnownJobCobs>>,
) -> Result<(), AdapterError> {
assert!(matches!(trigger, Request::Trigger { .. }));
let tmp = tempdir().map_err(AdapterError::AdapterConfigDir)?;
let mut cmd = Command::new(&self.bin);
cmd.envs(self.envs());
if let Some(config_env) = &self.config_env {
let filename = self.write_adapter_config(&tmp)?;
cmd.env(config_env, filename);
}
let mut child = TimeoutCommand::new(max_run_time);
let stdout = child.stdout();
child.feed_stdin(trigger.to_string().as_bytes());
let child = match child.spawn(cmd) {
Ok(child) => child,
Err(TimeoutError::Spawn(_, err)) => {
Err(AdapterError::SpawnAdapter(self.bin.clone(), err))?
}
Err(err) => Err(AdapterError::TimeoutCommand(err))?,
};
let child_info = ChildInfo::new(run.broker_run_id().clone(), child.id());
child_pid.send(child_info).ok();
run_notification.notify()?;
let mut outcome = MaybeResult::default();
if let Err(err) = self.read_stdout(run, db, run_notification, stdout, known_job_cobs) {
outcome.set_error(err);
}
let result = child.wait();
match result {
Ok(finished) => {
let stderr = finished.stderr();
self.log_stderr(stderr);
let exit = finished.exit_code();
logger::adapter_result(exit);
if !exit.success() {
if let Some(signal) = exit.signal() {
outcome.set_error(AdapterError::Signal(signal));
} else {
outcome.set_error(AdapterError::Failed(exit));
}
}
}
Err(TimeoutError::TimedOut) => {
logger::adapter_timed_out();
outcome.set_error(AdapterError::FailedTimeout);
}
Err(err) => {
logger::adapter_did_not_exit(&err);
outcome.set_error(AdapterError::TimeoutWait(err));
}
}
if let Some(err) = outcome.error() {
run.set_result(RunResult::Failure);
Err(err)
} else {
Ok(())
}
}
fn read_stdout(
&self,
run: &mut Run,
db: &Db,
run_notification: &NotificationSender,
mut stdout: RealtimeLines,
known_job_cobs: Arc<Mutex<KnownJobCobs>>,
) -> Result<(), AdapterError> {
#[allow(clippy::unwrap_used)]
let no_url = Url::parse("https://no.url.example.com").unwrap();
if let Some(line) = stdout.line() {
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
run_notification.notify()?;
match resp {
Response::Triggered { run_id, info_url } => {
run.set_state(RunState::Running);
run.set_adapter_run_id(run_id);
let url = if let Some(url) = info_url {
run.set_adapter_info_url(&url);
Url::parse(&url)
} else {
Ok(no_url.clone())
};
db.update_run(run).map_err(AdapterError::UpdateRun)?;
let url = url.unwrap_or(no_url);
if let Ok(mut known) = known_job_cobs.lock() {
known.create_run(
run.repo_id(),
run.whence().oid(),
run.broker_run_id().clone(),
&url,
false,
);
}
}
_ => {
return Err(AdapterError::NotTriggered(resp));
}
}
} else {
logger::adapter_no_first_response();
return Err(AdapterError::NoFirstMessage);
}
if let Some(line) = stdout.line() {
logger::adapter_stdout_line(&line);
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
run_notification.notify()?;
match resp {
Response::Finished { result } => {
run.set_result(result.clone());
db.update_run(run).map_err(AdapterError::UpdateRun)?;
let repo_id = run.repo_id();
let oid = run.whence().oid();
let run_id = run.broker_run_id().clone();
match &result {
RunResult::Success => succeeded(repo_id, oid, run_id),
RunResult::Failure => failed(repo_id, oid, run_id),
};
}
_ => {
return Err(AdapterError::NotFinished(resp));
}
}
} else {
logger::adapter_no_second_response();
return Err(AdapterError::NoSecondMessage);
}
if let Some(line) = stdout.line() {
logger::adapter_stdout_line(&line);
let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
logger::adapter_too_many_responses();
return Err(AdapterError::TooMany(resp));
}
Ok(())
}
fn log_stderr(&self, stderr: &[u8]) {
for line in String::from_utf8_lossy(stderr).lines() {
logger::adapter_stderr_line(line);
}
}
}
#[derive(Default)]
struct MaybeResult {
error: Option<AdapterError>,
}
impl MaybeResult {
fn set_error(&mut self, err: AdapterError) {
self.error = Some(err);
}
fn error(self) -> Option<AdapterError> {
self.error
}
}
#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
#[error("the default adapter is not defined in the configuration")]
NoDefaultAdapter,
#[error("failed to serialize adapters as JSON")]
AdaptersToJson(#[source] serde_json::Error),
#[error(transparent)]
TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
#[error("failed to spawn a CI adapter sub-process: {0}")]
SpawnAdapter(PathBuf, #[source] std::io::Error),
#[error("failed to create a Response message from adapter output")]
ParseResponse(#[source] MessageError),
#[error("failed to write request to adapter stdin")]
RequestWrite(#[source] MessageError),
#[error("failed to get handle for adapter's stdin")]
StdinHandle,
#[error("failed to get handle for adapter's stdout")]
StdoutHandle,
#[error("failed to get handle for adapter's stderr")]
StderrHandle,
#[error("failed to read the adapter's stderr")]
ReadStderr(#[source] std::io::Error),
#[error("failed to read from adapter stdout")]
ReadLine(#[source] std::io::Error),
#[error("failed to wait for child process to exit")]
Wait(#[source] std::io::Error),
#[error("child process failed with wait status {0:?}")]
Failed(ExitStatus),
#[error("child process was terminated due to taking too long")]
FailedTimeout,
#[error("child process terminated by signal {0}")]
Signal(i32),
#[error("child process did not terminate")]
TimeoutWait(#[source] crate::timeoutcmd::TimeoutError),
#[error("adapter's first message is not 'triggered', but {0:?}")]
NotTriggered(Response),
#[error("adapter did not sent its first message")]
NoFirstMessage,
#[error("adapter did not sent its second message")]
NoSecondMessage,
#[error("adapter's second message is not 'finished', but {0:?}")]
NotFinished(Response),
#[error("adapter sent too many messages: first extra is {0:#?}")]
TooMany(Response),
#[error("failed to update CI run information in database")]
UpdateRun(#[source] DbError),
#[error(transparent)]
Notif(#[from] crate::notif::NotificationError),
#[error("failed to create temporary directory for adapter configuration")]
AdapterConfigDir(#[source] std::io::Error),
#[error("can't convert adapter configuration to YAML")]
AdapterConfigToYaml(#[source] serde_norway::Error),
#[error("failed to write adapter configuration")]
AdapterConfigWrite(#[source] std::io::Error),
#[error("failed to lock mutex")]
Mutex,
}
#[cfg(test)]
mod test {
use super::*;
use std::{fs::write, io::ErrorKind, path::PathBuf, sync::mpsc::channel, time::Duration};
use tempfile::{NamedTempFile, TempDir, tempdir};
use radicle::git::Oid;
use radicle::prelude::RepoId;
use super::{Adapter, Db, Run};
use crate::{
adapter::AdapterError,
cob::KnownJobCobs,
msg::{MessageError, Response, RunId, RunResult},
notif::NotificationChannel,
run::{RunBuilder, Whence},
test::{TestResult, mock_adapter, trigger_request},
};
const MAX: Duration = Duration::from_secs(10);
fn db() -> Result<Db, Box<dyn std::error::Error>> {
let tmp = NamedTempFile::new()?;
let db = Db::new(tmp.path())?;
Ok(db)
}
fn run() -> Result<Run, Box<dyn std::error::Error>> {
Ok(RunBuilder::default()
.broker_run_id(RunId::default())
.repo_id(RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?)
.repo_name("test.repo")
.whence(Whence::branch(
"main",
Oid::try_from("ff3099ba5de28d954c41d0b5a84316f943794ea4")?,
Some("J. Random Hacker <random@example.com>"),
))
.timestamp("2024-02-29T12:58:12+02:00".into())
.build())
}
#[allow(clippy::unwrap_used)]
fn known() -> Arc<Mutex<KnownJobCobs>> {
Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
}
#[allow(clippy::unwrap_used)]
fn adapter(tmp: &TempDir, shell: &'static str) -> PathBuf {
let bin = tmp.path().join("adapter.sh");
mock_adapter(&bin, shell).unwrap();
bin
}
#[test]
fn adapter_reports_success() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
)?;
assert_eq!(run.result(), Some(&RunResult::Success));
Ok(())
}
#[test]
fn adapter_reports_failure() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
match x {
Ok(_) => (),
Err(AdapterError::RequestWrite(_)) => (),
_ => panic!("unexpected result: {x:#?}"),
}
Ok(())
}
#[test]
fn adapter_exits_nonzero() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
echo woe be me 1>&2
exit 1
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(x.is_err());
assert_eq!(run.result(), Some(&RunResult::Failure));
Ok(())
}
#[test]
fn adapter_is_killed_before_any_messages() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
kill -9 $$
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::Signal(9))));
Ok(())
}
#[test]
fn adapter_ends_ok_before_first_message() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::NoFirstMessage)));
Ok(())
}
#[test]
fn adapter_is_killed_before_first_message() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
kill -9 $$
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::Signal(9))));
Ok(())
}
#[test]
fn adapter_ends_ok_before_second_message() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
Ok(())
}
#[test]
fn adapter_is_killed_after_second_message() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
kill -9 $$
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("Adapter::run result: {x:#?}");
if let Err(AdapterError::Failed(x)) = x {
use std::os::unix::process::ExitStatusExt;
eprintln!("Adapter::run result: signal={:?}", x.signal());
}
assert!(matches!(x, Err(AdapterError::Signal(9))));
Ok(())
}
#[test]
fn adapter_produces_as_bad_message() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success","bad":"field"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
match x {
Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
_ => panic!("unexpected result: {x:#?}"),
}
Ok(())
}
#[test]
fn adapter_first_message_isnt_triggered() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"finished","result":"success"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(
x,
Err(AdapterError::NotTriggered(Response::Finished {
result: RunResult::Success
}))
));
Ok(())
}
#[test]
fn adapter_outputs_too_many_messages() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo '{"response":"finished","result":"success"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
assert!(matches!(
x,
Err(AdapterError::TooMany(Response::Finished {
result: RunResult::Success
}))
));
Ok(())
}
#[test]
fn adapter_does_not_exist() -> TestResult<()> {
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::NotFound);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
#[test]
fn adapter_is_not_executable() -> TestResult<()> {
const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;
let tmp = tempdir()?;
let bin = tmp.path().join("adapter.sh");
write(&bin, ADAPTER)?;
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("{x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::PermissionDenied);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
#[test]
fn adapter_has_bad_interpreter() -> TestResult<()> {
let tmp = tempdir()?;
let bin = adapter(
&tmp,
r#"#!/bin/does-not-exist
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#,
);
let db = db()?;
let mut run = run()?;
let (pid_tx, _) = channel();
let mut channel = NotificationChannel::new_run();
let sender = channel.tx()?;
let x = Adapter::new(&bin).run(
&trigger_request()?,
&mut run,
&db,
&sender,
MAX,
pid_tx,
known(),
);
eprintln!("result from run: {x:#?}");
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
assert_eq!(e.kind(), ErrorKind::NotFound);
}
_ => panic!("expected a specific error"),
}
Ok(())
}
}