radicle_ci_broker/
pages.rs

1//! Status and report pages for CI broker.
2//!
3//! This module generates an HTML status page for the CI broker, as
4//! well as per-repository pages for any repository for which the CI
5//! broker has mediated to run CI. The status page gives the latest
6//! known status of the broker, plus lists the repositories that CI
7//! has run for. The per-repository pages lists all the runs for that
8//! repository.
9
10use std::{
11    collections::{HashMap, HashSet},
12    path::{Path, PathBuf},
13    sync::mpsc::RecvTimeoutError,
14};
15
16use html_page::{Element, HtmlPage, Tag};
17use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
18use serde::Serialize;
19use time::{macros::format_description, OffsetDateTime};
20
21use radicle::{
22    git::Oid,
23    prelude::RepoId,
24    storage::{ReadRepository, ReadStorage},
25    Profile,
26};
27
28use crate::{
29    ci_event::{CiEvent, CiEventV1},
30    db::{Db, DbError, QueuedCiEvent},
31    logger,
32    msg::{RunId, RunResult},
33    notif::NotificationReceiver,
34    run::{Run, RunState, Whence},
35    util::{parse_timestamp, rfc822_timestamp, safely_overwrite},
36    worker::Worker,
37};
38
39const MAX_RSS_ENTRIES: usize = 10;
40const BROKER_RSS: &str = "index.rss";
41const FAILURE_RSS: &str = "failed.rss";
42const UNFINISHED_RSS: &str = "unfinished.rss";
43const CSS: &str = include_str!("radicle-ci.css");
44const REFERESH_INTERVAL: &str = "60";
45const STATUS_JSON: &str = "status.json";
46
47/// All possible errors returned from the status page module.
48#[derive(Debug, thiserror::Error)]
49pub enum PageError {
50    /// Error formatting a time as a string.
51    #[error(transparent)]
52    Timeformat(#[from] time::error::Format),
53
54    #[error("failed to write status page to {0}")]
55    Write(PathBuf, #[source] crate::util::UtilError),
56
57    #[error("no node alias has been set for builder")]
58    NoAlias,
59
60    #[error("no status data has been set for builder")]
61    NoStatusData,
62
63    #[error(transparent)]
64    Db(#[from] DbError),
65
66    #[error("failed to lock page data structure")]
67    Lock(&'static str),
68
69    #[error("failed to represent status page data as JSON")]
70    StatusToJson(#[source] serde_json::Error),
71
72    #[error("failed to create RSS time stamp from CI run timestamp: {0}")]
73    RssTimestamp(String, #[source] crate::util::UtilError),
74}
75
76fn now() -> Result<String, time::error::Format> {
77    let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
78    OffsetDateTime::now_utc().format(fmt)
79}
80
81struct PageData {
82    timestamp: String,
83    ci_broker_version: &'static str,
84    ci_broker_git_commit: &'static str,
85    node_alias: String,
86    runs: HashMap<RunId, Run>,
87    events: Vec<QueuedCiEvent>,
88    broker_event_counter: usize,
89    latest_broker_event: Option<CiEvent>,
90    latest_ci_run: Option<Run>,
91}
92
93impl PageData {
94    fn status_page_as_json(&self) -> Result<String, PageError> {
95        StatusData::from(self).as_json()
96    }
97
98    fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
99        let mut doc = HtmlPage::default();
100
101        let title = format!("CI for Radicle node {}", self.node_alias);
102        Self::head(&mut doc, &title);
103
104        doc.push_to_body(Element::new(Tag::H1).with_text(&title));
105
106        doc.push_to_body(
107            Element::new(Tag::P)
108                .with_text("RSS feeds: ")
109                .with_child(
110                    Element::new(Tag::A)
111                        .with_text("all")
112                        .with_attribute("href", BROKER_RSS),
113                )
114                .with_text(" ")
115                .with_child(
116                    Element::new(Tag::A)
117                        .with_text("failed")
118                        .with_attribute("href", FAILURE_RSS),
119                ),
120        );
121
122        Self::h1(&mut doc, "Broker status");
123        doc.push_to_body(
124            Element::new(Tag::P)
125                .with_text("Last updated: ")
126                .with_text(&self.timestamp)
127                .with_child(Element::new(Tag::Br))
128                .with_text("CI broker version: ")
129                .with_text(self.ci_broker_version)
130                .with_text(" (commit ")
131                .with_child(Element::new(Tag::Code).with_text(self.ci_broker_git_commit))
132                .with_text(")"),
133        );
134
135        Self::h1(&mut doc, "Repositories");
136        Self::p_text(&mut doc, "Latest CI run for each repository.");
137
138        let total = self.runs.len();
139        let failed = self
140            .runs
141            .values()
142            .filter(|run| run.result() == Some(&RunResult::Failure))
143            .count();
144        Self::p_text(
145            &mut doc,
146            &format!("Total {total} CI runs recorded, of which {failed} failed."),
147        );
148
149        let mut table = Element::new(Tag::Table).with_class("repolist").with_child(
150            Element::new(Tag::Tr)
151                .with_child(Element::new(Tag::Th).with_text("Repository"))
152                .with_child(Element::new(Tag::Th).with_text("Run ID"))
153                .with_child(Element::new(Tag::Th).with_text("Status"))
154                .with_child(Element::new(Tag::Th).with_text("Info")),
155        );
156
157        for (alias, rid) in self.repos() {
158            let (run_ids, status, info_url) = {
159                let run = self.latest_run(rid);
160                match run {
161                    Some(run) => (
162                        Self::run_ids(Some(run)),
163                        Self::run_state(run),
164                        Self::info_url(Some(run)),
165                    ),
166                    None => (
167                        Self::run_ids(None),
168                        Self::run_state_unknown(),
169                        Self::info_url(None),
170                    ),
171                }
172            };
173
174            let runs = self.runs(rid);
175
176            table.push_child(
177                Element::new(Tag::Tr)
178                    .with_child(
179                        Element::new(Tag::Td).with_child(Self::repository(rid, &alias, runs)),
180                    )
181                    .with_child(Element::new(Tag::Td).with_child(run_ids))
182                    .with_child(
183                        Element::new(Tag::Td).with_child(
184                            Element::new(Tag::Span)
185                                .with_class("run-status")
186                                .with_child(status),
187                        ),
188                    )
189                    .with_child(Element::new(Tag::Td).with_child(info_url)),
190            );
191        }
192        doc.push_to_body(table);
193
194        Self::h1(&mut doc, "Event queue");
195        let mut table = Element::new(Tag::Table)
196            .with_class("event-queue")
197            .with_child(
198                Element::new(Tag::Tr)
199                    .with_child(Element::new(Tag::Th).with_text("Queue id"))
200                    .with_child(Element::new(Tag::Th).with_text("Timestamp"))
201                    .with_child(Element::new(Tag::Th).with_text("Event")),
202            );
203        for event in self.events.iter() {
204            fn render_event(
205                repo: &RepoId,
206                alias: Option<String>,
207                refname: &str,
208                commit: &Oid,
209            ) -> Element {
210                let alias = if let Some(alias) = alias {
211                    Element::new(Tag::Span).with_child(
212                        Element::new(Tag::Span)
213                            .with_class("alias")
214                            .with_text(&alias),
215                    )
216                } else {
217                    Element::new(Tag::Span)
218                };
219                Element::new(Tag::Span)
220                    .with_child(alias)
221                    .with_child(Element::new(Tag::Br))
222                    .with_child(
223                        Element::new(Tag::Span)
224                            .with_class("repoid")
225                            .with_text(&repo.to_string()),
226                    )
227                    .with_child(Element::new(Tag::Br))
228                    .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
229                    .with_child(Element::new(Tag::Br))
230                    .with_child(
231                        Element::new(Tag::Span)
232                            .with_class("commit")
233                            .with_text(&commit.to_string()),
234                    )
235            }
236
237            let event_element = match event.event() {
238                CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
239                CiEvent::V1(CiEventV1::BranchCreated {
240                    from_node: _,
241                    repo,
242                    branch,
243                    tip,
244                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
245                CiEvent::V1(CiEventV1::BranchUpdated {
246                    from_node: _,
247                    repo,
248                    branch,
249                    tip,
250                    old_tip: _,
251                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
252                CiEvent::V1(CiEventV1::BranchDeleted {
253                    repo, branch, tip, ..
254                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
255                CiEvent::V1(CiEventV1::TagCreated {
256                    from_node: _,
257                    repo,
258                    tag,
259                    tip,
260                }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
261                CiEvent::V1(CiEventV1::TagUpdated {
262                    from_node: _,
263                    repo,
264                    tag,
265                    tip,
266                    old_tip: _,
267                }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
268                CiEvent::V1(CiEventV1::TagDeleted { repo, tag, tip, .. }) => {
269                    render_event(repo, self.repo_alias(*repo), tag.as_str(), tip)
270                }
271                CiEvent::V1(CiEventV1::PatchCreated {
272                    from_node: _,
273                    repo,
274                    patch,
275                    new_tip,
276                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
277                CiEvent::V1(CiEventV1::PatchUpdated {
278                    from_node: _,
279                    repo,
280                    patch,
281                    new_tip,
282                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
283                CiEvent::V1(CiEventV1::CanonicalRefUpdated {
284                    from_node: _,
285                    repo,
286                    refname,
287                    target,
288                }) => render_event(repo, self.repo_alias(*repo), refname.as_str(), target),
289            };
290
291            table.push_child(
292                Element::new(Tag::Tr)
293                    .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
294                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
295                    .with_child(Element::new(Tag::Td).with_child(event_element)),
296            );
297        }
298        doc.push_to_body(table);
299
300        Self::h1(&mut doc, "Recent status");
301        let status = StatusData::from(self).as_json()?;
302        doc.push_to_body(
303            Element::new(Tag::P)
304                .with_text("See also as a separate file ")
305                .with_child(
306                    Element::new(Tag::A)
307                        .with_attribute("href", STATUS_JSON)
308                        .with_child(Element::new(Tag::Code).with_text(STATUS_JSON)),
309                )
310                .with_text(": "),
311        );
312        doc.push_to_body(
313            Element::new(Tag::Blockquote).with_child(Element::new(Tag::Pre).with_text(&status)),
314        );
315
316        Ok(doc)
317    }
318
319    fn run_state(run: &Run) -> Element {
320        let status = match run.state() {
321            RunState::Finished => {
322                if let Some(result) = run.result() {
323                    format!("{}, {}", run.state(), result)
324                } else {
325                    format!("{} with unknown result", run.state())
326                }
327            }
328            _ => run.state().to_string(),
329        };
330
331        Element::new(Tag::Span)
332            .with_class(&status)
333            .with_text(&status.to_string())
334    }
335
336    fn run_state_unknown() -> Element {
337        Element::new(Tag::Span)
338            .with_class("run-status")
339            .with_text("unknown")
340    }
341
342    fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
343        let mut doc = HtmlPage::default();
344
345        let title = format!("CI runs for repository {alias}");
346        Self::head(&mut doc, &title);
347
348        doc.push_to_body(
349            Element::new(Tag::P).with_child(
350                Element::new(Tag::A)
351                    .with_attribute("href", "./index.html")
352                    .with_text("Front page"),
353            ),
354        );
355
356        Self::h1(&mut doc, &title);
357
358        doc.push_to_body(
359            Element::new(Tag::P).with_text("Repository ID ").with_child(
360                Element::new(Tag::Code)
361                    .with_class("repoid")
362                    .with_text(&rid.to_string()),
363            ),
364        );
365        Self::p_text(&mut doc, &format!("Last updated: {timestamp}"));
366
367        let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
368            Element::new(Tag::Tr)
369                .with_child(Element::new(Tag::Th).with_text("Run ID"))
370                .with_child(Element::new(Tag::Th).with_text("Whence"))
371                .with_child(Element::new(Tag::Th).with_text("Status"))
372                .with_child(Element::new(Tag::Th).with_text("Info")),
373        );
374
375        let mut runs = self.runs(rid);
376        runs.sort_by_cached_key(|run| run.timestamp());
377        runs.reverse();
378
379        for run in runs {
380            let result = if let Some(result) = run.result() {
381                result.to_string()
382            } else {
383                "unknown".into()
384            };
385            let mut status = Element::new(Tag::Span)
386                .with_class(&result)
387                .with_text(&result);
388            if run.timed_out() {
389                status.push_text(" (timed out)");
390            }
391
392            let current = match run.state() {
393                RunState::Triggered => Element::new(Tag::Span)
394                    .with_attribute("state", "triggered")
395                    .with_text("triggered"),
396                RunState::Running => Element::new(Tag::Span)
397                    .with_class("running)")
398                    .with_text("running"),
399                RunState::Finished => status,
400            };
401
402            table.push_child(
403                Element::new(Tag::Tr)
404                    .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
405                    .with_child(
406                        Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
407                    )
408                    .with_child(Element::new(Tag::Td).with_child(current))
409                    .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
410            );
411        }
412
413        doc.push_to_body(table);
414
415        doc
416    }
417
418    fn head(page: &mut HtmlPage, title: &str) {
419        page.push_to_head(Element::new(Tag::Title).with_text(title));
420        page.push_to_head(Element::new(Tag::Style).with_text(CSS));
421        page.push_to_head(
422            Element::new(Tag::Meta)
423                .with_attribute("http-equiv", "refresh")
424                .with_attribute("content", REFERESH_INTERVAL),
425        );
426    }
427
428    fn h1(page: &mut HtmlPage, text: &str) {
429        page.push_to_body(Element::new(Tag::H2).with_text(text));
430    }
431
432    fn p_text(page: &mut HtmlPage, text: &str) {
433        page.push_to_body(Element::new(Tag::P).with_text(text));
434    }
435
436    fn repository(repo_id: RepoId, alias: &str, runs: Vec<&Run>) -> Element {
437        let failed = runs
438            .iter()
439            .filter(|run| run.result() == Some(&RunResult::Failure))
440            .count();
441        let total = runs.len();
442
443        fn failed_recently(runs: &[&Run], n: usize) -> usize {
444            let recent = if runs.len() >= n {
445                &runs[runs.len() - N..]
446            } else {
447                runs
448            };
449            recent
450                .iter()
451                .filter(|run| run.result() == Some(&RunResult::Failure))
452                .count()
453        }
454
455        const N: usize = 5;
456
457        Element::new(Tag::Span)
458            .with_child(
459                Element::new(Tag::A)
460                    .with_child(
461                        Element::new(Tag::Code)
462                            .with_class("alias)")
463                            .with_text(alias),
464                    )
465                    .with_attribute("href", &format!("{}.html", rid_to_basename(repo_id))),
466            )
467            .with_child(Element::new(Tag::Br))
468            .with_child(Element::new(Tag::Span).with_text(&format!(
469                "{failed} failed runs ({} recently)",
470                failed_recently(&runs, N)
471            )))
472            .with_child(Element::new(Tag::Br))
473            .with_child(Element::new(Tag::Span).with_text(&format!("{total} total runs")))
474    }
475
476    fn run_ids(run: Option<&Run>) -> Element {
477        if let Some(run) = run {
478            let adapter_run_id = if let Some(x) = run.adapter_run_id() {
479                Element::new(Tag::Span).with_text(x.as_str())
480            } else {
481                Element::new(Tag::Span)
482            };
483
484            Element::new(Tag::Span)
485                .with_child(
486                    Element::new(Tag::Span)
487                        .with_class("adapter-run-id")
488                        .with_text("Adapter: ")
489                        .with_child(adapter_run_id)
490                        .with_child(Element::new(Tag::Br)),
491                )
492                .with_child(
493                    Element::new(Tag::Span)
494                        .with_class("broker-run-id")
495                        .with_text("Broker: ")
496                        .with_text(&run.broker_run_id().to_string()),
497                )
498                .with_child(Element::new(Tag::Br))
499                .with_child(
500                    Element::new(Tag::Span)
501                        .with_class("timestamp")
502                        .with_text("Started: ")
503                        .with_text(run.timestamp()),
504                )
505                .with_child(Element::new(Tag::Br))
506                .with_child(
507                    Element::new(Tag::Span)
508                        .with_class("job-cob-id")
509                        .with_text("Job: ")
510                        .with_text(&run.job_id().map(|id| id.to_string()).unwrap_or("".into())),
511                )
512        } else {
513            Element::new(Tag::Span)
514        }
515    }
516
517    fn info_url(run: Option<&Run>) -> Element {
518        if let Some(run) = run {
519            if let Some(url) = run.adapter_info_url() {
520                return Element::new(Tag::A)
521                    .with_attribute("href", url)
522                    .with_text("info");
523            }
524        }
525        Element::new(Tag::Span)
526    }
527
528    fn whence_as_html(whence: &Whence) -> Element {
529        match whence {
530            Whence::Branch {
531                name,
532                commit,
533                who: _,
534            } => Element::new(Tag::Span)
535                .with_text("branch ")
536                .with_child(
537                    Element::new(Tag::Code)
538                        .with_class("branch)")
539                        .with_text(name),
540                )
541                .with_text(", commit  ")
542                .with_child(
543                    Element::new(Tag::Code)
544                        .with_class("commit)")
545                        .with_text(&commit.to_string()),
546                )
547                .with_child(Element::new(Tag::Br))
548                .with_text("from ")
549                .with_child(
550                    Element::new(Tag::Span)
551                        .with_class("who")
552                        .with_text(whence.who().unwrap_or("<commit author not known>")),
553                ),
554            Whence::Patch {
555                patch,
556                commit,
557                revision,
558                who: _,
559            } => Element::new(Tag::Span)
560                .with_text("patch ")
561                .with_child(
562                    Element::new(Tag::Code)
563                        .with_class("branch")
564                        .with_text(&patch.to_string()),
565                )
566                .with_child(Element::new(Tag::Br))
567                .with_text("revision ")
568                .with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
569                    if let Some(rev) = &revision {
570                        rev.to_string()
571                    } else {
572                        "<unknown patch revision>".to_string()
573                    }
574                }))
575                .with_child(Element::new(Tag::Br))
576                .with_text("commit ")
577                .with_child(
578                    Element::new(Tag::Code)
579                        .with_class("commit)")
580                        .with_text(&commit.to_string()),
581                )
582                .with_child(Element::new(Tag::Br))
583                .with_text("from ")
584                .with_child(
585                    Element::new(Tag::Span)
586                        .with_class("who")
587                        .with_text(whence.who().unwrap_or("<patch author not known>")),
588                ),
589        }
590    }
591
592    fn repos(&self) -> Vec<(String, RepoId)> {
593        let rids: HashSet<(String, RepoId)> = self
594            .runs
595            .values()
596            .map(|run| (run.repo_alias().to_string(), run.repo_id()))
597            .collect();
598        let mut repos: Vec<(String, RepoId)> = rids.iter().cloned().collect();
599        repos.sort();
600        repos
601    }
602
603    fn repo_alias(&self, wanted: RepoId) -> Option<String> {
604        self.repos().iter().find_map(|(alias, rid)| {
605            if *rid == wanted {
606                Some(alias.into())
607            } else {
608                None
609            }
610        })
611    }
612
613    fn runs(&self, repoid: RepoId) -> Vec<&Run> {
614        self.runs
615            .iter()
616            .filter_map(|(_, run)| {
617                if run.repo_id() == repoid {
618                    Some(run)
619                } else {
620                    None
621                }
622            })
623            .collect()
624    }
625
626    fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
627        let mut value: Option<&Run> = None;
628        for run in self.runs(repoid) {
629            if let Some(latest) = value {
630                if run.timestamp() > latest.timestamp() {
631                    value = Some(run);
632                }
633            } else {
634                value = Some(run);
635            }
636        }
637        value
638    }
639
640    fn status_as_rss(&self) -> Result<Channel, PageError> {
641        let mut channel = ChannelBuilder::default();
642        channel
643            .title("Radicle CI broker run information")
644            .description("Latest CI runs known on this instance of the Radicle CI broker.")
645            .link("FIXME:link");
646
647        for item in self.sorted_items()?.iter().take(MAX_RSS_ENTRIES) {
648            channel.item((*item).clone());
649        }
650        Ok(channel.build())
651    }
652
653    fn failed_as_rss(&self) -> Result<Channel, PageError> {
654        let mut channel = ChannelBuilder::default();
655        channel
656            .title("Radicle CI broker run information")
657            .description("Latest FAILED CI runs on this instance of the Radicle CI broker.")
658            .link("FIXME:link");
659
660        for item in self
661            .sorted_items_for_runs(|run| {
662                run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure)
663            })?
664            .iter()
665            .take(MAX_RSS_ENTRIES)
666        {
667            channel.item(item.clone());
668        }
669
670        Ok(channel.build())
671    }
672
673    fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
674        let mut channel = ChannelBuilder::default();
675        channel
676            .title("Radicle CI broker run information")
677            .description("Latest UNFINISHED CI runs on this instance of the Radicle CI broker.")
678            .link("FIXME:link");
679        for item in self
680            .sorted_items_for_runs(|run| {
681                run.state() == RunState::Triggered || run.state() == RunState::Running
682            })?
683            .iter()
684            .take(MAX_RSS_ENTRIES)
685        {
686            channel.item(item.clone());
687        }
688        Ok(channel.build())
689    }
690
691    fn sorted_items(&self) -> Result<Vec<Item>, PageError> {
692        let mut items = vec![];
693        for (_alias, repo_id) in self.repos() {
694            for run in self.runs(repo_id) {
695                if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
696                    items.push(Self::rss_item_from_run(run)?);
697                }
698            }
699        }
700        items.sort_by_key(|(ts, _)| *ts);
701        items.reverse();
702        Ok(items.iter().map(|(_, item)| item.clone()).collect())
703    }
704
705    fn sorted_items_for_runs<F>(&self, pred: F) -> Result<Vec<Item>, PageError>
706    where
707        F: Fn(&Run) -> bool,
708    {
709        let mut items = vec![];
710        for (_alias, repo_id) in self.repos() {
711            for run in self.runs(repo_id) {
712                if pred(run) {
713                    items.push(Self::rss_item_from_run(run)?);
714                }
715            }
716        }
717        items.sort_by_key(|(ts, _)| *ts);
718        items.reverse();
719        Ok(items.iter().map(|(_, item)| item.clone()).collect())
720    }
721
722    fn rss_item_from_run(run: &Run) -> Result<(OffsetDateTime, Item), PageError> {
723        let mut guid = Guid::default();
724        guid.set_value(run.broker_run_id().to_string());
725        guid.permalink = false; // guid permalink defaults to true as our guid is not a url set this to false
726
727        let state = if run.state() == RunState::Finished {
728            match run.result() {
729                Some(result) => result.to_string(),
730                None => "unknown".to_string(),
731            }
732        } else {
733            run.state().to_string()
734        };
735        let title = format!("{state}: {} run {}", run.repo_alias(), run.broker_run_id());
736
737        let ts = run.timestamp().to_string();
738        let parsed =
739            parse_timestamp(&ts).map_err(|err| PageError::RssTimestamp(ts.clone(), err))?;
740        let ts = rfc822_timestamp(&parsed).map_err(|err| PageError::RssTimestamp(ts, err))?;
741
742        let entry = RssEntry {
743            repoid: run.repo_id(),
744            commit: match run.whence() {
745                Whence::Branch { commit, .. } => *commit,
746                Whence::Patch { commit, .. } => *commit,
747            },
748            info_url: run.adapter_info_url().map(String::from),
749            status: run.state(),
750            result: run.result().cloned(),
751        };
752
753        let mut item = ItemBuilder::default()
754            .title(Some(title))
755            .guid(Some(guid))
756            .pub_date(Some(ts))
757            .content(entry.to_html().serialize())
758            .build();
759
760        if let Some(url) = run.adapter_info_url() {
761            item.set_link(Some(url.into()));
762        };
763
764        Ok((parsed, item))
765    }
766}
767
768struct RssEntry {
769    repoid: RepoId,
770    commit: Oid,
771    info_url: Option<String>,
772    status: RunState,
773    result: Option<RunResult>,
774}
775
776impl RssEntry {
777    fn to_html(&self) -> Element {
778        Element::new(Tag::Div)
779            .with_class("ci_run")
780            .with_child(
781                Element::new(Tag::Span)
782                    .with_class("repoid")
783                    .with_text(&self.repoid.to_string()),
784            )
785            .with_child(
786                Element::new(Tag::Span)
787                    .with_class("commit")
788                    .with_text(&self.commit.to_string()),
789            )
790            .with_child(
791                Element::new(Tag::Span)
792                    .with_class("info_url")
793                    .with_text(self.info_url.as_deref().unwrap_or("")),
794            )
795            .with_child(
796                Element::new(Tag::Span)
797                    .with_class("status")
798                    .with_text(&self.status.to_string()),
799            )
800            .with_child(Element::new(Tag::Span).with_class("result").with_text(
801                match &self.result {
802                    None => "undetermined",
803                    Some(RunResult::Success) => "success",
804                    Some(RunResult::Failure) => "failure",
805                },
806            ))
807    }
808}
809
810/// Data for status pages for CI broker.
811///
812/// There is a "front page" with status about the broker, and a list
813/// of repositories for which the broker has run CI. Then there is a
814/// page per such repository, with a list of CI runs for that
815/// repository.
816pub struct StatusPage {
817    node_alias: String,
818    dirname: Option<PathBuf>,
819    args: PageArgs,
820}
821
822struct PageArgs {
823    run_rx: NotificationReceiver,
824    profile: Profile,
825    db: Db,
826    once: bool,
827}
828
829impl StatusPage {
830    pub fn set_output_dir(&mut self, dirname: &Path) {
831        self.dirname = Some(dirname.into());
832    }
833
834    pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
835        Self {
836            node_alias: "".into(),
837            dirname: None,
838            args: PageArgs {
839                run_rx,
840                profile,
841                db,
842                once,
843            },
844        }
845    }
846
847    fn update_loop(&mut self) -> Result<(), PageError> {
848        'processing_loop: loop {
849            self.update_and_write()?;
850            if self.args.once {
851                return Ok(());
852            }
853
854            match self.args.run_rx.wait_for_notification() {
855                Ok(_) => (),
856                Err(RecvTimeoutError::Timeout) => (),
857                Err(RecvTimeoutError::Disconnected) => {
858                    logger::pages_disconnected();
859                    break 'processing_loop;
860                }
861            }
862        }
863
864        // Make sure we update reports and status JSON at least once.
865        self.update_and_write()?;
866
867        Ok(())
868    }
869
870    fn update_and_write(&mut self) -> Result<(), PageError> {
871        if let Some(dirname) = &self.dirname {
872            if dirname.exists() {
873                let runs = self.args.db.get_all_runs()?;
874
875                // Create list of events, except ones for private
876                // repositories.
877                let events: Result<Vec<QueuedCiEvent>, PageError> = self
878                    .args
879                    .db
880                    .queued_ci_events()?
881                    .iter()
882                    .filter_map(|id| match self.args.db.get_queued_ci_event(id) {
883                        Ok(Some(event)) => match event.event() {
884                            CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
885                            CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
886                            | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
887                            | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
888                            | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
889                                if Self::is_public_repo(&self.args.profile, repo) {
890                                    Some(Ok(event))
891                                } else {
892                                    None
893                                }
894                            }
895                            _ => None,
896                        },
897                        Ok(None) => None, // Event is (no longer?) in database.
898                        Err(_) => None,   // We ignore error here on purpose.
899                    })
900                    .collect();
901                let mut events = events?;
902                events.sort_by_cached_key(|e| e.timestamp().to_string());
903
904                let data = PageData {
905                    timestamp: now()?,
906                    ci_broker_version: env!("VERSION"),
907                    ci_broker_git_commit: env!("GIT_HEAD"),
908                    node_alias: self.node_alias.clone(),
909                    runs: HashMap::from_iter(
910                        runs.iter()
911                            .map(|run| (run.broker_run_id().clone(), run.clone())),
912                    ),
913                    events,
914                    broker_event_counter: 0,
915                    latest_broker_event: None,
916                    latest_ci_run: None,
917                };
918
919                let nameless = String::from("nameless repo");
920
921                // We avoid writing while keeping the lock, to reduce
922                // contention.
923                let (status, repos) = {
924                    let status = data.status_page_as_html()?.to_string();
925
926                    let mut repos = vec![];
927                    for (_, rid) in data.repos() {
928                        let basename = rid_to_basename(rid);
929                        let filename = dirname.join(format!("{basename}.html"));
930                        let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
931                        let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
932                        repos.push((filename, repopage.to_string()));
933                    }
934
935                    (status, repos)
936                };
937
938                let filename = dirname.join("index.html");
939                Self::write_file(&filename, &status)?;
940
941                for (filename, repopage) in repos {
942                    Self::write_file(&filename, &repopage)?;
943                }
944
945                let filename = dirname.join(STATUS_JSON);
946                let json = data.status_page_as_json()?;
947                Self::write_file(&filename, &json)?;
948
949                let filename = dirname.join(BROKER_RSS);
950                let channel = data.status_as_rss()?;
951                let rss = channel.to_string();
952                Self::write_file(&filename, &rss)?;
953
954                let filename = dirname.join(FAILURE_RSS);
955                let channel = data.failed_as_rss()?;
956                let rss = channel.to_string();
957                Self::write_file(&filename, &rss)?;
958
959                let filename = dirname.join(UNFINISHED_RSS);
960                let channel = data.unfinished_as_rss()?;
961                let rss = channel.to_string();
962                Self::write_file(&filename, &rss)?;
963            }
964        }
965        Ok(())
966    }
967
968    fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
969        if let Ok(repo) = profile.storage.repository(*rid) {
970            if let Ok(id_doc) = repo.canonical_identity_doc() {
971                if id_doc.doc.visibility().is_public() {
972                    return true;
973                }
974            }
975        }
976        false
977    }
978
979    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
980        safely_overwrite(filename, text.as_bytes())
981            .map_err(|err| PageError::Write(filename.into(), err))?;
982        Ok(())
983    }
984}
985
986impl Worker for StatusPage {
987    const NAME: &str = "status-page";
988    type Error = PageError;
989    fn work(&mut self) -> Result<(), PageError> {
990        match &self.dirname {
991            None => logger::pages_directory_unset(),
992            Some(report_dir) if !report_dir.exists() => {
993                logger::pages_directory_does_not_exist(report_dir)
994            }
995            Some(_) => (),
996        }
997
998        self.update_loop()
999    }
1000}
1001
1002#[derive(Debug, Clone, Serialize)]
1003struct StatusData {
1004    timestamp: String,
1005    broker_event_counter: usize,
1006    ci_broker_version: &'static str,
1007    ci_broker_git_commit: &'static str,
1008    latest_broker_event: Option<CiEvent>,
1009    latest_ci_run: Option<Run>,
1010    event_queue_length: usize,
1011}
1012
1013impl StatusData {
1014    fn as_json(&self) -> Result<String, PageError> {
1015        serde_json::to_string_pretty(self).map_err(PageError::StatusToJson)
1016    }
1017}
1018
1019impl From<&PageData> for StatusData {
1020    fn from(page: &PageData) -> Self {
1021        Self {
1022            timestamp: page.timestamp.clone(),
1023            broker_event_counter: page.broker_event_counter,
1024            ci_broker_version: page.ci_broker_version,
1025            ci_broker_git_commit: page.ci_broker_git_commit,
1026            latest_broker_event: page.latest_broker_event.clone(),
1027            latest_ci_run: page.latest_ci_run.clone(),
1028            event_queue_length: page.events.len(),
1029        }
1030    }
1031}
1032
1033fn rid_to_basename(repoid: RepoId) -> String {
1034    let mut basename = repoid.to_string();
1035    assert!(basename.starts_with("rad:"));
1036    basename.drain(..4);
1037    basename
1038}