radicle_ci_broker/
pages.rs

1//! Status and report pages for CI broker.
2//!
3//! This module generates an HTML status page for the CI broker, as
4//! well as per-repository pages for any repository for which the CI
5//! broker has mediated to run CI. The status page gives the latest
6//! known status of the broker, plus lists the repositories that CI
7//! has run for. The per-repository pages lists all the runs for that
8//! repository.
9
10use std::{
11    collections::{HashMap, HashSet},
12    fs::write,
13    path::{Path, PathBuf},
14    sync::mpsc::RecvTimeoutError,
15    thread::{spawn, JoinHandle},
16    time::Duration,
17};
18
19use html_page::{Element, HtmlPage, Tag};
20use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
21use serde::Serialize;
22use time::{macros::format_description, OffsetDateTime};
23
24use radicle::{
25    git::Oid,
26    prelude::RepoId,
27    storage::{ReadRepository, ReadStorage},
28    Profile,
29};
30
31use crate::{
32    ci_event::{CiEvent, CiEventV1},
33    db::{Db, DbError, QueuedCiEvent},
34    logger,
35    msg::{RunId, RunResult},
36    notif::NotificationReceiver,
37    run::{Run, RunState, Whence},
38    util::{parse_timestamp, rfc822_timestamp},
39};
40
41const MAX_RSS_ENTRIES: usize = 10;
42const BROKER_RSS: &str = "index.rss";
43const FAILURE_RSS: &str = "failed.rss";
44const UNFINISHED_RSS: &str = "unfinished.rss";
45const CSS: &str = include_str!("radicle-ci.css");
46const REFERESH_INTERVAL: &str = "300";
47const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
48const STATUS_JSON: &str = "status.json";
49
50/// All possible errors returned from the status page module.
51#[derive(Debug, thiserror::Error)]
52pub enum PageError {
53    /// Error formatting a time as a string.
54    #[error(transparent)]
55    Timeformat(#[from] time::error::Format),
56
57    #[error("failed to write status page to {0}")]
58    Write(PathBuf, #[source] std::io::Error),
59
60    #[error("no node alias has been set for builder")]
61    NoAlias,
62
63    #[error("no status data has been set for builder")]
64    NoStatusData,
65
66    #[error(transparent)]
67    Db(#[from] DbError),
68
69    #[error("failed to lock page data structure")]
70    Lock(&'static str),
71
72    #[error("failed to represent status page data as JSON")]
73    StatusToJson(#[source] serde_json::Error),
74
75    #[error("failed to create RSS time stamp from CI run timestamp: {0}")]
76    RssTimestamp(String, #[source] crate::util::UtilError),
77}
78
79fn now() -> Result<String, time::error::Format> {
80    let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
81    OffsetDateTime::now_utc().format(fmt)
82}
83
84struct PageData {
85    timestamp: String,
86    ci_broker_version: &'static str,
87    ci_broker_git_commit: &'static str,
88    node_alias: String,
89    runs: HashMap<RunId, Run>,
90    events: Vec<QueuedCiEvent>,
91    broker_event_counter: usize,
92    latest_broker_event: Option<CiEvent>,
93    latest_ci_run: Option<Run>,
94}
95
96impl PageData {
97    fn status_page_as_json(&self) -> Result<String, PageError> {
98        StatusData::from(self).as_json()
99    }
100
101    fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
102        let mut doc = HtmlPage::default();
103
104        let title = format!("CI for Radicle node {}", self.node_alias);
105        Self::head(&mut doc, &title);
106
107        doc.push_to_body(Element::new(Tag::H1).with_text(&title));
108
109        doc.push_to_body(
110            Element::new(Tag::P)
111                .with_text("RSS feeds: ")
112                .with_child(
113                    Element::new(Tag::A)
114                        .with_text("all")
115                        .with_attribute("href", BROKER_RSS),
116                )
117                .with_text(" ")
118                .with_child(
119                    Element::new(Tag::A)
120                        .with_text("failed")
121                        .with_attribute("href", FAILURE_RSS),
122                ),
123        );
124
125        Self::h1(&mut doc, "Broker status");
126        doc.push_to_body(
127            Element::new(Tag::P)
128                .with_text("Last updated: ")
129                .with_text(&self.timestamp)
130                .with_child(Element::new(Tag::Br))
131                .with_text("CI broker version: ")
132                .with_text(self.ci_broker_version)
133                .with_text(" (commit ")
134                .with_child(Element::new(Tag::Code).with_text(self.ci_broker_git_commit))
135                .with_text(")"),
136        );
137
138        Self::h1(&mut doc, "Repositories");
139        Self::p_text(&mut doc, "Latest CI run for each repository.");
140
141        let total = self.runs.len();
142        let failed = self
143            .runs
144            .values()
145            .filter(|run| run.result() == Some(&RunResult::Failure))
146            .count();
147        Self::p_text(
148            &mut doc,
149            &format!("Total {total} CI runs recorded, of which {failed} failed."),
150        );
151
152        let mut table = Element::new(Tag::Table).with_class("repolist").with_child(
153            Element::new(Tag::Tr)
154                .with_child(Element::new(Tag::Th).with_text("Repository"))
155                .with_child(Element::new(Tag::Th).with_text("Run ID"))
156                .with_child(Element::new(Tag::Th).with_text("Status"))
157                .with_child(Element::new(Tag::Th).with_text("Info")),
158        );
159
160        for (alias, rid) in self.repos() {
161            let (run_ids, status, info_url) = {
162                let run = self.latest_run(rid);
163                match run {
164                    Some(run) => (
165                        Self::run_ids(Some(run)),
166                        Self::run_state(run),
167                        Self::info_url(Some(run)),
168                    ),
169                    None => (
170                        Self::run_ids(None),
171                        Self::run_state_unknown(),
172                        Self::info_url(None),
173                    ),
174                }
175            };
176
177            let runs = self.runs(rid);
178
179            table.push_child(
180                Element::new(Tag::Tr)
181                    .with_child(
182                        Element::new(Tag::Td).with_child(Self::repository(rid, &alias, runs)),
183                    )
184                    .with_child(Element::new(Tag::Td).with_child(run_ids))
185                    .with_child(
186                        Element::new(Tag::Td).with_child(
187                            Element::new(Tag::Span)
188                                .with_class("run-status")
189                                .with_child(status),
190                        ),
191                    )
192                    .with_child(Element::new(Tag::Td).with_child(info_url)),
193            );
194        }
195        doc.push_to_body(table);
196
197        Self::h1(&mut doc, "Event queue");
198        let mut table = Element::new(Tag::Table)
199            .with_class("event-queue")
200            .with_child(
201                Element::new(Tag::Tr)
202                    .with_child(Element::new(Tag::Th).with_text("Queue id"))
203                    .with_child(Element::new(Tag::Th).with_text("Timestamp"))
204                    .with_child(Element::new(Tag::Th).with_text("Event")),
205            );
206        for event in self.events.iter() {
207            fn render_event(
208                repo: &RepoId,
209                alias: Option<String>,
210                refname: &str,
211                commit: &Oid,
212            ) -> Element {
213                let alias = if let Some(alias) = alias {
214                    Element::new(Tag::Span).with_child(
215                        Element::new(Tag::Span)
216                            .with_class("alias")
217                            .with_text(&alias),
218                    )
219                } else {
220                    Element::new(Tag::Span)
221                };
222                Element::new(Tag::Span)
223                    .with_child(alias)
224                    .with_child(Element::new(Tag::Br))
225                    .with_child(
226                        Element::new(Tag::Span)
227                            .with_class("repoid")
228                            .with_text(&repo.to_string()),
229                    )
230                    .with_child(Element::new(Tag::Br))
231                    .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
232                    .with_child(Element::new(Tag::Br))
233                    .with_child(
234                        Element::new(Tag::Span)
235                            .with_class("commit")
236                            .with_text(&commit.to_string()),
237                    )
238            }
239
240            let event_element = match event.event() {
241                CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
242                CiEvent::V1(CiEventV1::BranchCreated {
243                    from_node: _,
244                    repo,
245                    branch,
246                    tip,
247                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
248                CiEvent::V1(CiEventV1::BranchUpdated {
249                    from_node: _,
250                    repo,
251                    branch,
252                    tip,
253                    old_tip: _,
254                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
255                CiEvent::V1(CiEventV1::BranchDeleted {
256                    repo, branch, tip, ..
257                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
258                CiEvent::V1(CiEventV1::PatchCreated {
259                    from_node: _,
260                    repo,
261                    patch,
262                    new_tip,
263                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
264                CiEvent::V1(CiEventV1::PatchUpdated {
265                    from_node: _,
266                    repo,
267                    patch,
268                    new_tip,
269                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
270            };
271
272            table.push_child(
273                Element::new(Tag::Tr)
274                    .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
275                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
276                    .with_child(Element::new(Tag::Td).with_child(event_element)),
277            );
278        }
279        doc.push_to_body(table);
280
281        Self::h1(&mut doc, "Recent status");
282        let status = StatusData::from(self).as_json()?;
283        doc.push_to_body(
284            Element::new(Tag::P)
285                .with_text("See also as a separate file ")
286                .with_child(
287                    Element::new(Tag::A)
288                        .with_attribute("href", STATUS_JSON)
289                        .with_child(Element::new(Tag::Code).with_text(STATUS_JSON)),
290                )
291                .with_text(": "),
292        );
293        doc.push_to_body(
294            Element::new(Tag::Blockquote).with_child(Element::new(Tag::Pre).with_text(&status)),
295        );
296
297        Ok(doc)
298    }
299
300    fn run_state(run: &Run) -> Element {
301        let status = match run.state() {
302            RunState::Finished => {
303                if let Some(result) = run.result() {
304                    format!("{}, {}", run.state(), result)
305                } else {
306                    format!("{} with unknown result", run.state())
307                }
308            }
309            _ => run.state().to_string(),
310        };
311
312        Element::new(Tag::Span)
313            .with_class(&status)
314            .with_text(&status.to_string())
315    }
316
317    fn run_state_unknown() -> Element {
318        Element::new(Tag::Span)
319            .with_class("run-status")
320            .with_text("unknown")
321    }
322
323    fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
324        let mut doc = HtmlPage::default();
325
326        let title = format!("CI runs for repository {}", alias);
327        Self::head(&mut doc, &title);
328
329        doc.push_to_body(
330            Element::new(Tag::P).with_child(
331                Element::new(Tag::A)
332                    .with_attribute("href", "../index.html")
333                    .with_text("Front page"),
334            ),
335        );
336
337        Self::h1(&mut doc, &title);
338
339        doc.push_to_body(
340            Element::new(Tag::P).with_text("Repository ID ").with_child(
341                Element::new(Tag::Code)
342                    .with_class("repoid")
343                    .with_text(&rid.to_string()),
344            ),
345        );
346        Self::p_text(&mut doc, &format!("Last updated: {timestamp}"));
347
348        let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
349            Element::new(Tag::Tr)
350                .with_child(Element::new(Tag::Th).with_text("Run ID"))
351                .with_child(Element::new(Tag::Th).with_text("Whence"))
352                .with_child(Element::new(Tag::Th).with_text("Status"))
353                .with_child(Element::new(Tag::Th).with_text("Info")),
354        );
355
356        let mut runs = self.runs(rid);
357        runs.sort_by_cached_key(|run| run.timestamp());
358        runs.reverse();
359        for run in runs {
360            let current = match run.state() {
361                RunState::Triggered => Element::new(Tag::Span)
362                    .with_attribute("state", "triggered")
363                    .with_text("triggered"),
364                RunState::Running => Element::new(Tag::Span)
365                    .with_class("running)")
366                    .with_text("running"),
367                RunState::Finished => {
368                    let result = if let Some(result) = run.result() {
369                        result.to_string()
370                    } else {
371                        "unknown".into()
372                    };
373                    Element::new(Tag::Span)
374                        .with_class(&result)
375                        .with_text(&result)
376                }
377            };
378
379            table.push_child(
380                Element::new(Tag::Tr)
381                    .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
382                    .with_child(
383                        Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
384                    )
385                    .with_child(Element::new(Tag::Td).with_child(current))
386                    .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
387            );
388        }
389
390        doc.push_to_body(table);
391
392        doc
393    }
394
395    fn head(page: &mut HtmlPage, title: &str) {
396        page.push_to_head(Element::new(Tag::Title).with_text(title));
397        page.push_to_head(Element::new(Tag::Style).with_text(CSS));
398        page.push_to_head(
399            Element::new(Tag::Meta)
400                .with_attribute("http-equiv", "refresh")
401                .with_attribute("content", REFERESH_INTERVAL),
402        );
403    }
404
405    fn h1(page: &mut HtmlPage, text: &str) {
406        page.push_to_body(Element::new(Tag::H2).with_text(text));
407    }
408
409    fn p_text(page: &mut HtmlPage, text: &str) {
410        page.push_to_body(Element::new(Tag::P).with_text(text));
411    }
412
413    fn repository(repo_id: RepoId, alias: &str, runs: Vec<&Run>) -> Element {
414        let failed = runs
415            .iter()
416            .filter(|run| run.result() == Some(&RunResult::Failure))
417            .count();
418        let total = runs.len();
419
420        fn failed_recently(runs: &[&Run], n: usize) -> usize {
421            let recent = if runs.len() >= n {
422                &runs[runs.len() - N..]
423            } else {
424                runs
425            };
426            recent
427                .iter()
428                .filter(|run| run.result() == Some(&RunResult::Failure))
429                .count()
430        }
431
432        const N: usize = 5;
433
434        Element::new(Tag::Span)
435            .with_child(
436                Element::new(Tag::A)
437                    .with_child(
438                        Element::new(Tag::Code)
439                            .with_class("alias)")
440                            .with_text(alias),
441                    )
442                    .with_attribute("href", &format!("{}.html", rid_to_basename(repo_id))),
443            )
444            .with_child(Element::new(Tag::Br))
445            .with_child(Element::new(Tag::Span).with_text(&format!(
446                "{failed} failed runs ({} recently)",
447                failed_recently(&runs, N)
448            )))
449            .with_child(Element::new(Tag::Br))
450            .with_child(Element::new(Tag::Span).with_text(&format!("{total} total runs")))
451    }
452
453    fn run_ids(run: Option<&Run>) -> Element {
454        if let Some(run) = run {
455            let adapter_run_id = if let Some(x) = run.adapter_run_id() {
456                Element::new(Tag::Span).with_text(x.as_str())
457            } else {
458                Element::new(Tag::Span)
459            };
460
461            Element::new(Tag::Span).with_child(
462                Element::new(Tag::Span)
463                    .with_class("adapter-run-id")
464                    .with_child(adapter_run_id)
465                    .with_child(Element::new(Tag::Br))
466                    .with_child(
467                        Element::new(Tag::Span)
468                            .with_class("broker-run-id")
469                            .with_text(&run.broker_run_id().to_string()),
470                    )
471                    .with_child(Element::new(Tag::Br))
472                    .with_text(run.timestamp()),
473            )
474        } else {
475            Element::new(Tag::Span)
476        }
477    }
478
479    fn info_url(run: Option<&Run>) -> Element {
480        if let Some(run) = run {
481            if let Some(url) = run.adapter_info_url() {
482                return Element::new(Tag::A)
483                    .with_attribute("href", url)
484                    .with_text("info");
485            }
486        }
487        Element::new(Tag::Span)
488    }
489
490    fn whence_as_html(whence: &Whence) -> Element {
491        match whence {
492            Whence::Branch {
493                name,
494                commit,
495                who: _,
496            } => Element::new(Tag::Span)
497                .with_text("branch ")
498                .with_child(
499                    Element::new(Tag::Code)
500                        .with_class("branch)")
501                        .with_text(name),
502                )
503                .with_text(", commit  ")
504                .with_child(
505                    Element::new(Tag::Code)
506                        .with_class("commit)")
507                        .with_text(&commit.to_string()),
508                )
509                .with_child(Element::new(Tag::Br))
510                .with_text("from ")
511                .with_child(
512                    Element::new(Tag::Span)
513                        .with_class("who")
514                        .with_text(whence.who().unwrap_or("<commit author not known>")),
515                ),
516            Whence::Patch {
517                patch,
518                commit,
519                revision,
520                who: _,
521            } => Element::new(Tag::Span)
522                .with_text("patch ")
523                .with_child(
524                    Element::new(Tag::Code)
525                        .with_class("branch")
526                        .with_text(&patch.to_string()),
527                )
528                .with_child(Element::new(Tag::Br))
529                .with_text("revision ")
530                .with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
531                    if let Some(rev) = &revision {
532                        rev.to_string()
533                    } else {
534                        "<unknown patch revision>".to_string()
535                    }
536                }))
537                .with_child(Element::new(Tag::Br))
538                .with_text("commit ")
539                .with_child(
540                    Element::new(Tag::Code)
541                        .with_class("commit)")
542                        .with_text(&commit.to_string()),
543                )
544                .with_child(Element::new(Tag::Br))
545                .with_text("from ")
546                .with_child(
547                    Element::new(Tag::Span)
548                        .with_class("who")
549                        .with_text(whence.who().unwrap_or("<patch author not known>")),
550                ),
551        }
552    }
553
554    fn repos(&self) -> Vec<(String, RepoId)> {
555        let rids: HashSet<(String, RepoId)> = self
556            .runs
557            .values()
558            .map(|run| (run.repo_alias().to_string(), run.repo_id()))
559            .collect();
560        let mut repos: Vec<(String, RepoId)> = rids.iter().cloned().collect();
561        repos.sort();
562        repos
563    }
564
565    fn repo_alias(&self, wanted: RepoId) -> Option<String> {
566        self.repos().iter().find_map(|(alias, rid)| {
567            if *rid == wanted {
568                Some(alias.into())
569            } else {
570                None
571            }
572        })
573    }
574
575    fn runs(&self, repoid: RepoId) -> Vec<&Run> {
576        self.runs
577            .iter()
578            .filter_map(|(_, run)| {
579                if run.repo_id() == repoid {
580                    Some(run)
581                } else {
582                    None
583                }
584            })
585            .collect()
586    }
587
588    fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
589        let mut value: Option<&Run> = None;
590        for run in self.runs(repoid) {
591            if let Some(latest) = value {
592                if run.timestamp() > latest.timestamp() {
593                    value = Some(run);
594                }
595            } else {
596                value = Some(run);
597            }
598        }
599        value
600    }
601
602    fn status_as_rss(&self) -> Result<Channel, PageError> {
603        let mut channel = ChannelBuilder::default();
604        channel
605            .title("Radicle CI broker run information")
606            .description("All CI runs known to this instance of the Radicle CI broker.")
607            .link("FIXME:link");
608        let mut items = vec![];
609        for (_alias, repo_id) in self.repos() {
610            for run in self.runs(repo_id) {
611                items.push(Self::rss_item_from_run(run)?);
612            }
613        }
614        items.sort_by_key(|item| item.pub_date().map(|s| s.to_string()));
615        items.reverse();
616        for item in items.iter().take(MAX_RSS_ENTRIES).cloned() {
617            channel.item(item);
618        }
619        Ok(channel.build())
620    }
621
622    fn failed_as_rss(&self) -> Result<Channel, PageError> {
623        let mut channel = ChannelBuilder::default();
624        channel
625            .title("Radicle CI broker run information")
626            .description("All CI runs known to this instance of the Radicle CI broker.")
627            .link("FIXME:link");
628        for (_alias, repo_id) in self.repos() {
629            for run in self.runs(repo_id) {
630                if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
631                    channel.item(Self::rss_item_from_run(run)?);
632                }
633            }
634        }
635        Ok(channel.build())
636    }
637
638    fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
639        let mut channel = ChannelBuilder::default();
640        channel
641            .title("Radicle CI broker run information")
642            .description("All CI runs known to this instance of the Radicle CI broker.")
643            .link("FIXME:link");
644        for (_alias, repo_id) in self.repos() {
645            for run in self.runs(repo_id) {
646                if run.state() == RunState::Triggered || run.state() == RunState::Running {
647                    channel.item(Self::rss_item_from_run(run)?);
648                }
649            }
650        }
651        Ok(channel.build())
652    }
653
654    fn rss_item_from_run(run: &Run) -> Result<Item, PageError> {
655        let mut guid = Guid::default();
656        guid.set_value(run.broker_run_id().to_string());
657
658        let state = if run.state() == RunState::Finished {
659            match run.result() {
660                Some(result) => result.to_string(),
661                None => "unknown".to_string(),
662            }
663        } else {
664            run.state().to_string()
665        };
666        let title = format!("{state}: {} run {}", run.repo_alias(), run.broker_run_id());
667
668        let ts = run.timestamp().to_string();
669        let parsed =
670            parse_timestamp(&ts).map_err(|err| PageError::RssTimestamp(ts.clone(), err))?;
671        let ts = rfc822_timestamp(parsed).map_err(|err| PageError::RssTimestamp(ts, err))?;
672
673        let entry = RssEntry {
674            repoid: run.repo_id(),
675            commit: match run.whence() {
676                Whence::Branch { commit, .. } => *commit,
677                Whence::Patch { commit, .. } => *commit,
678            },
679            info_url: run.adapter_info_url().map(String::from),
680            status: run.state(),
681            result: run.result().cloned(),
682        };
683
684        let mut item = ItemBuilder::default()
685            .title(Some(title))
686            .guid(Some(guid))
687            .pub_date(Some(ts))
688            .content(entry.to_html().serialize())
689            .build();
690
691        if let Some(url) = run.adapter_info_url() {
692            item.set_link(Some(url.into()));
693        };
694
695        Ok(item)
696    }
697}
698
699struct RssEntry {
700    repoid: RepoId,
701    commit: Oid,
702    info_url: Option<String>,
703    status: RunState,
704    result: Option<RunResult>,
705}
706
707impl RssEntry {
708    fn to_html(&self) -> Element {
709        Element::new(Tag::Div)
710            .with_class("ci_run")
711            .with_child(
712                Element::new(Tag::Span)
713                    .with_class("repoid")
714                    .with_text(&self.repoid.to_string()),
715            )
716            .with_child(
717                Element::new(Tag::Span)
718                    .with_class("commit")
719                    .with_text(&self.commit.to_string()),
720            )
721            .with_child(
722                Element::new(Tag::Span)
723                    .with_class("info_url")
724                    .with_text(self.info_url.as_deref().unwrap_or("")),
725            )
726            .with_child(
727                Element::new(Tag::Span)
728                    .with_class("status")
729                    .with_text(&self.status.to_string()),
730            )
731            .with_child(Element::new(Tag::Span).with_class("result").with_text(
732                match &self.result {
733                    None => "undetermined",
734                    Some(RunResult::Success) => "success",
735                    Some(RunResult::Failure) => "failure",
736                },
737            ))
738    }
739}
740
741/// Data for status pages for CI broker.
742///
743/// There is a "front page" with status about the broker, and a list
744/// of repositories for which the broker has run CI. Then there is a
745/// page per such repository, with a list of CI runs for that
746/// repository.
747#[derive(Default)]
748pub struct StatusPage {
749    node_alias: String,
750    dirname: Option<PathBuf>,
751}
752
753impl StatusPage {
754    pub fn set_output_dir(&mut self, dirname: &Path) {
755        self.dirname = Some(dirname.into());
756    }
757
758    pub fn update_in_thread(
759        mut self,
760        run_rx: NotificationReceiver,
761        profile: Profile,
762        db: Db,
763        once: bool,
764    ) -> JoinHandle<Result<(), PageError>> {
765        match &self.dirname {
766            None => logger::pages_directory_unset(),
767            Some(report_dir) if !report_dir.exists() => {
768                logger::pages_directory_does_not_exist(report_dir)
769            }
770            Some(_) => (),
771        }
772
773        self.node_alias = profile.config.alias().to_string();
774        logger::pages_interval(UPDATE_INTERVAL);
775
776        spawn(move || {
777            logger::pages_start();
778            let result = self.update_loop(run_rx, profile, db, once);
779            logger::pages_end(&result);
780            result
781        })
782    }
783
784    fn update_loop(
785        mut self,
786        run_rx: NotificationReceiver,
787        profile: Profile,
788        db: Db,
789        once: bool,
790    ) -> Result<(), PageError> {
791        'processing_loop: loop {
792            self.update_and_write(&profile, &db)?;
793            if once {
794                return Ok(());
795            }
796
797            match run_rx.wait_for_notification() {
798                Ok(_) => (),
799                Err(RecvTimeoutError::Timeout) => (),
800                Err(RecvTimeoutError::Disconnected) => {
801                    logger::pages_disconnected();
802                    break 'processing_loop;
803                }
804            }
805        }
806
807        // Make sure we update reports and status JSON at least once.
808        self.update_and_write(&profile, &db)?;
809
810        Ok(())
811    }
812
813    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
814        if let Some(dirname) = &self.dirname {
815            if dirname.exists() {
816                let runs = db.get_all_runs()?;
817
818                // Create list of events, except ones for private
819                // repositories.
820                let events: Result<Vec<QueuedCiEvent>, PageError> = db
821                    .queued_ci_events()?
822                    .iter()
823                    .filter_map(|id| match db.get_queued_ci_event(id) {
824                        Ok(Some(event)) => match event.event() {
825                            CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
826                            CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
827                            | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
828                            | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
829                            | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
830                                if Self::is_public_repo(profile, repo) {
831                                    Some(Ok(event))
832                                } else {
833                                    None
834                                }
835                            }
836                            _ => None,
837                        },
838                        Ok(None) => None, // Event is (no longer?) in database.
839                        Err(_) => None,   // We ignore error here on purpose.
840                    })
841                    .collect();
842                let mut events = events?;
843                events.sort_by_cached_key(|e| e.timestamp().to_string());
844
845                let data = PageData {
846                    timestamp: now()?,
847                    ci_broker_version: env!("CARGO_PKG_VERSION"),
848                    ci_broker_git_commit: env!("GIT_HEAD"),
849                    node_alias: self.node_alias.clone(),
850                    runs: HashMap::from_iter(
851                        runs.iter()
852                            .map(|run| (run.broker_run_id().clone(), run.clone())),
853                    ),
854                    events,
855                    broker_event_counter: 0,
856                    latest_broker_event: None,
857                    latest_ci_run: None,
858                };
859
860                let nameless = String::from("nameless repo");
861
862                // We avoid writing while keeping the lock, to reduce
863                // contention.
864                let (status, repos) = {
865                    let status = data.status_page_as_html()?.to_string();
866
867                    let mut repos = vec![];
868                    for (_, rid) in data.repos() {
869                        let basename = rid_to_basename(rid);
870                        let filename = dirname.join(format!("{basename}.html"));
871                        let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
872                        let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
873                        repos.push((filename, repopage.to_string()));
874                    }
875
876                    (status, repos)
877                };
878
879                let filename = dirname.join("index.html");
880                Self::write_file(&filename, &status)?;
881
882                for (filename, repopage) in repos {
883                    Self::write_file(&filename, &repopage)?;
884                }
885
886                let filename = dirname.join(STATUS_JSON);
887                let json = data.status_page_as_json()?;
888                Self::write_file(&filename, &json)?;
889
890                let filename = dirname.join(BROKER_RSS);
891                let channel = data.status_as_rss()?;
892                let rss = channel.to_string();
893                Self::write_file(&filename, &rss)?;
894
895                let filename = dirname.join(FAILURE_RSS);
896                let channel = data.failed_as_rss()?;
897                let rss = channel.to_string();
898                Self::write_file(&filename, &rss)?;
899
900                let filename = dirname.join(UNFINISHED_RSS);
901                let channel = data.unfinished_as_rss()?;
902                let rss = channel.to_string();
903                Self::write_file(&filename, &rss)?;
904            }
905        }
906        Ok(())
907    }
908
909    fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
910        if let Ok(repo) = profile.storage.repository(*rid) {
911            if let Ok(id_doc) = repo.canonical_identity_doc() {
912                if id_doc.doc.visibility().is_public() {
913                    return true;
914                }
915            }
916        }
917        false
918    }
919
920    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
921        write(filename, text).map_err(|e| PageError::Write(filename.into(), e))?;
922        Ok(())
923    }
924}
925
926#[derive(Debug, Clone, Serialize)]
927struct StatusData {
928    timestamp: String,
929    broker_event_counter: usize,
930    ci_broker_version: &'static str,
931    ci_broker_git_commit: &'static str,
932    latest_broker_event: Option<CiEvent>,
933    latest_ci_run: Option<Run>,
934    event_queue_length: usize,
935}
936
937impl StatusData {
938    fn as_json(&self) -> Result<String, PageError> {
939        serde_json::to_string_pretty(self).map_err(PageError::StatusToJson)
940    }
941}
942
943impl From<&PageData> for StatusData {
944    fn from(page: &PageData) -> Self {
945        Self {
946            timestamp: page.timestamp.clone(),
947            broker_event_counter: page.broker_event_counter,
948            ci_broker_version: page.ci_broker_version,
949            ci_broker_git_commit: page.ci_broker_git_commit,
950            latest_broker_event: page.latest_broker_event.clone(),
951            latest_ci_run: page.latest_ci_run.clone(),
952            event_queue_length: page.events.len(),
953        }
954    }
955}
956
957fn rid_to_basename(repoid: RepoId) -> String {
958    let mut basename = repoid.to_string();
959    assert!(basename.starts_with("rad:"));
960    basename.drain(..4);
961    basename
962}