radicle-ci-broker 0.24.0

add integration to CI engines or systems to a Radicle node
Documentation
//! The business logic of the CI broker.
//!
//! This is a type and module of its own to facilitate automated
//! testing.

use std::{
    path::{Path, PathBuf},
    sync::{Arc, Mutex, mpsc::Sender},
    time::Duration,
};

use time::{OffsetDateTime, macros::format_description};
use tracing::{Level, span};

use radicle::prelude::RepoId;

use crate::{
    adapter::Adapter,
    cob::KnownJobCobs,
    db::{Db, DbError},
    logger,
    msg::{PatchEvent, PushEvent, Request, RunId},
    notif::NotificationSender,
    queueproc::ChildInfo,
    run::{Run, RunBuilder, Whence},
};

/// The CI broker.
///
/// Given a trigger request and an adapter, the broker records a run
/// in its database and executes the adapter to run CI for the
/// repository named in the request.
pub struct Broker {
    /// Database in which runs are recorded.
    db: Db,
    /// Maximum run time, handed to the adapter for every run.
    max_run_time: Duration,
}

impl Broker {
    #[allow(clippy::result_large_err)]
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
            max_run_time,
            db: Db::new(db_filename)?,
        })
    }

    pub fn max_run_time(&self) -> Duration {
        self.max_run_time
    }

    pub fn db(&self) -> &Db {
        &self.db
    }

    #[allow(clippy::result_large_err)]
    pub fn execute_ci(
        &self,
        adapter: &Adapter,
        trigger: &Request,
        run_notification: &NotificationSender,
        child_info: Sender<ChildInfo>,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
        let run = span.in_scope(|| {
            self.execute_helper(
                adapter,
                broker_run_id,
                trigger,
                run_notification,
                child_info,
                known_job_cobs,
            )
        })?;
        Ok(run)
    }

    fn execute_helper(
        &self,
        adapter: &Adapter,
        broker_run_id: RunId,
        trigger: &Request,
        run_notification: &NotificationSender,
        child_info: Sender<ChildInfo>,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger, &broker_run_id);
        let (common, whence, oid) = match &trigger {
            Request::Trigger {
                common,
                push:
                    Some(PushEvent {
                        pusher,
                        after,
                        branch,
                        ..
                    }),
                patch: None,
            } => {
                let who = pusher.to_string();
                (
                    common,
                    Whence::branch(branch, *after, Some(who.as_str())),
                    *after,
                )
            }
            Request::Trigger {
                common,
                push: None,
                patch: Some(PatchEvent { patch, .. }),
            } => {
                let revision = patch
                    .revisions
                    .last()
                    .ok_or(BrokerError::NoRevisions)?
                    .clone();
                let who = patch.author.to_string();
                (
                    common,
                    Whence::patch(patch.id, patch.after, revision, Some(who.as_str())),
                    patch.after,
                )
            }
            _ => panic!("neither a push nor a patch event"),
        };

        let mut run = RunBuilder::default()
            .broker_run_id(broker_run_id)
            .repo_id(common.repository.id)
            .repo_name(&common.repository.name)
            .whence(whence)
            .timestamp(now()?)
            .build();
        self.db.push_run(&run)?;

        // Try to create a job COB. If that fails, the function logs
        // it. We won't do anything about a failure, as there's
        // nothing useful we can do about it, as long as we let CI
        // run, which want to do.
        if let Ok(mut known) = known_job_cobs.lock() {
            if let Some(job_id) = known.create_job(trigger.repo(), oid) {
                run.set_job_id(job_id);
            }
        }

        // We run the adapter, but if that fails, we just
        // log the error. The `Run` value records the
        // result of the run.
        if let Err(e) = adapter.run(
            trigger,
            &mut run,
            &self.db,
            run_notification,
            self.max_run_time,
            child_info,
            known_job_cobs,
        ) {
            logger::error("failed to run adapter or it failed to run CI", &e);
        }

        logger::broker_end_run(&run, run.broker_run_id());

        self.db.update_run(&run)?;

        Ok(run)
    }
}

/// Format the current UTC time as `YYYY-MM-DD HH:MM:SS` followed by
/// a literal `Z`.
fn now() -> Result<String, time::error::Format> {
    let layout = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
    let utc = OffsetDateTime::now_utc();
    utc.format(layout)
}

/// All possible errors from this module.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// Error formatting a time as a string.
    #[error(transparent)]
    Timeformat(#[from] time::error::Format),

    /// Error from Radicle.
    #[error(transparent)]
    RadicleProfile(#[from] radicle::profile::Error),

    /// Error from spawning a sub-process.
    #[error("failed to spawn a CI adapter sub-process: {0}")]
    SpawnAdapter(PathBuf, #[source] std::io::Error),

    /// Default adapter is not in list of adapters.
    #[error("default adapter is not in list of adapters")]
    UnknownDefaultAdapter(String),

    /// No adapter set for repository and no default adapter set.
    #[error("could not determine what adapter to use for repository {0}")]
    NoAdapter(RepoId),

    /// Request is not a trigger message.
    #[error("tried to execute CI based on a message that is not a trigger one: {0:#?}")]
    NotTrigger(Box<Request>),

    /// Could not convert repository ID from string.
    #[error("failed to understand repository id {0:?}")]
    BadRepoId(String, #[source] radicle::identity::IdError),

    /// Patch event doesn't have any revisions.
    #[error("expected at least one revision in a patch event")]
    NoRevisions,

    /// Database error.
    #[error(transparent)]
    Db(#[from] DbError),
}

#[cfg(test)]
mod test {
    use super::*;
    use std::{path::Path, sync::mpsc::channel, time::Duration};
    use tempfile::tempdir;

    use crate::{
        msg::{RunId, RunResult},
        notif::NotificationChannel,
        run::RunState,
        test::{TestResult, mock_adapter, trigger_request},
    };

    /// Create a broker with its database in `filename` and a maximum
    /// run time of one second.
    fn broker(filename: &Path) -> Result<Broker, Box<dyn std::error::Error>> {
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

    /// Create an empty, shareable set of known job COBs.
    #[allow(clippy::unwrap_used)]
    fn known() -> Arc<Mutex<KnownJobCobs>> {
        Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
    }

    /// Install `script` as a mock adapter, run CI with it for a test
    /// trigger request using a broker with a new database, and
    /// return the resulting run.
    ///
    /// The adapter and the database live in temporary directories
    /// that are removed when this function returns.
    fn run_mock_adapter(script: &str) -> TestResult<Run> {
        let adapter_dir = tempdir()?;
        let bin = adapter_dir.path().join("adapter.sh");
        let adapter = mock_adapter(&bin, script)?;

        let db_dir = tempdir()?;
        let db = db_dir.path().join("db.db");
        let broker = broker(&db)?;

        let trigger = trigger_request()?;

        // These tests don't look at child info, so the receiving end
        // is dropped right away.
        let (pid_tx, _) = channel();

        let mut notifications = NotificationChannel::new_run();
        let sender = notifications.tx()?;

        Ok(broker.execute_ci(&adapter, &trigger, &sender, pid_tx, known())?)
    }

    /// An adapter that reports success and exits with zero results
    /// in a finished, successful run.
    #[test]
    fn executes_adapter() -> TestResult<()> {
        const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;

        let run = run_mock_adapter(ADAPTER)?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
        assert_eq!(run.state(), RunState::Finished);
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
    }

    /// An adapter that reports success but exits with non-zero is
    /// not an error from the broker, but the run is marked as failed.
    #[test]
    fn adapter_fails() -> TestResult<()> {
        const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo woe be me 1>&2
exit 1
"#;

        let run = run_mock_adapter(ADAPTER)?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
        assert_eq!(run.state(), RunState::Finished);
        assert_eq!(run.result(), Some(&RunResult::Failure));

        Ok(())
    }
}