Skip to main content

radicle_ci_broker/
queueproc.rs

1//! Process events in the persistent event queue.
2
3#![allow(clippy::result_large_err)]
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{
8        Arc, Mutex,
9        mpsc::{Receiver, RecvTimeoutError, Sender, channel},
10    },
11    thread::{sleep, spawn},
12    time::{Duration, Instant},
13};
14
15use libc::{SIGKILL, kill};
16use radicle::{Profile, identity::RepoId};
17
18use crate::{
19    adapter::{Adapter, Adapters},
20    broker::{Broker, BrokerError},
21    ci_event::{CiEvent, CiEventV1},
22    cob::KnownJobCobs,
23    db::{Db, DbError, QueuedCiEvent},
24    filter::{EventFilter, Trigger},
25    logger,
26    msg::{MessageError, RequestBuilder, RunId},
27    notif::{NotificationReceiver, NotificationSender},
28    worker::Worker,
29};
30
/// How long the queue processor sleeps when it can't make progress, for
/// example when as many adapters are running as it's allowed to run at once.
/// Sleeping avoids a busy loop.
const SLEEP_WHEN_BUSY: Duration = Duration::from_secs(1);
32
33#[derive(Default)]
34pub struct QueueProcessorBuilder {
35    db: Option<Db>,
36    broker: Option<Broker>,
37    filters: Option<Vec<EventFilter>>,
38    triggers: Option<Vec<Trigger>>,
39    adapters: Option<Adapters>,
40    events_rx: Option<NotificationReceiver>,
41    run_tx: Option<NotificationSender>,
42    queue_len_interval: Option<Duration>,
43    concurrent_adapters: Option<usize>,
44    update_known_job_cobs: bool,
45}
46
47const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);
48
49impl QueueProcessorBuilder {
50    pub fn build(self) -> Result<QueueProcessor, QueueError> {
51        let profile = Profile::load().map_err(QueueError::profile)?;
52        let broker = self.broker.ok_or(QueueError::Missing("broker"))?;
53        let filters = self.filters.ok_or(QueueError::Missing("filters"))?;
54        let triggers = self.triggers.ok_or(QueueError::Missing("triggers"))?;
55        let adapters = self.adapters.ok_or(QueueError::Missing("adapters"))?;
56        let run_tx = self.run_tx.ok_or(QueueError::Missing("run_tx"))?;
57        let concurrent_adapters = self
58            .concurrent_adapters
59            .ok_or(QueueError::Missing("concurrent_adapters"))?;
60        let (child_pid_tx, child_pid_rx) = channel();
61
62        let known_job_cobs = if self.update_known_job_cobs {
63            eprintln!("XXX update job COBs");
64            KnownJobCobs::updater().map_err(QueueError::KnownJobCobs)?
65        } else {
66            eprintln!("XXX do NOT update job COBs");
67            KnownJobCobs::NoUpdates
68        };
69
70        Ok(QueueProcessor {
71            profile,
72            broker,
73            filters,
74            triggers,
75            adapters,
76            db: self.db.ok_or(QueueError::Missing("db"))?,
77            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
78            queue_len_interval: self
79                .queue_len_interval
80                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
81            prev_queue_len: Instant::now(),
82            concurrent_adapters,
83            run_tx,
84            current: CurrentlyPicked::default(),
85            child_pid_tx,
86            child_pid_rx,
87            known_job_cobs: Arc::new(Mutex::new(known_job_cobs)),
88        })
89    }
90
91    pub fn events_rx(mut self, rx: NotificationReceiver) -> Self {
92        self.events_rx = Some(rx);
93        self
94    }
95
96    pub fn run_tx(mut self, tx: NotificationSender) -> Self {
97        self.run_tx = Some(tx);
98        self
99    }
100
101    pub fn db(mut self, db: Db) -> Self {
102        self.db = Some(db);
103        self
104    }
105
106    pub fn queue_len_interval(mut self, interval: Duration) -> Self {
107        self.queue_len_interval = Some(interval);
108        self
109    }
110
111    pub fn concurrent_adapters(mut self, n: usize) -> Self {
112        self.concurrent_adapters = Some(n);
113        self
114    }
115
116    pub fn broker(mut self, broker: Broker) -> Self {
117        self.broker = Some(broker);
118        self
119    }
120
121    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
122        self.filters = Some(filters.to_vec());
123        self
124    }
125
126    pub fn triggers(mut self, triggers: &[Trigger]) -> Self {
127        self.triggers = Some(triggers.to_vec());
128        self
129    }
130
131    pub fn adapters(mut self, adapters: &Adapters) -> Self {
132        self.adapters = Some(adapters.clone());
133        self
134    }
135
136    pub fn update_job_cobs(mut self, value: bool) -> Self {
137        self.update_known_job_cobs = value;
138        self
139    }
140}
141
/// Processes events from the persistent event queue in the database.
///
/// Events are processed concurrently by running the appropriate adapters in
/// worker threads: at most `concurrent_adapters` events at a time, and at most
/// one event at a time per repository. To avoid busy looping, the `events_rx`
/// channel is used to receive notification that an event has been added to
/// the database.
///
/// Once the notification channel is closed, or a `Shutdown` event is
/// encountered, the processor stops waiting for new events. It keeps going
/// until the queue is empty and all currently running adapters have finished.
pub struct QueueProcessor {
    /// Node profile, given to worker threads for building trigger messages.
    profile: Profile,
    /// Database holding the persistent event queue.
    db: Db,
    /// Each worker thread gets a new `Broker` built from `db`'s file name and
    /// this broker's maximum run time.
    broker: Broker,
    /// An event allowed by any of these runs on the default adapter.
    filters: Vec<EventFilter>,
    /// Each trigger that allows an event runs it on the trigger's adapter.
    triggers: Vec<Trigger>,
    /// Configured adapters, including the default one.
    adapters: Adapters,
    /// Maximum number of events being processed concurrently.
    concurrent_adapters: usize,
    /// Notifications that events have been added to the queue.
    events_rx: NotificationReceiver,
    /// Minimum interval between queue-length log messages.
    queue_len_interval: Duration,
    /// When the queue length was last logged.
    prev_queue_len: Instant,
    /// Cloned into each worker thread; notified before each adapter runs.
    run_tx: NotificationSender,
    /// Repositories for which events are currently being processed.
    current: CurrentlyPicked,
    /// Cloned into each worker thread for reporting adapter process IDs.
    child_pid_tx: Sender<ChildInfo>,
    /// Adapter process IDs, with their run IDs, for `Terminate` events.
    child_pid_rx: Receiver<ChildInfo>,
    /// Cache of known job COBs shared with every CI execution.
    known_job_cobs: Arc<Mutex<KnownJobCobs>>,
}
167
168impl QueueProcessor {
169    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
170        let mut expecting_new_events = true;
171        let mut handles = vec![];
172        let mut children: HashMap<RunId, u32> = HashMap::new();
173
174        loop {
175            let mut queue = Queue::load(&self.db)?;
176
177            // Process special events like `Shutdown` and `Terminate`.
178            while let Some(qe) = self.pick_special_event(&queue) {
179                queue.remove(&qe);
180                self.drop_event(&qe)?;
181                match qe.event() {
182                    CiEvent::V1(CiEventV1::Shutdown) => {
183                        logger::queueproc_action_shutdown();
184                        expecting_new_events = false;
185                    }
186                    CiEvent::V1(CiEventV1::Terminate(run_id)) => {
187                        if let Some(pid) = children.get(run_id)
188                            && let Ok(pid) = i32::try_from(*pid)
189                        {
190                            logger::queueproc_action_terminate(run_id);
191                            unsafe {
192                                kill(-pid, SIGKILL);
193                            }
194                        }
195                    }
196
197                    _ => (),
198                };
199            }
200
201            // If we may spawn another adapter, pick an event from the queue
202            // and run the adapters.
203            if handles.len() < self.concurrent_adapters
204                && let Some(qe) = self.pick_event(&queue)
205            {
206                // Remove picked event from queue so we don't re-process it. If we don't
207                // do this, and we crash when processing the event, we'll re-process it
208                // again and again. It seems better to discard an event, and skip running
209                // CI, rather then getting stuck on a specific event.
210                queue.remove(&qe);
211                self.drop_event(&qe)?;
212
213                match self.matching_adapters(qe.event()) {
214                    Ok(Some(adapters)) => {
215                        let p = self.processor()?;
216                        let repoid = qe.event().repository().copied();
217                        self.current.insert(qe.event().repository());
218                        let known = self.known_job_cobs.clone();
219                        let h = spawn(move || p.pick_and_process_one(qe, adapters, known));
220                        handles.push((repoid, h));
221                    }
222                    Ok(None) => {}
223                    Err(_) => {}
224                }
225            }
226
227            // Receive adapter process IDs.
228            while let Ok(child_info) = self.child_pid_rx.try_recv() {
229                children.insert(child_info.run_id().clone(), child_info.pid());
230            }
231
232            // Wait for any threads processing events that have finished. Remove
233            // them from the list of currently running threads.
234            let mut h2 = vec![];
235            for (repoid, h) in handles {
236                if h.is_finished() {
237                    logger::queueproc_finished_run(&repoid);
238                    if let Some(repoid) = repoid {
239                        self.current.remove(repoid);
240                    }
241                    if h.join().is_err() {
242                        logger::queueproc_thread_join();
243                    }
244                } else {
245                    h2.push((repoid, h));
246                }
247            }
248            handles = h2;
249
250            // If we didn't empty the event queue, but we're still
251            // expecting new events, wait for a new event. This prevents
252            // a busy loop.
253            if expecting_new_events && queue.is_empty() {
254                match self.events_rx.wait_for_notification() {
255                    Ok(_) => {}
256                    Err(RecvTimeoutError::Timeout) => {}
257                    Err(RecvTimeoutError::Disconnected) => {
258                        logger::queueproc_channel_disconnect();
259                        expecting_new_events = false;
260                    }
261                }
262            } else if handles.len() >= self.concurrent_adapters {
263                // Avoid a busy loop when as many adapters are running
264                // as we're allowed to run at once.
265                sleep(SLEEP_WHEN_BUSY);
266            }
267
268            if handles.is_empty() && !expecting_new_events && queue.is_empty() {
269                break;
270            }
271        }
272
273        Ok(())
274    }
275
276    fn processor(&self) -> Result<Processor, QueueError> {
277        Ok(Processor {
278            profile: self.profile.clone(),
279            broker: Broker::new(self.db.filename(), self.broker.max_run_time())
280                .map_err(QueueError::NewBroker)?,
281            run_tx: self.run_tx.clone(),
282            child_pid_tx: self.child_pid_tx.clone(),
283        })
284    }
285
286    fn pick_special_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
287        for qe in queue.iter() {
288            match qe.event() {
289                CiEvent::V1(CiEventV1::Shutdown) | CiEvent::V1(CiEventV1::Terminate(_)) => {
290                    return Some(qe.clone());
291                }
292                _ => (),
293            }
294        }
295        None
296    }
297
298    fn pick_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
299        let elapsed = self.prev_queue_len.elapsed();
300        if elapsed > self.queue_len_interval {
301            logger::queueproc_queue_length(queue.len());
302            self.prev_queue_len = Instant::now();
303        }
304
305        let mut q: Vec<&QueuedCiEvent> = queue.iter().collect();
306        q.sort_by_cached_key(|qe| qe.timestamp().to_string());
307
308        // Remove the repositories for which CI is currently running.
309        let current_repos = self.current.list();
310        q = q
311            .iter()
312            .filter(|qe| {
313                if let Some(repoid) = qe.event().repository() {
314                    !current_repos.contains(repoid)
315                } else {
316                    true
317                }
318            })
319            .cloned()
320            .collect();
321
322        if let Some(qe) = q.first() {
323            logger::queueproc_picked_event(qe.id(), qe);
324            Some((*qe).clone())
325        } else {
326            None
327        }
328    }
329
330    fn drop_event(&mut self, qe: &QueuedCiEvent) -> Result<(), QueueError> {
331        logger::queueproc_remove_event(qe);
332        self.db
333            .remove_queued_ci_event(qe.id())
334            .map_err(QueueError::db)
335    }
336
337    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
338        let mut adapters = vec![];
339
340        if self.filters.iter().any(|filter| filter.allows(e)) {
341            if let Some(default) = self.adapters.default_adapter() {
342                adapters.push(default.clone());
343            } else {
344                return Err(QueueError::NoDefaultAdapter);
345            }
346        }
347
348        for trigger in self.triggers.iter() {
349            if trigger.allows(e) {
350                let name = trigger.adapter().to_string();
351                let adapter = self
352                    .adapters
353                    .get(&name)
354                    .ok_or(QueueError::UnknownAdapter(name))?;
355                adapters.push(adapter.clone());
356            }
357        }
358
359        if adapters.is_empty() {
360            Ok(None)
361        } else {
362            Ok(Some(adapters))
363        }
364    }
365}
366
367impl Worker for QueueProcessor {
368    const NAME: &str = "queue-processor";
369    type Error = QueueError;
370    fn work(&mut self) -> Result<(), QueueError> {
371        // Pick events from queue, send to worker threads that run
372        // adapters. Results are processed by thread above.
373        let result = self.process_until_shutdown();
374
375        logger::queueproc_end(&result);
376
377        Ok(())
378    }
379}
380
381struct Queue {
382    queue: Vec<QueuedCiEvent>,
383}
384
385impl Queue {
386    fn load(db: &Db) -> Result<Self, QueueError> {
387        let ids = db.queued_ci_events().map_err(QueueError::db)?;
388        let mut queue = vec![];
389        for id in ids {
390            if let Some(qe) = db.get_queued_ci_event(&id).map_err(QueueError::db)? {
391                queue.push(qe);
392            }
393        }
394        Ok(Self { queue })
395    }
396
397    fn is_empty(&self) -> bool {
398        self.queue.is_empty()
399    }
400
401    fn len(&self) -> usize {
402        self.queue.len()
403    }
404
405    fn remove(&mut self, unwanted: &QueuedCiEvent) {
406        for (i, qe) in self.queue.iter().enumerate() {
407            if qe.id() == unwanted.id() {
408                self.queue.remove(i);
409                return;
410            }
411        }
412    }
413
414    fn iter(&self) -> impl Iterator<Item = &QueuedCiEvent> {
415        self.queue.iter()
416    }
417}
418
419struct Processor {
420    profile: Profile,
421    broker: Broker,
422    run_tx: NotificationSender,
423    child_pid_tx: Sender<ChildInfo>,
424}
425
426impl Processor {
427    fn pick_and_process_one(
428        &self,
429        qe: QueuedCiEvent,
430        adapters: Vec<Adapter>,
431        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
432    ) -> Result<MaybeShutdown, QueueError> {
433        for adapter in adapters.iter() {
434            self.run_tx.notify()?;
435            logger::queueproc_processing_event(qe.event(), adapter);
436            match qe.event() {
437                CiEvent::V1(CiEventV1::Shutdown) => (),
438                CiEvent::V1(CiEventV1::Terminate(_)) => (),
439                _ => {
440                    logger::queueproc_action_run(qe.event());
441
442                    let trigger = RequestBuilder::default()
443                        .profile(&self.profile)
444                        .ci_event(qe.event())
445                        .build_trigger_from_ci_event()
446                        .map_err(|e| QueueError::build_trigger(qe.event(), e));
447                    logger::queueproc_trigger(&trigger);
448                    let trigger = trigger?;
449
450                    self.broker
451                        .execute_ci(
452                            adapter,
453                            &trigger,
454                            &self.run_tx,
455                            self.child_pid_tx.clone(),
456                            known_job_cobs.clone(),
457                        )
458                        .map_err(QueueError::execute_ci)?;
459                }
460            }
461        }
462
463        Ok(MaybeShutdown::Continue)
464    }
465}
466
467#[derive(Default, Clone)]
468struct CurrentlyPicked {
469    set: Arc<Mutex<HashSet<RepoId>>>,
470}
471
472impl CurrentlyPicked {
473    fn insert(&mut self, repoid: Option<&RepoId>) {
474        if let Some(repoid) = repoid
475            && let Ok(mut set) = self.set.lock()
476        {
477            set.insert(*repoid);
478        }
479    }
480
481    fn remove(&mut self, repoid: RepoId) {
482        if let Ok(mut set) = self.set.lock() {
483            set.remove(&repoid);
484        }
485    }
486
487    fn list(&self) -> Vec<RepoId> {
488        if let Ok(set) = self.set.lock() {
489            set.iter().copied().collect()
490        } else {
491            vec![]
492        }
493    }
494}
495
/// Result of processing an event in a worker thread.
///
/// NOTE(review): in this file, `Processor::pick_and_process_one` only ever
/// returns `Continue`. `Shutdown` and `Terminate` events are acted on directly
/// by the queue processor's loop. Confirm whether the other variants are
/// still needed.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MaybeShutdown {
    Shutdown,
    Terminate(RunId),
    Continue,
}
502
503pub struct AdapterProcess {
504    run_id: RunId,
505    pid: u32,
506}
507
508impl AdapterProcess {
509    pub fn new(run_id: RunId, pid: u32) -> Self {
510        Self { run_id, pid }
511    }
512
513    pub fn run_id(&self) -> &RunId {
514        &self.run_id
515    }
516
517    pub fn pid(&self) -> u32 {
518        self.pid
519    }
520}
521
522#[derive(Debug)]
523pub struct ChildInfo {
524    run_id: RunId,
525    pid: u32,
526}
527
528impl ChildInfo {
529    pub fn new(run_id: RunId, pid: u32) -> Self {
530        Self { run_id, pid }
531    }
532
533    pub fn run_id(&self) -> &RunId {
534        &self.run_id
535    }
536
537    pub fn pid(&self) -> u32 {
538        self.pid
539    }
540}
541
542#[derive(Debug, thiserror::Error)]
543pub enum QueueError {
544    #[error("failed to create cache of job COBs")]
545    KnownJobCobs(#[source] crate::cob::JobError),
546
547    #[error("failed to load node profile")]
548    Profile(#[source] Box<dyn std::error::Error + Send + 'static>),
549
550    #[error("failed to open database")]
551    OpenDb(#[source] crate::db::DbError),
552
553    #[error("programming error: QueueProcessorBuilder field {0} was not set")]
554    Missing(&'static str),
555
556    #[error("failed to use SQLite database")]
557    Db(#[source] DbError),
558
559    #[error("failed to create a trigger message from broker event {0:?}")]
560    BuildTrigger(CiEvent, #[source] MessageError),
561
562    #[error("failed to run CI")]
563    ExecuteCi(#[source] BrokerError),
564
565    #[error(transparent)]
566    NotifyRun(#[from] crate::notif::NotificationError),
567
568    #[error("trigger refers to unknown adapter {0}")]
569    UnknownAdapter(String),
570
571    #[error("no default adapter specified in configuration")]
572    NoDefaultAdapter,
573
574    #[error("failed to send to channel for picked events")]
575    SendPicked,
576
577    #[error("failed to receive from channel for picked events")]
578    RecvPicked,
579
580    #[error("failed to send to channel for results of processed events")]
581    SendProcessResult,
582
583    #[error("failed to receive from channel for results of processed events")]
584    RecvProcessResult,
585
586    #[error("failed to wait for thread to process events to finish")]
587    JoinEventProcessorThread,
588
589    #[error("failed to wait for thread to run adapters to finish")]
590    JoinAdapterThread,
591
592    #[error("failed to wait for thread to process results from adapters to finish")]
593    JoinResultThread,
594
595    #[error("failed to create a new broker instance")]
596    NewBroker(#[source] BrokerError),
597}
598
599impl QueueError {
600    fn db(e: DbError) -> Self {
601        Self::Db(e)
602    }
603
604    fn build_trigger(event: &CiEvent, err: MessageError) -> Self {
605        Self::BuildTrigger(event.clone(), err)
606    }
607
608    fn execute_ci(e: BrokerError) -> Self {
609        Self::ExecuteCi(e)
610    }
611
612    fn profile(err: radicle::profile::Error) -> Self {
613        Self::Profile(Box::new(err))
614    }
615}