radicle-ci-broker 0.24.0

add integration to CI engines or systems to a Radicle node
Documentation
//! Create and update Radicle collaborative objects to notify about CI runs.
//!
//! Using the [radicle-job
//! crate](https://crates.io/crates/radicle-job), this module enables
//! the creation of a "job COB" for every CI run, and updating it when
//! the run ends so that the COB captures the result of the run.

use std::{
    collections::{BTreeSet, HashMap},
    str::FromStr,
    time::Duration,
};

use url::Url;
use uuid::Uuid;

use radicle::{
    git::Oid,
    node::{
        Handle, Node,
        sync::{Announcer, AnnouncerConfig, ReplicationFactor},
    },
    prelude::{Profile, ReadStorage, RepoId},
    storage::git::Repository,
};
use radicle_job::*;

use crate::{logger, msg::RunId};

/// Lookup cache for job COBs.
///
/// Remembers the job COB id for each repository and Git object id pair
/// that has been found or created through this value, so that later
/// requests for the same pair are answered from memory.
///
/// This assumes job COBs are not removed from the node.
pub struct KnownJobCobs {
    /// Radicle profile, loaded once when the cache is constructed. Used
    /// to open repositories in node storage and to obtain a signer.
    profile: Profile,
    /// Job COB ids already found or created, keyed by repository id and
    /// Git object id.
    known: HashMap<(RepoId, Oid), JobId>,
}

impl KnownJobCobs {
    /// Create a new, empty [`KnownJobCobs`].
    ///
    /// # Errors
    ///
    /// Returns [`JobError::Profile`] if the Radicle profile can't be loaded.
    pub fn new() -> Result<Self, JobError> {
        let profile = Profile::load().map_err(JobError::Profile)?;
        Ok(Self {
            profile,
            known: HashMap::new(),
        })
    }

    /// Return the id of the job COB for a specific commit, creating the
    /// COB if the repository does not have one yet.
    ///
    /// The id is remembered, so later calls for the same repository and
    /// commit are answered without opening the repository again.
    ///
    /// Returns `None` if the job COB could be neither found nor created.
    /// The failure is logged rather than returned, so that problems with
    /// job COBs never get in the way of the caller.
    pub fn create_job(&mut self, repo_id: RepoId, oid: Oid) -> Option<JobId> {
        let key = (repo_id, oid);
        if let Some(job_id) = self.known.get(&key).copied() {
            return Some(job_id);
        }

        match self.fallible_create_job(repo_id, oid) {
            Ok(job_id) => {
                self.known.insert(key, job_id);
                Some(job_id)
            }
            Err(err) => {
                // Every failure is logged here, in one place, so that no
                // error is dropped silently and none is logged twice.
                logger::job_failure(
                    "failed to find or create job COB for Git object",
                    &repo_id,
                    &oid,
                    Some(&err),
                );
                None
            }
        }
    }

    /// Find the job COB for `oid` in the repository, or create one if
    /// there is none. Failures are returned to the caller, not logged.
    fn fallible_create_job(&self, repo_id: RepoId, oid: Oid) -> Result<JobId, JobError> {
        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::Signer)?;

        let mut jobs = jobs(&repo)?;
        match job_for_commit(&jobs, oid) {
            Ok(job_id) => {
                logger::job_reuse(&repo_id, &oid, &job_id);
                Ok(job_id)
            }
            // Not finding a job is the expected case for a new commit.
            Err(JobError::NoJob(_)) => {
                let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
                // The announce flag is `false`, so nothing is announced
                // when the job COB is created.
                announce(&self.profile, repo_id, false)?;
                logger::job_create(&repo_id, &oid, job.id());
                Ok(*job.id())
            }
            Err(err) => Err(err),
        }
    }

    /// Create a new run for the job of a commit, creating the job first
    /// if needed. The run id should be the one assigned by the CI broker,
    /// not the one by the adapter. The log URL has to be the one from the
    /// adapter.
    ///
    /// If `announce` is true, the change is announced via the local node
    /// after the run has been added.
    ///
    /// Failures are logged, not returned.
    pub fn create_run(
        &mut self,
        repo_id: RepoId,
        oid: Oid,
        run_id: RunId,
        url: &Url,
        announce: bool,
    ) {
        let Some(job_id) = self.create_job(repo_id, oid) else {
            // `create_job` has already logged why there is no job COB.
            return;
        };

        if let Err(err) = self.fallible_create_run(job_id, repo_id, run_id, url, announce) {
            logger::job_failure("failed to add a run to a job COB", &repo_id, &oid, Some(&err));
        }
    }

    /// Add a run to an existing job COB and optionally announce it.
    ///
    /// Note that the announcement happens after the run has been added:
    /// an error from announcing does not mean the run is missing from
    /// the job COB.
    fn fallible_create_run(
        &self,
        job_id: JobId,
        repo_id: RepoId,
        run_id: RunId,
        url: &Url,
        should_announce: bool,
    ) -> Result<(), JobError> {
        let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;

        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::Signer)?;

        let mut jobs = jobs(&repo)?;
        let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
        job.run(uuid, url.clone(), &signer)
            .map_err(JobError::AddRun)?;

        announce(&self.profile, repo_id, should_announce)?;

        logger::job_run_create(job_id, uuid);
        Ok(())
    }
}

/// Mark a run as having finished successfully.
///
/// A failure to update the job COB is logged, not returned.
pub fn succeeded(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let Err(err) = finish(repo_id, oid, run_id, Reason::Succeeded) else {
        return;
    };
    logger::job_failure("failed to mark a run as succeeded", &repo_id, &oid, Some(&err));
}

/// Mark a run as having finished in failure.
///
/// A failure to update the job COB is logged, not returned.
pub fn failed(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let Err(err) = finish(repo_id, oid, run_id, Reason::Failed) else {
        return;
    };
    logger::job_failure("failed to mark a run as failed", &repo_id, &oid, Some(&err));
}

/// Record in the job COB for a commit that a run has finished.
///
/// Loads the Radicle profile, finds the job COB for `oid` in the
/// repository, marks the run `run_id` as finished with `reason`, and
/// then announces the change.
///
/// The announcement happens after the job COB has been updated, so an
/// error from announcing does not mean the update is missing.
fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<(), JobError> {
    // The run id must be a valid UUID; check that before touching storage.
    let run_uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;

    let node_profile = profile()?;
    let storage_repo = repository(&node_profile, repo_id)?;
    let signer = node_profile.signer().map_err(JobError::Signer)?;

    let mut store = jobs(&storage_repo)?;
    let job_id = job_for_commit(&store, oid)?;
    let mut job_cob = store.get_mut(&job_id).map_err(JobError::GetJobMut)?;
    job_cob
        .finish(run_uuid, reason, &signer)
        .map_err(JobError::Finish)?;

    announce(&node_profile, repo_id, true)?;

    logger::job_run_finished(job_id, run_uuid, reason);
    Ok(())
}

/// Load the Radicle profile, mapping a failure to [`JobError::Profile`].
fn profile() -> Result<Profile, JobError> {
    let loaded = Profile::load();
    loaded.map_err(JobError::Profile)
}

/// Open the repository `repo_id` in the storage of the given profile,
/// mapping a failure to [`JobError::OpenRepository`].
fn repository(profile: &Profile, repo_id: RepoId) -> Result<Repository, JobError> {
    let storage = &profile.storage;
    storage
        .repository(repo_id)
        .map_err(JobError::OpenRepository)
}

/// Open the job COB store of a repository, mapping a failure to
/// [`JobError::Jobs`].
fn jobs(repo: &Repository) -> Result<Jobs<'_, Repository>, JobError> {
    let opened = Jobs::open(repo);
    opened.map_err(JobError::Jobs)
}

/// Find the job COB whose Git object id is `wanted`.
///
/// Goes through all job COBs in the store in iteration order and returns
/// the id of the first one that matches. Returns [`JobError::NoJob`] if
/// none matches. An error while listing or reading a job COB before a
/// match is found ends the search with that error.
fn job_for_commit(jobs: &Jobs<'_, Repository>, wanted: Oid) -> Result<JobId, JobError> {
    let all_jobs = jobs.all().map_err(JobError::AllJobs)?;

    for entry in all_jobs {
        let (id, candidate) = entry.map_err(JobError::AllJobsJob)?;
        if *candidate.oid() == wanted {
            return Ok(JobId::from(id));
        }
    }

    Err(JobError::NoJob(wanted))
}

/// Announce a repository via the local Radicle node, if asked to.
///
/// When `announce` is false, this does nothing and returns `Ok(())`.
/// Otherwise it connects to the node via the socket of the profile's
/// home, sorts the seeds of the repository into those that are synced
/// and those that are not, and asks the node to announce the repository
/// with a timeout of five seconds.
fn announce(profile: &Profile, repo_id: RepoId, announce: bool) -> Result<(), JobError> {
    const TIMEOUT: Duration = Duration::from_secs(5);

    if !announce {
        return Ok(());
    }

    let mut node = Node::new(profile.home.socket());

    // Split the node ids of the seeds by whether they are already synced.
    let mut synced = BTreeSet::new();
    let mut unsynced = BTreeSet::new();
    for seed in node.seeds(repo_id).map_err(JobError::Seeds)?.iter() {
        if seed.is_synced() {
            synced.insert(seed.nid);
        } else {
            unsynced.insert(seed.nid);
        }
    }

    let config = AnnouncerConfig::public(
        *profile.id(),
        ReplicationFactor::MustReach(1),
        BTreeSet::new(),
        synced,
        unsynced,
    );
    // The error from the announcer carries no detail we pass on.
    let announcer = Announcer::new(config).map_err(|_| JobError::Announcer)?;

    node.announce(repo_id, TIMEOUT, announcer, |_, _| ())
        .map_err(JobError::Announce)?;

    Ok(())
}

/// Errors from managing job COBs.
///
/// Most variants wrap the underlying error from the `radicle` or `uuid`
/// crate as their source.
#[derive(Debug, thiserror::Error)]
pub enum JobError {
    /// `Profile::load` failed.
    #[error("failed to load Radicle profile")]
    Profile(#[source] radicle::profile::Error),

    /// The repository could not be opened in the profile's storage.
    #[error("failed to open repository in Radicle node storage")]
    OpenRepository(#[source] radicle::storage::RepositoryError),

    /// `Jobs::open` failed for the repository.
    #[error("failed to list job COBs in repository")]
    Jobs(#[source] radicle::storage::RepositoryError),

    /// Starting to iterate over all job COBs failed.
    #[error("failed to get all job COBs")]
    AllJobs(#[source] radicle::cob::store::Error),

    /// Creating a job COB for a Git object failed.
    #[error("failed to create a new job COB")]
    CreateJob(#[source] radicle::cob::store::Error),

    /// The profile could not provide a signer.
    #[error("failed to create a signer for Radicle repository")]
    Signer(#[source] radicle::profile::Error),

    /// An individual item failed while iterating over job COBs.
    #[error("couldn't get job when iterating")]
    AllJobsJob(#[source] radicle::cob::store::Error),

    /// Getting a job COB by id for modification failed.
    #[error("failed to get mutable job COB")]
    GetJobMut(#[source] radicle::cob::store::Error),

    /// Adding a run to a job COB failed.
    #[error("failed to add a run to a job COB")]
    AddRun(#[source] radicle::cob::store::Error),

    /// Marking a run in a job COB as finished failed.
    #[error("could not mark a run as finished")]
    Finish(#[source] radicle::cob::store::Error),

    /// The run id could not be parsed as a UUID.
    #[error("failed to construct a UUID from a run id")]
    Uuid(#[source] uuid::Error),

    /// No job COB in the repository is for the given Git object.
    #[error("failed to find job COB for oid {0}")]
    NoJob(Oid),

    /// A job COB for the given Git object already exists.
    ///
    /// NOTE(review): nothing in the visible part of this file constructs
    /// this variant.
    #[error("a job for Git object {0} already exists")]
    JobExists(Oid),

    /// Asking the node for the seeds of a repository failed.
    #[error("failed to get seeds for node")]
    Seeds(#[source] radicle::node::Error),

    /// Asking the node to announce a repository failed.
    #[error("failed to announce COB change")]
    Announce(#[source] radicle::node::Error),

    /// `Announcer::new` failed. The underlying error is discarded.
    #[error("failed to create a COB announcer")]
    Announcer,
}