use radicle::Profile;
use crate::{
ci_event::CiEvent,
ci_event_source::{CiEventSource, CiEventSourceError},
db::{Db, DbError, QueueId},
logger,
notif::NotificationSender,
worker::Worker,
};
/// Builder for [`QueueAdder`].
///
/// Starts out empty (via `Default`); both fields must be supplied through
/// the setter methods before `build` can succeed.
#[derive(Default)]
pub struct QueueAdderBuilder {
    /// Database handle to hand over to the adder; `None` until set.
    db: Option<Db>,
    /// Notification sender to hand over to the adder; `None` until set.
    events_tx: Option<NotificationSender>,
}
impl QueueAdderBuilder {
    /// Consume the builder and assemble a [`QueueAdder`].
    ///
    /// # Errors
    ///
    /// Returns [`AdderError::Missing`] naming the first field that was never
    /// set. `db` is checked before `events_tx`, so when both are absent the
    /// error names `"db"`.
    pub fn build(self) -> Result<QueueAdder, AdderError> {
        // Arm order matters: a missing `db` takes precedence over a
        // missing `events_tx`.
        match (self.db, self.events_tx) {
            (None, _) => Err(AdderError::Missing("db")),
            (Some(_), None) => Err(AdderError::Missing("events_tx")),
            (Some(db), Some(events_tx)) => Ok(QueueAdder { db, events_tx }),
        }
    }

    /// Set the notification sender the adder will use, replacing any
    /// previously set one.
    pub fn events_tx(self, tx: NotificationSender) -> Self {
        Self {
            events_tx: Some(tx),
            ..self
        }
    }

    /// Set the database handle the adder will use, replacing any previously
    /// set one.
    pub fn db(self, db: Db) -> Self {
        Self {
            db: Some(db),
            ..self
        }
    }
}
/// Worker that takes CI events from a [`CiEventSource`] and pushes them onto
/// the queue stored in the database.
///
/// Constructed through [`QueueAdderBuilder`].
pub struct QueueAdder {
    /// Database into which queued CI events are pushed.
    db: Db,
    /// Used to send a notification after each event has been pushed.
    events_tx: NotificationSender,
}
impl QueueAdder {
    /// Read CI events from a freshly created event source and queue each one.
    ///
    /// Loads the Radicle profile, opens a [`CiEventSource`] on it, and then
    /// keeps asking the source for events. Every event received is pushed
    /// with [`Self::push_event`] and logged together with the queue id it was
    /// given. Returns `Ok(())` once the source yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Fails if the profile cannot be loaded, the event source cannot be
    /// created, the source reports an error (which is logged before being
    /// returned), or pushing an event fails.
    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;
        let mut source = CiEventSource::new(&profile)?;

        loop {
            let batch = match source.event() {
                Ok(Some(batch)) => batch,
                // The source has nothing more to deliver: we are done.
                Ok(None) => return Ok(()),
                Err(err) => {
                    logger::queueadd_control_socket_close(&err);
                    return Err(err.into());
                }
            };

            for event in batch {
                // `push_event` takes ownership, but the event is still needed
                // for the log line afterwards, hence the clone.
                let id = self.push_event(event.clone())?;
                logger::queueadd_push_event(&event, &id);
            }
        }
    }

    /// Push one CI event onto the database queue, then send a notification
    /// via `events_tx`.
    ///
    /// Returns the queue id the database assigned to the event.
    ///
    /// # Errors
    ///
    /// Returns [`AdderError::Db`] if the push fails, or [`AdderError::Send`]
    /// if the notification cannot be sent. The underlying notification error
    /// is discarded.
    fn push_event(&self, e: CiEvent) -> Result<QueueId, AdderError> {
        let id = self.db.push_queued_ci_event(e)?;
        match self.events_tx.notify() {
            Ok(_) => Ok(id),
            Err(_) => Err(AdderError::Send),
        }
    }
}
/// Lets a [`QueueAdder`] be used as a [`Worker`].
impl Worker for QueueAdder {
    /// Error type reported when the worker's job fails.
    type Error = AdderError;

    /// Name identifying this worker.
    const NAME: &str = "queue-adder";

    /// Do the worker's job: read CI events and add them to the queue.
    ///
    /// Delegates entirely to `add_events`, returning whatever it returns.
    fn work(&mut self) -> Result<(), Self::Error> {
        self.add_events()
    }
}
/// Errors that can occur while building or running a [`QueueAdder`].
#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    /// `QueueAdderBuilder::build` was called before the named field was set.
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
    Missing(&'static str),

    /// Error from the Radicle profile, passed through unchanged.
    #[error(transparent)]
    Profile(#[from] radicle::profile::Error),

    /// Error from the CI event source, passed through unchanged.
    #[error(transparent)]
    CiEvent(#[from] CiEventSourceError),

    /// Error from the database, passed through unchanged.
    #[error(transparent)]
    Db(#[from] DbError),

    /// Sending the notification after a database change failed.
    #[error("failed to notify other thread about database change")]
    Send,
}