1#![allow(clippy::result_large_err)]
4
5use std::{
6 collections::{HashMap, HashSet},
7 sync::{
8 Arc, Mutex,
9 mpsc::{Receiver, RecvTimeoutError, Sender, channel},
10 },
11 thread::{sleep, spawn},
12 time::{Duration, Instant},
13};
14
15use libc::{SIGKILL, kill};
16use radicle::{Profile, identity::RepoId};
17
18use crate::{
19 adapter::{Adapter, Adapters},
20 broker::{Broker, BrokerError},
21 ci_event::{CiEvent, CiEventV1},
22 cob::KnownJobCobs,
23 db::{Db, DbError, QueuedCiEvent},
24 filter::{EventFilter, Trigger},
25 logger,
26 msg::{MessageError, RequestBuilder, RunId},
27 notif::{NotificationReceiver, NotificationSender},
28 worker::Worker,
29};
30
/// How long the queue processing loop sleeps before looking at the queue
/// again when it already has as many adapter threads running as it is
/// allowed to.
const SLEEP_WHEN_BUSY: Duration = Duration::from_secs(1);
32
/// Builder for a [`QueueProcessor`].
///
/// All fields start out unset (`None`, or `false` for the flag) through the
/// derived `Default`. The setter methods fill them in, and `build` checks
/// that every required one has been provided.
#[derive(Default)]
pub struct QueueProcessorBuilder {
    /// Database handle. Required by `build`.
    db: Option<Db>,
    /// Broker. Required by `build`.
    broker: Option<Broker>,
    /// Event filters. Required by `build`.
    filters: Option<Vec<EventFilter>>,
    /// Triggers. Required by `build`.
    triggers: Option<Vec<Trigger>>,
    /// Available adapters. Required by `build`.
    adapters: Option<Adapters>,
    /// Receiving end for notifications about new events. Required by `build`.
    events_rx: Option<NotificationReceiver>,
    /// Sending end for notifications about runs. Required by `build`.
    run_tx: Option<NotificationSender>,
    /// Interval between queue-length log entries. Optional; `build` falls
    /// back to `DEFAULT_QUEUE_LEN_DURATION` when it is not set.
    queue_len_interval: Option<Duration>,
    /// Maximum number of adapter threads running at once. Required by `build`.
    concurrent_adapters: Option<usize>,
    /// Whether `build` should create a job COB updater (`true`) or use
    /// `KnownJobCobs::NoUpdates` (`false`, the default).
    update_known_job_cobs: bool,
}
46
/// Interval between queue-length log entries that
/// `QueueProcessorBuilder::build` uses when no explicit interval was set.
const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);
48
49impl QueueProcessorBuilder {
50 pub fn build(self) -> Result<QueueProcessor, QueueError> {
51 let profile = Profile::load().map_err(QueueError::profile)?;
52 let broker = self.broker.ok_or(QueueError::Missing("broker"))?;
53 let filters = self.filters.ok_or(QueueError::Missing("filters"))?;
54 let triggers = self.triggers.ok_or(QueueError::Missing("triggers"))?;
55 let adapters = self.adapters.ok_or(QueueError::Missing("adapters"))?;
56 let run_tx = self.run_tx.ok_or(QueueError::Missing("run_tx"))?;
57 let concurrent_adapters = self
58 .concurrent_adapters
59 .ok_or(QueueError::Missing("concurrent_adapters"))?;
60 let (child_pid_tx, child_pid_rx) = channel();
61
62 let known_job_cobs = if self.update_known_job_cobs {
63 eprintln!("XXX update job COBs");
64 KnownJobCobs::updater().map_err(QueueError::KnownJobCobs)?
65 } else {
66 eprintln!("XXX do NOT update job COBs");
67 KnownJobCobs::NoUpdates
68 };
69
70 Ok(QueueProcessor {
71 profile,
72 broker,
73 filters,
74 triggers,
75 adapters,
76 db: self.db.ok_or(QueueError::Missing("db"))?,
77 events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
78 queue_len_interval: self
79 .queue_len_interval
80 .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
81 prev_queue_len: Instant::now(),
82 concurrent_adapters,
83 run_tx,
84 current: CurrentlyPicked::default(),
85 child_pid_tx,
86 child_pid_rx,
87 known_job_cobs: Arc::new(Mutex::new(known_job_cobs)),
88 })
89 }
90
91 pub fn events_rx(mut self, rx: NotificationReceiver) -> Self {
92 self.events_rx = Some(rx);
93 self
94 }
95
96 pub fn run_tx(mut self, tx: NotificationSender) -> Self {
97 self.run_tx = Some(tx);
98 self
99 }
100
101 pub fn db(mut self, db: Db) -> Self {
102 self.db = Some(db);
103 self
104 }
105
106 pub fn queue_len_interval(mut self, interval: Duration) -> Self {
107 self.queue_len_interval = Some(interval);
108 self
109 }
110
111 pub fn concurrent_adapters(mut self, n: usize) -> Self {
112 self.concurrent_adapters = Some(n);
113 self
114 }
115
116 pub fn broker(mut self, broker: Broker) -> Self {
117 self.broker = Some(broker);
118 self
119 }
120
121 pub fn filters(mut self, filters: &[EventFilter]) -> Self {
122 self.filters = Some(filters.to_vec());
123 self
124 }
125
126 pub fn triggers(mut self, triggers: &[Trigger]) -> Self {
127 self.triggers = Some(triggers.to_vec());
128 self
129 }
130
131 pub fn adapters(mut self, adapters: &Adapters) -> Self {
132 self.adapters = Some(adapters.clone());
133 self
134 }
135
136 pub fn update_job_cobs(mut self, value: bool) -> Self {
137 self.update_known_job_cobs = value;
138 self
139 }
140}
141
/// Processes queued CI events: picks events from the queue stored in the
/// database and runs the matching adapters for them in worker threads.
///
/// Instances are created with [`QueueProcessorBuilder`].
pub struct QueueProcessor {
    /// Node profile, loaded when the processor is built and cloned into
    /// every worker.
    profile: Profile,
    /// Database that holds the event queue.
    db: Db,
    /// Broker whose maximum run time is copied into the per-run brokers.
    broker: Broker,
    /// An event allowed by any of these is run with the default adapter.
    filters: Vec<EventFilter>,
    /// Every trigger that allows an event adds the adapter it names.
    triggers: Vec<Trigger>,
    /// The adapters that filters and triggers can select.
    adapters: Adapters,
    /// Upper limit for the number of worker threads running at once.
    concurrent_adapters: usize,
    /// Waited on for notifications when the queue is empty.
    events_rx: NotificationReceiver,
    /// Minimum time between two queue-length log entries.
    queue_len_interval: Duration,
    /// When the queue length was last logged.
    prev_queue_len: Instant,
    /// Cloned into every worker, which notifies on it before each adapter run.
    run_tx: NotificationSender,
    /// Repositories that currently have an event being processed.
    current: CurrentlyPicked,
    /// Sending end of the child process channel; cloned into every worker.
    child_pid_tx: Sender<ChildInfo>,
    /// Receiving end of the child process channel, drained by the main loop.
    child_pid_rx: Receiver<ChildInfo>,
    /// Job COB cache shared with the worker threads.
    known_job_cobs: Arc<Mutex<KnownJobCobs>>,
}
167
168impl QueueProcessor {
169 fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
170 let mut expecting_new_events = true;
171 let mut handles = vec![];
172 let mut children: HashMap<RunId, u32> = HashMap::new();
173
174 loop {
175 let mut queue = Queue::load(&self.db)?;
176
177 while let Some(qe) = self.pick_special_event(&queue) {
179 queue.remove(&qe);
180 self.drop_event(&qe)?;
181 match qe.event() {
182 CiEvent::V1(CiEventV1::Shutdown) => {
183 logger::queueproc_action_shutdown();
184 expecting_new_events = false;
185 }
186 CiEvent::V1(CiEventV1::Terminate(run_id)) => {
187 if let Some(pid) = children.get(run_id)
188 && let Ok(pid) = i32::try_from(*pid)
189 {
190 logger::queueproc_action_terminate(run_id);
191 unsafe {
192 kill(-pid, SIGKILL);
193 }
194 }
195 }
196
197 _ => (),
198 };
199 }
200
201 if handles.len() < self.concurrent_adapters
204 && let Some(qe) = self.pick_event(&queue)
205 {
206 queue.remove(&qe);
211 self.drop_event(&qe)?;
212
213 match self.matching_adapters(qe.event()) {
214 Ok(Some(adapters)) => {
215 let p = self.processor()?;
216 let repoid = qe.event().repository().copied();
217 self.current.insert(qe.event().repository());
218 let known = self.known_job_cobs.clone();
219 let h = spawn(move || p.pick_and_process_one(qe, adapters, known));
220 handles.push((repoid, h));
221 }
222 Ok(None) => {}
223 Err(_) => {}
224 }
225 }
226
227 while let Ok(child_info) = self.child_pid_rx.try_recv() {
229 children.insert(child_info.run_id().clone(), child_info.pid());
230 }
231
232 let mut h2 = vec![];
235 for (repoid, h) in handles {
236 if h.is_finished() {
237 logger::queueproc_finished_run(&repoid);
238 if let Some(repoid) = repoid {
239 self.current.remove(repoid);
240 }
241 if h.join().is_err() {
242 logger::queueproc_thread_join();
243 }
244 } else {
245 h2.push((repoid, h));
246 }
247 }
248 handles = h2;
249
250 if expecting_new_events && queue.is_empty() {
254 match self.events_rx.wait_for_notification() {
255 Ok(_) => {}
256 Err(RecvTimeoutError::Timeout) => {}
257 Err(RecvTimeoutError::Disconnected) => {
258 logger::queueproc_channel_disconnect();
259 expecting_new_events = false;
260 }
261 }
262 } else if handles.len() >= self.concurrent_adapters {
263 sleep(SLEEP_WHEN_BUSY);
266 }
267
268 if handles.is_empty() && !expecting_new_events && queue.is_empty() {
269 break;
270 }
271 }
272
273 Ok(())
274 }
275
276 fn processor(&self) -> Result<Processor, QueueError> {
277 Ok(Processor {
278 profile: self.profile.clone(),
279 broker: Broker::new(self.db.filename(), self.broker.max_run_time())
280 .map_err(QueueError::NewBroker)?,
281 run_tx: self.run_tx.clone(),
282 child_pid_tx: self.child_pid_tx.clone(),
283 })
284 }
285
286 fn pick_special_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
287 for qe in queue.iter() {
288 match qe.event() {
289 CiEvent::V1(CiEventV1::Shutdown) | CiEvent::V1(CiEventV1::Terminate(_)) => {
290 return Some(qe.clone());
291 }
292 _ => (),
293 }
294 }
295 None
296 }
297
298 fn pick_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
299 let elapsed = self.prev_queue_len.elapsed();
300 if elapsed > self.queue_len_interval {
301 logger::queueproc_queue_length(queue.len());
302 self.prev_queue_len = Instant::now();
303 }
304
305 let mut q: Vec<&QueuedCiEvent> = queue.iter().collect();
306 q.sort_by_cached_key(|qe| qe.timestamp().to_string());
307
308 let current_repos = self.current.list();
310 q = q
311 .iter()
312 .filter(|qe| {
313 if let Some(repoid) = qe.event().repository() {
314 !current_repos.contains(repoid)
315 } else {
316 true
317 }
318 })
319 .cloned()
320 .collect();
321
322 if let Some(qe) = q.first() {
323 logger::queueproc_picked_event(qe.id(), qe);
324 Some((*qe).clone())
325 } else {
326 None
327 }
328 }
329
330 fn drop_event(&mut self, qe: &QueuedCiEvent) -> Result<(), QueueError> {
331 logger::queueproc_remove_event(qe);
332 self.db
333 .remove_queued_ci_event(qe.id())
334 .map_err(QueueError::db)
335 }
336
337 fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
338 let mut adapters = vec![];
339
340 if self.filters.iter().any(|filter| filter.allows(e)) {
341 if let Some(default) = self.adapters.default_adapter() {
342 adapters.push(default.clone());
343 } else {
344 return Err(QueueError::NoDefaultAdapter);
345 }
346 }
347
348 for trigger in self.triggers.iter() {
349 if trigger.allows(e) {
350 let name = trigger.adapter().to_string();
351 let adapter = self
352 .adapters
353 .get(&name)
354 .ok_or(QueueError::UnknownAdapter(name))?;
355 adapters.push(adapter.clone());
356 }
357 }
358
359 if adapters.is_empty() {
360 Ok(None)
361 } else {
362 Ok(Some(adapters))
363 }
364 }
365}
366
367impl Worker for QueueProcessor {
368 const NAME: &str = "queue-processor";
369 type Error = QueueError;
370 fn work(&mut self) -> Result<(), QueueError> {
371 let result = self.process_until_shutdown();
374
375 logger::queueproc_end(&result);
376
377 Ok(())
378 }
379}
380
/// In-memory copy of the queued CI events, as loaded from the database.
struct Queue {
    /// The events, in the order the database listed them.
    queue: Vec<QueuedCiEvent>,
}
384
385impl Queue {
386 fn load(db: &Db) -> Result<Self, QueueError> {
387 let ids = db.queued_ci_events().map_err(QueueError::db)?;
388 let mut queue = vec![];
389 for id in ids {
390 if let Some(qe) = db.get_queued_ci_event(&id).map_err(QueueError::db)? {
391 queue.push(qe);
392 }
393 }
394 Ok(Self { queue })
395 }
396
397 fn is_empty(&self) -> bool {
398 self.queue.is_empty()
399 }
400
401 fn len(&self) -> usize {
402 self.queue.len()
403 }
404
405 fn remove(&mut self, unwanted: &QueuedCiEvent) {
406 for (i, qe) in self.queue.iter().enumerate() {
407 if qe.id() == unwanted.id() {
408 self.queue.remove(i);
409 return;
410 }
411 }
412 }
413
414 fn iter(&self) -> impl Iterator<Item = &QueuedCiEvent> {
415 self.queue.iter()
416 }
417}
418
/// State handed to a worker thread so it can process one queued event.
struct Processor {
    /// Node profile used when building the trigger message.
    profile: Profile,
    /// Broker used to execute CI for the event.
    broker: Broker,
    /// Notified before each adapter run, and also passed on to the broker.
    run_tx: NotificationSender,
    /// Channel for child process information; a clone is passed to the
    /// broker for every adapter run.
    child_pid_tx: Sender<ChildInfo>,
}
425
426impl Processor {
427 fn pick_and_process_one(
428 &self,
429 qe: QueuedCiEvent,
430 adapters: Vec<Adapter>,
431 known_job_cobs: Arc<Mutex<KnownJobCobs>>,
432 ) -> Result<MaybeShutdown, QueueError> {
433 for adapter in adapters.iter() {
434 self.run_tx.notify()?;
435 logger::queueproc_processing_event(qe.event(), adapter);
436 match qe.event() {
437 CiEvent::V1(CiEventV1::Shutdown) => (),
438 CiEvent::V1(CiEventV1::Terminate(_)) => (),
439 _ => {
440 logger::queueproc_action_run(qe.event());
441
442 let trigger = RequestBuilder::default()
443 .profile(&self.profile)
444 .ci_event(qe.event())
445 .build_trigger_from_ci_event()
446 .map_err(|e| QueueError::build_trigger(qe.event(), e));
447 logger::queueproc_trigger(&trigger);
448 let trigger = trigger?;
449
450 self.broker
451 .execute_ci(
452 adapter,
453 &trigger,
454 &self.run_tx,
455 self.child_pid_tx.clone(),
456 known_job_cobs.clone(),
457 )
458 .map_err(QueueError::execute_ci)?;
459 }
460 }
461 }
462
463 Ok(MaybeShutdown::Continue)
464 }
465}
466
/// The set of repositories that currently have an event being processed.
///
/// Clones share the same underlying set.
#[derive(Default, Clone)]
struct CurrentlyPicked {
    /// The shared set of repository IDs.
    set: Arc<Mutex<HashSet<RepoId>>>,
}
471
472impl CurrentlyPicked {
473 fn insert(&mut self, repoid: Option<&RepoId>) {
474 if let Some(repoid) = repoid
475 && let Ok(mut set) = self.set.lock()
476 {
477 set.insert(*repoid);
478 }
479 }
480
481 fn remove(&mut self, repoid: RepoId) {
482 if let Ok(mut set) = self.set.lock() {
483 set.remove(&repoid);
484 }
485 }
486
487 fn list(&self) -> Vec<RepoId> {
488 if let Ok(set) = self.set.lock() {
489 set.iter().copied().collect()
490 } else {
491 vec![]
492 }
493 }
494}
495
/// What should happen after an event has been processed.
///
/// This is the success type of `Processor::pick_and_process_one`, which in
/// this file only ever produces `Continue`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MaybeShutdown {
    /// NOTE(review): presumably a request to shut down; confirm with users
    /// of this type.
    Shutdown,
    /// NOTE(review): presumably a request to terminate the given run;
    /// confirm with users of this type.
    Terminate(RunId),
    /// Carry on processing events.
    Continue,
}
502
503pub struct AdapterProcess {
504 run_id: RunId,
505 pid: u32,
506}
507
508impl AdapterProcess {
509 pub fn new(run_id: RunId, pid: u32) -> Self {
510 Self { run_id, pid }
511 }
512
513 pub fn run_id(&self) -> &RunId {
514 &self.run_id
515 }
516
517 pub fn pid(&self) -> u32 {
518 self.pid
519 }
520}
521
/// Information about a child process, sent over the child process channel
/// to the queue processor, which remembers the process ID by run ID.
#[derive(Debug)]
pub struct ChildInfo {
    /// The run the child process belongs to.
    run_id: RunId,
    /// The process ID of the child.
    pid: u32,
}
527
528impl ChildInfo {
529 pub fn new(run_id: RunId, pid: u32) -> Self {
530 Self { run_id, pid }
531 }
532
533 pub fn run_id(&self) -> &RunId {
534 &self.run_id
535 }
536
537 pub fn pid(&self) -> u32 {
538 self.pid
539 }
540}
541
/// Errors from building or running a [`QueueProcessor`].
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    /// The job COB updater could not be created when building the processor.
    #[error("failed to create cache of job COBs")]
    KnownJobCobs(#[source] crate::cob::JobError),

    /// The node profile could not be loaded when building the processor.
    #[error("failed to load node profile")]
    Profile(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// The database could not be opened.
    #[error("failed to open database")]
    OpenDb(#[source] crate::db::DbError),

    /// A required builder field, named by the payload, was never set.
    #[error("programming error: QueueProcessorBuilder field {0} was not set")]
    Missing(&'static str),

    /// A database operation on the event queue failed.
    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

    /// A trigger message could not be built from the given event.
    #[error("failed to create a trigger message from broker event {0:?}")]
    BuildTrigger(CiEvent, #[source] MessageError),

    /// The broker failed to execute CI.
    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),

    /// Sending a run notification failed.
    #[error(transparent)]
    NotifyRun(#[from] crate::notif::NotificationError),

    /// A trigger names an adapter that is not among the known adapters.
    #[error("trigger refers to unknown adapter {0}")]
    UnknownAdapter(String),

    /// A filter allowed an event, but there is no default adapter to run.
    #[error("no default adapter specified in configuration")]
    NoDefaultAdapter,

    // NOTE(review): none of the variants from here to `JoinResultThread`
    // is constructed in the code visible in this file; check whether they
    // are still needed.
    #[error("failed to send to channel for picked events")]
    SendPicked,

    #[error("failed to receive from channel for picked events")]
    RecvPicked,

    #[error("failed to send to channel for results of processed events")]
    SendProcessResult,

    #[error("failed to receive from channel for results of processed events")]
    RecvProcessResult,

    #[error("failed to wait for thread to process events to finish")]
    JoinEventProcessorThread,

    #[error("failed to wait for thread to run adapters to finish")]
    JoinAdapterThread,

    #[error("failed to wait for thread to process results from adapters to finish")]
    JoinResultThread,

    /// A broker for a worker thread could not be created.
    #[error("failed to create a new broker instance")]
    NewBroker(#[source] BrokerError),
}
598
599impl QueueError {
600 fn db(e: DbError) -> Self {
601 Self::Db(e)
602 }
603
604 fn build_trigger(event: &CiEvent, err: MessageError) -> Self {
605 Self::BuildTrigger(event.clone(), err)
606 }
607
608 fn execute_ci(e: BrokerError) -> Self {
609 Self::ExecuteCi(e)
610 }
611
612 fn profile(err: radicle::profile::Error) -> Self {
613 Self::Profile(Box::new(err))
614 }
615}