radicle-ci-broker 0.24.0

add integration to CI engines or systems to a Radicle node
Documentation
use radicle::Profile;

use crate::{
    ci_event::CiEvent,
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError, QueueId},
    logger,
    notif::NotificationSender,
    worker::Worker,
};

/// Builder for [`QueueAdder`].
///
/// Both fields start out as `None` (via the derived `Default`). Each must be
/// set with the setter of the same name before calling `build`, which
/// otherwise fails with [`AdderError::Missing`].
#[derive(Default)]
pub struct QueueAdderBuilder {
    /// Database handle handed to the built `QueueAdder`; `None` until `db` is called.
    db: Option<Db>,
    /// Notification sender handed to the built `QueueAdder`; `None` until
    /// `events_tx` is called.
    events_tx: Option<NotificationSender>,
}

impl QueueAdderBuilder {
    /// Sets the database handle the built [`QueueAdder`] will use.
    pub fn db(self, db: Db) -> Self {
        Self {
            db: Some(db),
            ..self
        }
    }

    /// Sets the notification sender the built [`QueueAdder`] will use.
    pub fn events_tx(self, tx: NotificationSender) -> Self {
        Self {
            events_tx: Some(tx),
            ..self
        }
    }

    /// Consumes the builder and produces a [`QueueAdder`].
    ///
    /// # Errors
    ///
    /// Returns [`AdderError::Missing`] naming the first unset field; `db`
    /// is checked before `events_tx`.
    pub fn build(self) -> Result<QueueAdder, AdderError> {
        let Self { db, events_tx } = self;

        // Keep the check order: a builder with nothing set reports "db".
        let db = db.ok_or(AdderError::Missing("db"))?;
        let events_tx = events_tx.ok_or(AdderError::Missing("events_tx"))?;

        Ok(QueueAdder { db, events_tx })
    }
}

/// Reads CI events from a `CiEventSource` and pushes them to the queue in
/// the database, sending a notification after each event is stored.
///
/// Constructed via [`QueueAdderBuilder`].
pub struct QueueAdder {
    /// Database that `push_event` stores queued CI events in.
    db: Db,
    /// Sender that `push_event` calls `notify` on after each stored event.
    events_tx: NotificationSender,
}

impl QueueAdder {
    /// Stores one CI event in the database queue, then notifies via
    /// `events_tx`.
    ///
    /// Returns the [`QueueId`] that `Db::push_queued_ci_event` produced.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Db::push_queued_ci_event`. Returns
    /// [`AdderError::Send`] if the notification fails; the underlying
    /// notification error is discarded.
    fn push_event(&self, event: CiEvent) -> Result<QueueId, AdderError> {
        let queue_id = self.db.push_queued_ci_event(event)?;
        match self.events_tx.notify() {
            Ok(_) => Ok(queue_id),
            Err(_) => Err(AdderError::Send),
        }
    }

    /// Loads the Radicle profile, opens a [`CiEventSource`] on it, and
    /// pushes every event the source yields to the queue.
    ///
    /// Returns `Ok(())` once the source yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Fails if the profile cannot be loaded, if the event source cannot be
    /// created, if reading from the source fails (the error is logged
    /// before being returned), or if pushing an event fails.
    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;
        let mut source = CiEventSource::new(&profile)?;

        // Runs until the source either fails or yields `Ok(None)`.
        loop {
            let batch = match source.event() {
                Ok(Some(batch)) => batch,
                // NOTE(review): presumably `None` means the source has no
                // more events to deliver; confirm against `CiEventSource`.
                Ok(None) => return Ok(()),
                Err(err) => {
                    logger::queueadd_control_socket_close(&err);
                    return Err(err.into());
                }
            };

            for event in batch {
                // The event is moved into the queue, so a clone is pushed
                // and the original is kept for the log line below.
                let id = self.push_event(event.clone())?;
                logger::queueadd_push_event(&event, &id);
            }
        }
    }
}

impl Worker for QueueAdder {
    const NAME: &str = "queue-adder";
    type Error = AdderError;

    fn work(&mut self) -> Result<(), Self::Error> {
        self.add_events()
    }
}

/// Errors that can occur while building or running a [`QueueAdder`].
#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    /// `QueueAdderBuilder::build` was called while the named field was
    /// still unset. The payload is the field name.
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
    Missing(&'static str),

    /// A `radicle::profile::Error`, converted automatically by `?`.
    // NOTE(review): presumably produced by `Profile::load` in `add_events`; confirm.
    #[error(transparent)]
    Profile(#[from] radicle::profile::Error),

    /// A `CiEventSourceError`, converted automatically by `?` or `.into()`.
    #[error(transparent)]
    CiEvent(#[from] CiEventSourceError),

    /// A `DbError`, converted automatically by `?`.
    // NOTE(review): presumably produced by `Db::push_queued_ci_event`; confirm.
    #[error(transparent)]
    Db(#[from] DbError),

    /// `NotificationSender::notify` failed in `push_event`; the underlying
    /// error is not preserved.
    #[error("failed to notify other thread about database change")]
    Send,
}