#![allow(clippy::result_large_err)]
use std::{
collections::{HashMap, HashSet},
sync::{
Arc, Mutex,
mpsc::{Receiver, RecvTimeoutError, Sender, channel},
},
thread::{sleep, spawn},
time::{Duration, Instant},
};
use libc::{SIGKILL, kill};
use radicle::{Profile, identity::RepoId};
use crate::{
adapter::{Adapter, Adapters},
broker::{Broker, BrokerError},
ci_event::{CiEvent, CiEventV1},
cob::KnownJobCobs,
db::{Db, DbError, QueuedCiEvent},
filter::{EventFilter, Trigger},
logger,
msg::{MessageError, RequestBuilder, RunId},
notif::{NotificationReceiver, NotificationSender},
worker::Worker,
};
/// How long the queue processor sleeps in a loop iteration when it will not
/// wait for a notification, e.g. because every adapter slot is in use.
const SLEEP_WHEN_BUSY: Duration = Duration::from_secs(1);
/// Builder for [`QueueProcessor`].
///
/// Every field except `queue_len_interval` must be set before calling
/// [`QueueProcessorBuilder::build`]; an unset one is reported as
/// [`QueueError::Missing`].
#[derive(Default)]
pub struct QueueProcessorBuilder {
    /// Database holding the event queue.
    db: Option<Db>,
    /// Broker whose maximum run time is reused for per-thread brokers.
    broker: Option<Broker>,
    /// Events allowed by any filter run on the default adapter.
    filters: Option<Vec<EventFilter>>,
    /// Each trigger that allows an event adds the adapter it names.
    triggers: Option<Vec<Trigger>>,
    /// Configured adapters.
    adapters: Option<Adapters>,
    /// Source of new-event notifications.
    events_rx: Option<NotificationReceiver>,
    /// Sender notified about runs.
    run_tx: Option<NotificationSender>,
    /// How often the queue length is logged; optional, defaults to
    /// `DEFAULT_QUEUE_LEN_DURATION`.
    queue_len_interval: Option<Duration>,
    /// Maximum number of adapter threads running at the same time.
    concurrent_adapters: Option<usize>,
}
/// Minimum time between two log entries reporting the queue length, used
/// when the builder's `queue_len_interval` is not set.
const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);
impl QueueProcessorBuilder {
    /// Consume the builder and assemble a [`QueueProcessor`].
    ///
    /// Loads the node profile and creates a fresh job-COB cache as part of
    /// this call. If no `queue_len_interval` was given,
    /// [`DEFAULT_QUEUE_LEN_DURATION`] is used; every other field is required.
    ///
    /// # Errors
    ///
    /// - [`QueueError::Profile`] if the node profile cannot be loaded;
    /// - [`QueueError::Missing`] naming the first required field that was
    ///   not set;
    /// - [`QueueError::KnownJobCobs`] if the job-COB cache cannot be created.
    pub fn build(self) -> Result<QueueProcessor, QueueError> {
        let Self {
            db,
            broker,
            filters,
            triggers,
            adapters,
            events_rx,
            run_tx,
            queue_len_interval,
            concurrent_adapters,
        } = self;

        let profile = Profile::load().map_err(QueueError::Profile)?;

        // Required fields are checked in a fixed order, so the field named
        // in a `Missing` error is deterministic.
        let broker = broker.ok_or(QueueError::Missing("broker"))?;
        let filters = filters.ok_or(QueueError::Missing("filters"))?;
        let triggers = triggers.ok_or(QueueError::Missing("triggers"))?;
        let adapters = adapters.ok_or(QueueError::Missing("adapters"))?;
        let run_tx = run_tx.ok_or(QueueError::Missing("run_tx"))?;
        let concurrent_adapters =
            concurrent_adapters.ok_or(QueueError::Missing("concurrent_adapters"))?;
        let db = db.ok_or(QueueError::Missing("db"))?;
        let events_rx = events_rx.ok_or(QueueError::Missing("events_rx"))?;

        let queue_len_interval = queue_len_interval.unwrap_or(DEFAULT_QUEUE_LEN_DURATION);
        let prev_queue_len = Instant::now();
        let known_job_cobs = KnownJobCobs::new().map_err(QueueError::KnownJobCobs)?;

        // The sender is handed to each adapter thread; the receiver stays
        // with the processor, which collects (run ID, PID) reports from it.
        let (child_pid_tx, child_pid_rx) = channel();

        Ok(QueueProcessor {
            profile,
            db,
            broker,
            filters,
            triggers,
            adapters,
            concurrent_adapters,
            events_rx,
            queue_len_interval,
            prev_queue_len,
            run_tx,
            current: CurrentlyPicked::default(),
            child_pid_tx,
            child_pid_rx,
            known_job_cobs: Arc::new(Mutex::new(known_job_cobs)),
        })
    }

    /// Set the receiver for new-event notifications.
    pub fn events_rx(self, rx: NotificationReceiver) -> Self {
        Self {
            events_rx: Some(rx),
            ..self
        }
    }

    /// Set the sender notified about runs.
    pub fn run_tx(self, tx: NotificationSender) -> Self {
        Self {
            run_tx: Some(tx),
            ..self
        }
    }

    /// Set the database holding the event queue.
    pub fn db(self, db: Db) -> Self {
        Self { db: Some(db), ..self }
    }

    /// Set how often the queue length is logged.
    pub fn queue_len_interval(self, interval: Duration) -> Self {
        Self {
            queue_len_interval: Some(interval),
            ..self
        }
    }

    /// Set the maximum number of adapter threads running at once.
    pub fn concurrent_adapters(self, n: usize) -> Self {
        Self {
            concurrent_adapters: Some(n),
            ..self
        }
    }

    /// Set the broker used as a template for per-thread brokers.
    pub fn broker(self, broker: Broker) -> Self {
        Self {
            broker: Some(broker),
            ..self
        }
    }

    /// Set the event filters (copied).
    pub fn filters(self, filters: &[EventFilter]) -> Self {
        Self {
            filters: Some(filters.to_vec()),
            ..self
        }
    }

    /// Set the triggers (copied).
    pub fn triggers(self, triggers: &[Trigger]) -> Self {
        Self {
            triggers: Some(triggers.to_vec()),
            ..self
        }
    }

    /// Set the configured adapters (cloned).
    pub fn adapters(self, adapters: &Adapters) -> Self {
        Self {
            adapters: Some(adapters.clone()),
            ..self
        }
    }
}
/// Takes CI events off the database queue and runs the matching adapters,
/// each picked event in its own thread.
pub struct QueueProcessor {
    /// Node profile, cloned into every adapter thread.
    profile: Profile,
    /// Database holding the event queue.
    db: Db,
    /// Template broker: each adapter thread gets a new broker on the same
    /// database file with the same maximum run time.
    broker: Broker,
    /// An event allowed by any of these filters runs on the default adapter.
    filters: Vec<EventFilter>,
    /// Each trigger that allows an event adds the adapter it names.
    triggers: Vec<Trigger>,
    /// Configured adapters, looked up by name and for the default.
    adapters: Adapters,
    /// Maximum number of adapter threads running at once.
    concurrent_adapters: usize,
    /// Waited on when the queue is empty and new events are still expected.
    events_rx: NotificationReceiver,
    /// Minimum time between two log entries reporting the queue length.
    queue_len_interval: Duration,
    /// When the queue length was last logged.
    prev_queue_len: Instant,
    /// Cloned into every adapter thread.
    run_tx: NotificationSender,
    /// Repositories with an event currently being processed; further events
    /// for them are not picked until that processing finishes.
    current: CurrentlyPicked,
    /// Cloned into every adapter thread and passed on to its broker.
    child_pid_tx: Sender<ChildInfo>,
    /// Receives (run ID, PID) reports used to handle `Terminate` events.
    child_pid_rx: Receiver<ChildInfo>,
    /// Job-COB cache shared with every adapter thread.
    known_job_cobs: Arc<Mutex<KnownJobCobs>>,
}
impl QueueProcessor {
    /// Process queued CI events until shutdown.
    ///
    /// Each pass of the loop:
    ///
    /// 1. records adapter PIDs reported by running adapter threads,
    /// 2. reloads the queue from the database,
    /// 3. handles every `Shutdown` and `Terminate` event right away,
    /// 4. starts at most one regular event, if fewer than
    ///    `concurrent_adapters` adapter threads are running,
    /// 5. joins adapter threads that have finished, and
    /// 6. waits for a new-event notification when the queue is empty and new
    ///    events are still expected; otherwise sleeps for
    ///    [`SLEEP_WHEN_BUSY`] when nothing could be started, and loops again
    ///    immediately when an event was just picked and a slot is still free.
    ///
    /// Returns once a shutdown was requested (or the notification channel
    /// disconnected), the queue is empty, and no adapter thread is running.
    ///
    /// # Errors
    ///
    /// Fails if the queue cannot be read from or removed from the database,
    /// or if a broker for a new adapter thread cannot be created.
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut expecting_new_events = true;
        let mut handles = vec![];
        // Run ID -> adapter PID, as reported by adapter threads.
        // NOTE(review): entries of runs that finish on their own are never
        // removed, so this map grows for the lifetime of the processor.
        let mut children: HashMap<RunId, u32> = HashMap::new();

        loop {
            // Collect PID reports *before* handling `Terminate` events, so a
            // run whose PID has already been sent can be killed in this pass.
            children.extend(
                self.child_pid_rx
                    .try_iter()
                    .map(|info| (info.run_id().clone(), info.pid())),
            );

            let mut queue = Queue::load(&self.db)?;

            // Control events jump the queue and are handled immediately.
            while let Some(qe) = self.pick_special_event(&queue) {
                queue.remove(&qe);
                self.drop_event(&qe)?;
                match qe.event() {
                    CiEvent::V1(CiEventV1::Shutdown) => {
                        logger::queueproc_action_shutdown();
                        expecting_new_events = false;
                    }
                    CiEvent::V1(CiEventV1::Terminate(run_id)) => {
                        // Forget the PID once used, so a later `Terminate`
                        // for the same run cannot hit a recycled PID.
                        if let Some(pid) = children.remove(run_id) {
                            Self::kill_process_group(run_id, pid);
                        }
                    }
                    _ => (),
                }
            }

            // Whether a regular event was taken off the queue in this pass.
            let mut picked = false;
            if handles.len() < self.concurrent_adapters {
                if let Some(qe) = self.pick_event(&queue) {
                    picked = true;
                    queue.remove(&qe);
                    self.drop_event(&qe)?;
                    match self.matching_adapters(qe.event()) {
                        Ok(Some(adapters)) => {
                            let p = self.processor()?;
                            let repoid = qe.event().repository().copied();
                            self.current.insert(qe.event().repository());
                            let known = Arc::clone(&self.known_job_cobs);
                            let h = spawn(move || p.pick_and_process_one(qe, adapters, known));
                            handles.push((repoid, h));
                        }
                        // No filter or trigger matched: the event is dropped.
                        Ok(None) => {}
                        // NOTE(review): configuration errors (unknown or
                        // missing default adapter) drop the event without a
                        // log entry; consider logging them.
                        Err(_) => {}
                    }
                }
            }

            // Join finished adapter threads and release their repositories.
            let mut running = Vec::with_capacity(handles.len());
            for (repoid, h) in handles {
                if h.is_finished() {
                    logger::queueproc_finished_run(&repoid);
                    if let Some(repoid) = repoid {
                        self.current.remove(repoid);
                    }
                    // NOTE(review): only a panicking thread is reported; an
                    // `Err` returned by the thread is discarded.
                    if h.join().is_err() {
                        logger::queueproc_thread_join();
                    }
                } else {
                    running.push((repoid, h));
                }
            }
            handles = running;

            let nothing_left = handles.is_empty() && queue.is_empty();
            if expecting_new_events && queue.is_empty() {
                match self.events_rx.wait_for_notification() {
                    Ok(_) | Err(RecvTimeoutError::Timeout) => {}
                    Err(RecvTimeoutError::Disconnected) => {
                        logger::queueproc_channel_disconnect();
                        expecting_new_events = false;
                    }
                }
            } else if !nothing_left && (!picked || handles.len() >= self.concurrent_adapters) {
                // Either every slot is taken, or nothing could be started
                // (only events for busy repositories remain, or we are
                // waiting for the last runs after a shutdown). Sleep instead
                // of spinning on the database.
                sleep(SLEEP_WHEN_BUSY);
            }

            if handles.is_empty() && !expecting_new_events && queue.is_empty() {
                break;
            }
        }

        Ok(())
    }

    /// Send `SIGKILL` to the process group whose ID is `pid`, logging the
    /// termination of `run_id`.
    ///
    /// Does nothing if `pid` does not fit in an `i32`, or if it is 0 or 1:
    /// `kill(0, ..)` would signal this process's own group and `kill(-1, ..)`
    /// every process we are allowed to signal.
    ///
    /// NOTE(review): assumes the adapter was started as the leader of its own
    /// process group, so that its PID is also the group ID — confirm against
    /// the broker's spawn code.
    fn kill_process_group(run_id: &RunId, pid: u32) {
        let pgid = match i32::try_from(pid) {
            Ok(pgid) if pgid > 1 => pgid,
            _ => return,
        };
        logger::queueproc_action_terminate(run_id);
        // SAFETY: `kill` only sends a signal and has no memory-safety
        // preconditions. `pgid > 1` guarantees `-pgid` names one specific
        // process group rather than one of the special targets 0 or -1.
        unsafe {
            kill(-pgid, SIGKILL);
        }
    }

    /// Create the state for one adapter thread.
    ///
    /// Each thread gets its own [`Broker`], opened on the same database file
    /// and with the same maximum run time as `self.broker`.
    ///
    /// # Errors
    ///
    /// [`QueueError::NewBroker`] if the broker cannot be created.
    fn processor(&self) -> Result<Processor, QueueError> {
        Ok(Processor {
            profile: self.profile.clone(),
            broker: Broker::new(self.db.filename(), self.broker.max_run_time())
                .map_err(QueueError::NewBroker)?,
            run_tx: self.run_tx.clone(),
            child_pid_tx: self.child_pid_tx.clone(),
        })
    }

    /// Return the first `Shutdown` or `Terminate` event in `queue`, if any.
    fn pick_special_event(&self, queue: &Queue) -> Option<QueuedCiEvent> {
        queue
            .iter()
            .find(|qe| {
                matches!(
                    qe.event(),
                    CiEvent::V1(CiEventV1::Shutdown) | CiEvent::V1(CiEventV1::Terminate(_))
                )
            })
            .cloned()
    }

    /// Choose the next regular event to process.
    ///
    /// Among events whose repository has no event in progress (events with
    /// no repository are always eligible), picks the one with the smallest
    /// timestamp, compared by its string form; on ties, the event that comes
    /// first in `queue` wins. Also logs the queue length at most once per
    /// `queue_len_interval`.
    ///
    /// NOTE(review): the ordering is only chronological if the timestamp's
    /// string form sorts lexicographically in time order — confirm.
    fn pick_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
        if self.prev_queue_len.elapsed() > self.queue_len_interval {
            logger::queueproc_queue_length(queue.len());
            self.prev_queue_len = Instant::now();
        }

        let busy = self.current.list();
        let qe = queue
            .iter()
            .filter(|candidate| match candidate.event().repository() {
                Some(repoid) => !busy.contains(repoid),
                None => true,
            })
            // `min_by_key` returns the first of several equal minima, which
            // matches "stable sort, then take the first".
            .min_by_key(|candidate| candidate.timestamp().to_string())?;
        logger::queueproc_picked_event(qe.id(), qe);
        Some(qe.clone())
    }

    /// Remove `qe` from the database queue, logging the removal.
    fn drop_event(&mut self, qe: &QueuedCiEvent) -> Result<(), QueueError> {
        logger::queueproc_remove_event(qe);
        self.db
            .remove_queued_ci_event(qe.id())
            .map_err(QueueError::db)
    }

    /// Work out which adapters should run for event `e`.
    ///
    /// The default adapter is included once if any filter allows the event.
    /// In addition, every trigger that allows the event contributes the
    /// adapter it names, so an adapter may appear more than once.
    ///
    /// Returns `Ok(None)` when nothing matched.
    ///
    /// # Errors
    ///
    /// [`QueueError::NoDefaultAdapter`] if a filter matched but no default
    /// adapter is configured; [`QueueError::UnknownAdapter`] if a matching
    /// trigger names an adapter that does not exist.
    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
        let mut adapters = vec![];

        if self.filters.iter().any(|filter| filter.allows(e)) {
            let default = self
                .adapters
                .default_adapter()
                .ok_or(QueueError::NoDefaultAdapter)?;
            adapters.push(default.clone());
        }

        for trigger in self.triggers.iter().filter(|trigger| trigger.allows(e)) {
            let name = trigger.adapter().to_string();
            let adapter = self
                .adapters
                .get(&name)
                .ok_or(QueueError::UnknownAdapter(name))?;
            adapters.push(adapter.clone());
        }

        if adapters.is_empty() {
            Ok(None)
        } else {
            Ok(Some(adapters))
        }
    }
}
impl Worker for QueueProcessor {
    const NAME: &str = "queue-processor";
    type Error = QueueError;

    /// Run the queue processor until shutdown and log how it ended.
    ///
    /// # Errors
    ///
    /// Returns the error that stopped the processing loop, after it has been
    /// logged. Previously the error was logged and then replaced by `Ok(())`,
    /// so the caller could not tell that the processor had failed.
    fn work(&mut self) -> Result<(), QueueError> {
        let result = self.process_until_shutdown();
        logger::queueproc_end(&result);
        result
    }
}
/// In-memory snapshot of the events waiting in the database queue.
struct Queue {
    queue: Vec<QueuedCiEvent>,
}

impl Queue {
    /// Read every queued CI event from `db`, in the order the IDs are
    /// returned. IDs that no longer resolve to an event are skipped.
    fn load(db: &Db) -> Result<Self, QueueError> {
        let mut events = Vec::new();
        for id in db.queued_ci_events().map_err(QueueError::db)? {
            // `Option` is iterable: `None` adds nothing, `Some` adds one event.
            events.extend(db.get_queued_ci_event(&id).map_err(QueueError::db)?);
        }
        Ok(Queue { queue: events })
    }

    /// True when no events are queued.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of queued events.
    fn len(&self) -> usize {
        self.queue.len()
    }

    /// Remove the first event with the same ID as `unwanted`, if present.
    fn remove(&mut self, unwanted: &QueuedCiEvent) {
        let wanted_id = unwanted.id();
        let position = self.queue.iter().position(|qe| qe.id() == wanted_id);
        if let Some(index) = position {
            self.queue.remove(index);
        }
    }

    /// Iterate over the queued events in snapshot order.
    fn iter(&self) -> impl Iterator<Item = &QueuedCiEvent> {
        self.queue.iter()
    }
}
/// Everything one adapter thread needs to run CI for a single picked event.
struct Processor {
    /// Node profile used when building trigger messages.
    profile: Profile,
    /// Broker that executes the adapters.
    broker: Broker,
    /// Notified before each adapter; also passed on to the broker.
    run_tx: NotificationSender,
    /// Passed on to the broker, presumably so it can report adapter PIDs.
    child_pid_tx: Sender<ChildInfo>,
}

impl Processor {
    /// Run each adapter in `adapters`, in order, for the event in `qe`.
    ///
    /// A notification is sent on `run_tx` before every adapter. For
    /// `Shutdown` and `Terminate` events nothing is executed. For any other
    /// event, a trigger message is built from the event and handed to the
    /// broker together with the adapter.
    ///
    /// # Errors
    ///
    /// Stops at the first failure: sending the notification, building the
    /// trigger, or running CI.
    fn pick_and_process_one(
        &self,
        qe: QueuedCiEvent,
        adapters: Vec<Adapter>,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<MaybeShutdown, QueueError> {
        let event = qe.event();

        for adapter in &adapters {
            self.run_tx.notify()?;
            logger::queueproc_processing_event(event, adapter);

            let is_control = matches!(
                event,
                CiEvent::V1(CiEventV1::Shutdown) | CiEvent::V1(CiEventV1::Terminate(_))
            );
            if is_control {
                continue;
            }

            logger::queueproc_action_run(event);
            let built = RequestBuilder::default()
                .profile(&self.profile)
                .ci_event(event)
                .build_trigger_from_ci_event()
                .map_err(|err| QueueError::build_trigger(event, err));
            // Log the outcome, success or failure, before propagating it.
            logger::queueproc_trigger(&built);
            let trigger = built?;

            let cobs = Arc::clone(&known_job_cobs);
            self.broker
                .execute_ci(adapter, &trigger, &self.run_tx, self.child_pid_tx.clone(), cobs)
                .map_err(QueueError::execute_ci)?;
        }

        Ok(MaybeShutdown::Continue)
    }
}
/// Set of repositories that currently have an event being processed.
///
/// A poisoned lock is tolerated silently: `insert` and `remove` then do
/// nothing, and `list` returns an empty list.
#[derive(Default, Clone)]
struct CurrentlyPicked {
    set: Arc<Mutex<HashSet<RepoId>>>,
}

impl CurrentlyPicked {
    /// Mark the repository as busy; `None` is ignored.
    fn insert(&mut self, repoid: Option<&RepoId>) {
        if let Some(&repoid) = repoid {
            if let Ok(mut busy) = self.set.lock() {
                busy.insert(repoid);
            }
        }
    }

    /// Mark the repository as no longer busy.
    fn remove(&mut self, repoid: RepoId) {
        if let Ok(mut busy) = self.set.lock() {
            busy.remove(&repoid);
        }
    }

    /// Snapshot of the busy repositories, in unspecified order.
    fn list(&self) -> Vec<RepoId> {
        self.set
            .lock()
            .map(|busy| busy.iter().copied().collect())
            .unwrap_or_default()
    }
}
/// Value returned by an adapter thread after processing an event.
///
/// NOTE(review): in this file only `Continue` is ever constructed (by
/// `Processor::pick_and_process_one`). `Shutdown` and `Terminate` events are
/// handled directly in the queue processor loop instead.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MaybeShutdown {
    Shutdown,
    Terminate(RunId),
    Continue,
}
/// Pairs a CI run ID with a process ID, presumably that of the adapter
/// process started for the run.
///
/// Derives `Debug` like [`ChildInfo`], which carries the same data.
/// NOTE(review): not used in this file; check whether both types are needed.
#[derive(Debug)]
pub struct AdapterProcess {
    run_id: RunId,
    pid: u32,
}

impl AdapterProcess {
    /// Pair run `run_id` with process ID `pid`.
    pub fn new(run_id: RunId, pid: u32) -> Self {
        Self { run_id, pid }
    }

    /// The run ID.
    pub fn run_id(&self) -> &RunId {
        &self.run_id
    }

    /// The process ID.
    pub fn pid(&self) -> u32 {
        self.pid
    }
}
/// A (run ID, process ID) report delivered to the queue processor.
///
/// The processor hands a `Sender<ChildInfo>` to the broker for every run and
/// uses the PIDs it receives to kill the process group of a run when a
/// `Terminate` event for that run arrives.
#[derive(Debug)]
pub struct ChildInfo {
    run_id: RunId,
    pid: u32,
}

impl ChildInfo {
    /// Pair run `run_id` with process ID `pid`.
    pub fn new(run_id: RunId, pid: u32) -> Self {
        ChildInfo { pid, run_id }
    }

    /// The run this report belongs to.
    pub fn run_id(&self) -> &RunId {
        let ChildInfo { run_id, .. } = self;
        run_id
    }

    /// The reported process ID.
    pub fn pid(&self) -> u32 {
        let ChildInfo { pid, .. } = self;
        *pid
    }
}
/// Errors from building or running the queue processor.
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    /// Creating the job-COB cache in `QueueProcessorBuilder::build` failed.
    #[error("failed to create cache of job COBs")]
    KnownJobCobs(#[source] crate::cob::JobError),
    /// Loading the node profile in `QueueProcessorBuilder::build` failed.
    #[error("failed to load node profile")]
    Profile(#[source] radicle::profile::Error),
    /// NOTE(review): not constructed in this file.
    #[error("failed to open database")]
    OpenDb(#[source] crate::db::DbError),
    /// A required `QueueProcessorBuilder` field was not set; carries its name.
    #[error("programming error: QueueProcessorBuilder field {0} was not set")]
    Missing(&'static str),
    /// Reading the event queue from, or removing an event from, the database
    /// failed.
    #[error("failed to use SQLite database")]
    Db(#[source] DbError),
    /// A trigger message could not be built for the given event.
    #[error("failed to create a trigger message from broker event {0:?}")]
    BuildTrigger(CiEvent, #[source] MessageError),
    /// The broker failed to execute an adapter.
    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
    /// Sending a run notification failed; converted automatically by `?`.
    #[error(transparent)]
    NotifyRun(#[from] crate::notif::NotificationError),
    /// A trigger that matched an event names an adapter that is not
    /// configured.
    #[error("trigger refers to unknown adapter {0}")]
    UnknownAdapter(String),
    /// A filter matched an event, but no default adapter is configured.
    #[error("no default adapter specified in configuration")]
    NoDefaultAdapter,
    // NOTE(review): the variants from `SendPicked` through `JoinResultThread`
    // are not constructed in this file.
    #[error("failed to send to channel for picked events")]
    SendPicked,
    #[error("failed to receive from channel for picked events")]
    RecvPicked,
    #[error("failed to send to channel for results of processed events")]
    SendProcessResult,
    #[error("failed to receive from channel for results of processed events")]
    RecvProcessResult,
    #[error("failed to wait for thread to process events to finish")]
    JoinEventProcessorThread,
    #[error("failed to wait for thread to run adapters to finish")]
    JoinAdapterThread,
    #[error("failed to wait for thread to process results from adapters to finish")]
    JoinResultThread,
    /// Creating the broker for a new adapter thread failed.
    #[error("failed to create a new broker instance")]
    NewBroker(#[source] BrokerError),
}
impl QueueError {
fn db(e: DbError) -> Self {
Self::Db(e)
}
fn build_trigger(event: &CiEvent, err: MessageError) -> Self {
Self::BuildTrigger(event.clone(), err)
}
fn execute_ci(e: BrokerError) -> Self {
Self::ExecuteCi(e)
}
}