use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::mpsc::RecvTimeoutError,
};
use html_page::{Element, HtmlPage, Tag};
use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
use serde::Serialize;
use time::{OffsetDateTime, macros::format_description};
use radicle::{
Profile,
git::Oid,
prelude::RepoId,
storage::{ReadRepository, ReadStorage},
};
use crate::{
ci_event::{CiEvent, CiEventV1},
db::{Db, DbError, QueuedCiEvent},
logger,
msg::{RunId, RunResult},
notif::NotificationReceiver,
run::{Run, RunState, Whence},
util::{parse_timestamp, rfc822_timestamp, safely_overwrite},
worker::Worker,
};
/// Maximum number of items put into each generated RSS feed.
const MAX_RSS_ENTRIES: usize = 10;
/// File name of the RSS feed built by `PageData::status_as_rss`.
const BROKER_RSS: &str = "index.rss";
/// File name of the RSS feed built by `PageData::failed_as_rss`.
const FAILURE_RSS: &str = "failed.rss";
/// File name of the RSS feed built by `PageData::unfinished_as_rss`.
const UNFINISHED_RSS: &str = "unfinished.rss";
/// Style sheet embedded at compile time and inlined into the `<style>` element of every page.
const CSS: &str = include_str!("radicle-ci.css");
/// Value of the `content` attribute of the `http-equiv="refresh"` meta element on every page.
// NOTE(review): the identifier is misspelled ("REFERESH"); renaming it also requires
// updating its use in `ReportPage::new`.
const REFERESH_INTERVAL: &str = "60";
/// File name of the JSON status document written next to the HTML pages.
const STATUS_JSON: &str = "status.json";
/// Errors that can occur while generating or writing the status pages.
#[derive(Debug, thiserror::Error)]
pub enum PageError {
/// Formatting a timestamp with the `time` crate failed (see `now`).
#[error(transparent)]
Timeformat(#[from] time::error::Format),
/// Writing an output file failed; carries the path of the file and the underlying error.
#[error("failed to write status page to {0}")]
Write(PathBuf, #[source] crate::util::UtilError),
/// No node alias was available.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("no node alias has been set for builder")]
NoAlias,
/// No status data was available.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("no status data has been set for builder")]
NoStatusData,
/// A database operation failed.
#[error(transparent)]
Db(#[from] DbError),
/// Locking the page data failed. The string payload is not part of the message.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("failed to lock page data structure")]
Lock(&'static str),
/// Serializing `StatusData` to JSON failed.
#[error("failed to represent status page data as JSON")]
StatusToJson(#[source] serde_json::Error),
/// A CI run timestamp could not be parsed or converted for use in an RSS item;
/// carries the offending timestamp text and the underlying error.
#[error("failed to create RSS time stamp from CI run timestamp: {0}")]
RssTimestamp(String, #[source] crate::util::UtilError),
}
/// Returns the current UTC time as text in the form `YYYY-MM-DD HH:MM:SSZ`.
///
/// # Errors
///
/// Returns the `time` crate's formatting error if the timestamp cannot be rendered.
fn now() -> Result<String, time::error::Format> {
    let layout = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
    let current = OffsetDateTime::now_utc();
    current.format(layout)
}
/// Snapshot of everything needed to render the HTML pages, the JSON status
/// document, and the RSS feeds. Built afresh by `StatusPage::update_and_write`.
struct PageData {
// Time the snapshot was taken, as formatted by `now`.
timestamp: String,
// Version of the CI broker, shown on the front page and in the JSON status.
ci_broker_version: &'static str,
// Git commit the CI broker was built from, shown next to the version.
ci_broker_git_commit: &'static str,
// Alias of the node; used in the front page title.
node_alias: String,
// All known CI runs, keyed by broker run ID.
runs: HashMap<RunId, Run>,
// Queued CI events to list in the "Event queue" table.
events: Vec<QueuedCiEvent>,
// Copied verbatim into the JSON status; `update_and_write` always sets it to 0.
broker_event_counter: usize,
// Copied verbatim into the JSON status; `update_and_write` always sets it to `None`.
latest_broker_event: Option<CiEvent>,
// Copied verbatim into the JSON status; `update_and_write` always sets it to `None`.
latest_ci_run: Option<Run>,
// Optional HTML snippet inserted near the top of the front page.
desc_snippet: Option<String>,
}
impl PageData {
fn status_page_as_json(&self) -> Result<String, PageError> {
StatusData::from(self).as_json()
}
/// Builds the front page: optional description, RSS feed links, broker
/// status, repository table, event queue, and the JSON status dump.
///
/// # Errors
///
/// Fails only if the JSON status for the "Recent status" section cannot be
/// produced.
fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
    let mut report = ReportPage::new(&self.node_alias);
    if let Some(snippet) = self.desc_snippet.as_deref() {
        report.desc(snippet);
    }

    // Sections appear on the page in the order they are added here.
    report.rss_feeds();
    self.broker_status(&mut report);
    self.repo_table(&mut report);
    self.event_queue(&mut report);
    self.recent_status(&mut report)?;

    Ok(report.page())
}
/// Appends the "Broker status" section: snapshot time, broker version, and
/// the git commit the broker was built from.
fn broker_status(&self, doc: &mut ReportPage) {
    doc.h2("Broker status");

    let commit = Element::new(Tag::Code).with_text(self.ci_broker_git_commit);
    let line_break = Element::new(Tag::Br);
    let summary = Element::new(Tag::P)
        .with_text("Last updated: ")
        .with_text(&self.timestamp)
        .with_child(line_break)
        .with_text("CI broker version: ")
        .with_text(self.ci_broker_version)
        .with_text(" (commit ")
        .with_child(commit)
        .with_text(")");

    doc.push(summary);
}
/// Appends the "Repositories" section: overall run counts followed by a
/// table with one row per repository showing its latest run.
fn repo_table(&self, doc: &mut ReportPage) {
    doc.h2("Repositories");
    doc.push(Element::new(Tag::P).with_text("Latest CI run for each repository."));

    let total = self.runs.len();
    let failed = self
        .runs
        .values()
        .filter(|run| run.result() == Some(&RunResult::Failure))
        .count();
    let summary = format!("Total {total} CI runs recorded, of which {failed} failed.");
    doc.push(Element::new(Tag::P).with_text(&summary));

    let header = Element::new(Tag::Tr)
        .with_child(Element::new(Tag::Th).with_text("Repository"))
        .with_child(Element::new(Tag::Th).with_text("Run ID"))
        .with_child(Element::new(Tag::Th).with_text("Status"))
        .with_child(Element::new(Tag::Th).with_text("Info"));
    let mut table = Element::new(Tag::Table)
        .with_class("repolist")
        .with_child(header);

    for (alias, rid) in self.repos() {
        // A repository without any run still gets a row, with placeholder cells.
        let latest = self.latest_run(rid);
        let status = match latest {
            Some(run) => Self::run_state(run),
            None => Self::run_state_unknown(),
        };

        let repo_cell =
            Element::new(Tag::Td).with_child(Self::repository(rid, &alias, self.runs(rid)));
        let ids_cell = Element::new(Tag::Td).with_child(Self::run_ids(latest));
        let status_cell = Element::new(Tag::Td).with_child(
            Element::new(Tag::Span)
                .with_class("run-status")
                .with_child(status),
        );
        let info_cell = Element::new(Tag::Td).with_child(Self::info_url(latest));

        table.push_child(
            Element::new(Tag::Tr)
                .with_child(repo_cell)
                .with_child(ids_cell)
                .with_child(status_cell)
                .with_child(info_cell),
        );
    }

    doc.push(table);
}
fn event_queue(&self, doc: &mut ReportPage) {
doc.h2("Event queue");
let mut table = Element::new(Tag::Table)
.with_class("event-queue")
.with_child(
Element::new(Tag::Tr)
.with_child(Element::new(Tag::Th).with_text("Queue id"))
.with_child(Element::new(Tag::Th).with_text("Timestamp"))
.with_child(Element::new(Tag::Th).with_text("Event")),
);
for event in self.events.iter() {
fn render_event(
repo: &RepoId,
alias: Option<String>,
refname: &str,
commit: &Oid,
) -> Element {
let alias = if let Some(alias) = alias {
Element::new(Tag::Span).with_child(
Element::new(Tag::Span)
.with_class("alias")
.with_text(&alias),
)
} else {
Element::new(Tag::Span)
};
Element::new(Tag::Span)
.with_child(alias)
.with_child(Element::new(Tag::Br))
.with_child(
Element::new(Tag::Span)
.with_class("repoid")
.with_text(&repo.to_string()),
)
.with_child(Element::new(Tag::Br))
.with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
.with_child(Element::new(Tag::Br))
.with_child(
Element::new(Tag::Span)
.with_class("commit")
.with_text(&commit.to_string()),
)
}
let event_element = match event.event() {
CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
CiEvent::V1(CiEventV1::Terminate(_)) => {
Element::new(Tag::Span).with_text("terminate")
}
CiEvent::V1(CiEventV1::BranchCreated {
from_node: _,
repo,
branch,
tip,
}) => render_event(repo, self.repo_alias(*repo), branch, tip),
CiEvent::V1(CiEventV1::BranchUpdated {
from_node: _,
repo,
branch,
tip,
old_tip: _,
}) => render_event(repo, self.repo_alias(*repo), branch, tip),
CiEvent::V1(CiEventV1::BranchDeleted {
repo, branch, tip, ..
}) => render_event(repo, self.repo_alias(*repo), branch, tip),
CiEvent::V1(CiEventV1::TagCreated {
from_node: _,
repo,
tag,
tip,
}) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
CiEvent::V1(CiEventV1::TagUpdated {
from_node: _,
repo,
tag,
tip,
old_tip: _,
}) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
CiEvent::V1(CiEventV1::TagDeleted { repo, tag, tip, .. }) => {
render_event(repo, self.repo_alias(*repo), tag.as_str(), tip)
}
CiEvent::V1(CiEventV1::PatchCreated {
from_node: _,
repo,
patch,
new_tip,
}) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
CiEvent::V1(CiEventV1::PatchUpdated {
from_node: _,
repo,
patch,
new_tip,
}) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
CiEvent::V1(CiEventV1::CanonicalRefUpdated {
from_node: _,
repo,
refname,
target,
}) => render_event(repo, self.repo_alias(*repo), refname.as_str(), target),
};
table.push_child(
Element::new(Tag::Tr)
.with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
.with_child(Element::new(Tag::Td).with_text(event.timestamp()))
.with_child(Element::new(Tag::Td).with_child(event_element)),
);
}
doc.push(table);
}
/// Appends the "Recent status" section: a link to the separate JSON status
/// file followed by the same JSON rendered inline.
///
/// # Errors
///
/// Returns `PageError::StatusToJson` if the status cannot be serialized.
fn recent_status(&self, doc: &mut ReportPage) -> Result<(), PageError> {
    doc.h2("Recent status");
    let json = self.status_page_as_json()?;

    let file_link = Element::new(Tag::A)
        .with_attribute("href", STATUS_JSON)
        .with_child(Element::new(Tag::Code).with_text(STATUS_JSON));
    let intro = Element::new(Tag::P)
        .with_text("See also as a separate file ")
        .with_child(file_link)
        .with_text(": ");
    doc.push(intro);

    let dump = Element::new(Tag::Pre).with_text(&json);
    doc.push(Element::new(Tag::Blockquote).with_child(dump));

    Ok(())
}
/// Renders the state of a run as a `<span>` whose class and text are both the
/// state description. For a finished run the description also includes the
/// result, or says that the result is unknown.
fn run_state(run: &Run) -> Element {
    let state = run.state();
    let label = if state == RunState::Finished {
        match run.result() {
            Some(result) => format!("{state}, {result}"),
            None => format!("{state} with unknown result"),
        }
    } else {
        state.to_string()
    };

    Element::new(Tag::Span).with_class(&label).with_text(&label)
}
/// Placeholder status element used when a repository has no run to report.
fn run_state_unknown() -> Element {
    let placeholder = Element::new(Tag::Span).with_class("run-status");
    placeholder.with_text("unknown")
}
/// Renders the page that lists every recorded CI run of one repository.
///
/// * `rid` - repository whose runs are listed.
/// * `alias` - repository name used for the page title.
/// * `timestamp` - text shown after "Last updated: ".
///
/// Runs are listed newest first, ordered by their timestamp. The status cell
/// shows the run state for triggered and running runs, and the result
/// (plus a "timed out" marker when applicable) for finished runs.
fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
    let mut doc = ReportPage::new(alias);

    doc.push(
        Element::new(Tag::P).with_child(
            Element::new(Tag::A)
                .with_attribute("href", "./index.html")
                .with_text("Front page"),
        ),
    );
    doc.push(
        Element::new(Tag::P).with_text("Repository ID ").with_child(
            Element::new(Tag::Code)
                .with_class("repoid")
                .with_text(&rid.to_string()),
        ),
    );
    doc.push(Element::new(Tag::P).with_text(&format!("Last updated: {timestamp}")));

    let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
        Element::new(Tag::Tr)
            .with_child(Element::new(Tag::Th).with_text("Run ID"))
            .with_child(Element::new(Tag::Th).with_text("Whence"))
            .with_child(Element::new(Tag::Th).with_text("Status"))
            .with_child(Element::new(Tag::Th).with_text("Info")),
    );

    // Sort ascending by timestamp, then reverse, so the newest run comes first.
    let mut runs = self.runs(rid);
    runs.sort_by_cached_key(|run| run.timestamp());
    runs.reverse();

    for run in runs {
        let current = match run.state() {
            RunState::Triggered => Element::new(Tag::Span)
                .with_attribute("state", "triggered")
                .with_text("triggered"),
            // The class used to be "running)" with a stray parenthesis, which
            // no sensible style sheet selector would match.
            RunState::Running => Element::new(Tag::Span)
                .with_class("running")
                .with_text("running"),
            RunState::Finished => {
                let result = match run.result() {
                    Some(result) => result.to_string(),
                    None => "unknown".to_string(),
                };
                let mut status = Element::new(Tag::Span)
                    .with_class(&result)
                    .with_text(&result);
                if run.timed_out() {
                    status.push_text(" (timed out)");
                }
                status
            }
        };

        table.push_child(
            Element::new(Tag::Tr)
                .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
                .with_child(
                    Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
                )
                .with_child(Element::new(Tag::Td).with_child(current))
                .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
        );
    }

    doc.push(table);
    doc.page()
}
/// Renders the "Repository" cell of the front page table: the alias linking
/// to the per-repository page, followed by failure and total run counts.
///
/// * `repo_id` - repository ID; determines the file name the link points to.
/// * `alias` - text of the link.
/// * `runs` - all runs of the repository, in any order.
///
/// The "recently" figure counts failures among the `N` newest runs. The runs
/// are sorted by timestamp here because callers collect them from a
/// `HashMap`, whose iteration order is arbitrary; without the sort the
/// figure would describe a random selection of runs.
fn repository(repo_id: RepoId, alias: &str, runs: Vec<&Run>) -> Element {
    /// How many of the newest runs count as "recent".
    const N: usize = 5;

    /// Counts failed runs among the last `n` entries of `runs` (all of them
    /// if there are fewer than `n`).
    fn failed_recently(runs: &[&Run], n: usize) -> usize {
        let start = runs.len().saturating_sub(n);
        runs[start..]
            .iter()
            .filter(|run| run.result() == Some(&RunResult::Failure))
            .count()
    }

    // Oldest first, so the newest runs are at the end of the slice.
    let mut runs = runs;
    runs.sort_by_cached_key(|run| run.timestamp());

    let total = runs.len();
    let failed = runs
        .iter()
        .filter(|run| run.result() == Some(&RunResult::Failure))
        .count();
    let recent = failed_recently(&runs, N);

    // The class used to be "alias)" with a stray parenthesis.
    let link = Element::new(Tag::A)
        .with_child(Element::new(Tag::Code).with_class("alias").with_text(alias))
        .with_attribute("href", &format!("{}.html", rid_to_basename(repo_id)));

    Element::new(Tag::Span)
        .with_child(link)
        .with_child(Element::new(Tag::Br))
        .with_child(
            Element::new(Tag::Span)
                .with_text(&format!("{failed} failed runs ({recent} recently)")),
        )
        .with_child(Element::new(Tag::Br))
        .with_child(Element::new(Tag::Span).with_text(&format!("{total} total runs")))
}
/// Renders the identifiers of a run: adapter run ID, broker run ID, start
/// timestamp, and job ID. Returns an empty `<span>` when there is no run.
fn run_ids(run: Option<&Run>) -> Element {
    let run = match run {
        Some(run) => run,
        None => return Element::new(Tag::Span),
    };

    // The adapter may not have reported a run ID; render an empty span then.
    let adapter_id = match run.adapter_run_id() {
        Some(id) => Element::new(Tag::Span).with_text(id.as_str()),
        None => Element::new(Tag::Span),
    };
    let adapter = Element::new(Tag::Span)
        .with_class("adapter-run-id")
        .with_text("Adapter: ")
        .with_child(adapter_id)
        .with_child(Element::new(Tag::Br));

    let broker = Element::new(Tag::Span)
        .with_class("broker-run-id")
        .with_text("Broker: ")
        .with_text(&run.broker_run_id().to_string());

    let started = Element::new(Tag::Span)
        .with_class("timestamp")
        .with_text("Started: ")
        .with_text(run.timestamp());

    // A missing job ID is shown as empty text.
    let job_id = run.job_id().map(|id| id.to_string()).unwrap_or_default();
    let job = Element::new(Tag::Span)
        .with_class("job-cob-id")
        .with_text("Job: ")
        .with_text(&job_id);

    Element::new(Tag::Span)
        .with_child(adapter)
        .with_child(broker)
        .with_child(Element::new(Tag::Br))
        .with_child(started)
        .with_child(Element::new(Tag::Br))
        .with_child(job)
}
/// Renders an "info" link to the adapter's info URL for the run. Returns an
/// empty `<span>` when there is no run or the run has no info URL.
fn info_url(run: Option<&Run>) -> Element {
    match run.and_then(|run| run.adapter_info_url()) {
        Some(url) => Element::new(Tag::A)
            .with_attribute("href", url)
            .with_text("info"),
        None => Element::new(Tag::Span),
    }
}
fn whence_as_html(whence: &Whence) -> Element {
match whence {
Whence::Branch {
name,
commit,
who: _,
} => Element::new(Tag::Span)
.with_text("branch ")
.with_child(
Element::new(Tag::Code)
.with_class("branch)")
.with_text(name),
)
.with_text(", commit ")
.with_child(
Element::new(Tag::Code)
.with_class("commit)")
.with_text(&commit.to_string()),
)
.with_child(Element::new(Tag::Br))
.with_text("from ")
.with_child(
Element::new(Tag::Span)
.with_class("who")
.with_text(whence.who().unwrap_or("<commit author not known>")),
),
Whence::Patch {
patch,
commit,
revision,
who: _,
} => Element::new(Tag::Span)
.with_text("patch ")
.with_child(
Element::new(Tag::Code)
.with_class("branch")
.with_text(&patch.to_string()),
)
.with_child(Element::new(Tag::Br))
.with_text("revision ")
.with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
if let Some(rev) = &revision {
rev.to_string()
} else {
"<unknown patch revision>".to_string()
}
}))
.with_child(Element::new(Tag::Br))
.with_text("commit ")
.with_child(
Element::new(Tag::Code)
.with_class("commit)")
.with_text(&commit.to_string()),
)
.with_child(Element::new(Tag::Br))
.with_text("from ")
.with_child(
Element::new(Tag::Span)
.with_class("who")
.with_text(whence.who().unwrap_or("<patch author not known>")),
),
}
}
/// Returns the distinct `(alias, repository ID)` pairs that occur in the
/// recorded runs, sorted in ascending order.
fn repos(&self) -> Vec<(String, RepoId)> {
    // Go through a set first so each pair appears only once.
    let unique: HashSet<(String, RepoId)> = self
        .runs
        .values()
        .map(|run| (run.repo_alias().to_string(), run.repo_id()))
        .collect();

    let mut sorted: Vec<(String, RepoId)> = unique.into_iter().collect();
    sorted.sort();
    sorted
}
/// Looks up the alias recorded for a repository, if any run mentions it.
/// When several aliases are recorded, the first in `repos()` order wins.
fn repo_alias(&self, wanted: RepoId) -> Option<String> {
    self.repos()
        .into_iter()
        .find(|(_, rid)| *rid == wanted)
        .map(|(alias, _)| alias)
}
/// Returns all recorded runs that belong to the given repository, in the
/// (arbitrary) iteration order of the underlying map.
fn runs(&self, repoid: RepoId) -> Vec<&Run> {
    self.runs
        .values()
        .filter(|run| run.repo_id() == repoid)
        .collect()
}
/// Returns the run of the given repository with the greatest timestamp, or
/// `None` if the repository has no runs. Among runs with equal timestamps
/// the one encountered first is kept.
fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
    self.runs(repoid)
        .into_iter()
        .fold(None::<&Run>, |latest, run| match latest {
            // Only a strictly newer run replaces the current candidate.
            Some(best) if run.timestamp() <= best.timestamp() => latest,
            _ => Some(run),
        })
}
/// Builds the RSS channel from the first `MAX_RSS_ENTRIES` items returned by
/// `sorted_items`.
///
/// # Errors
///
/// Fails if an item cannot be created from a run's timestamp.
fn status_as_rss(&self) -> Result<Channel, PageError> {
    let mut builder = ChannelBuilder::default();
    builder.title("Radicle CI broker run information");
    builder.description("Latest CI runs known on this instance of the Radicle CI broker.");
    builder.link("FIXME:link");

    for entry in self.sorted_items()?.into_iter().take(MAX_RSS_ENTRIES) {
        builder.item(entry);
    }

    Ok(builder.build())
}
fn failed_as_rss(&self) -> Result<Channel, PageError> {
let mut channel = ChannelBuilder::default();
channel
.title("Radicle CI broker run information")
.description("Latest FAILED CI runs on this instance of the Radicle CI broker.")
.link("FIXME:link");
for item in self
.sorted_items_for_runs(|run| {
run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure)
})?
.iter()
.take(MAX_RSS_ENTRIES)
{
channel.item(item.clone());
}
Ok(channel.build())
}
fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
let mut channel = ChannelBuilder::default();
channel
.title("Radicle CI broker run information")
.description("Latest UNFINISHED CI runs on this instance of the Radicle CI broker.")
.link("FIXME:link");
for item in self
.sorted_items_for_runs(|run| {
run.state() == RunState::Triggered || run.state() == RunState::Running
})?
.iter()
.take(MAX_RSS_ENTRIES)
{
channel.item(item.clone());
}
Ok(channel.build())
}
fn sorted_items(&self) -> Result<Vec<Item>, PageError> {
let mut items = vec![];
for (_alias, repo_id) in self.repos() {
for run in self.runs(repo_id) {
if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
items.push(Self::rss_item_from_run(run)?);
}
}
}
items.sort_by_key(|(ts, _)| *ts);
items.reverse();
Ok(items.iter().map(|(_, item)| item.clone()).collect())
}
fn sorted_items_for_runs<F>(&self, pred: F) -> Result<Vec<Item>, PageError>
where
F: Fn(&Run) -> bool,
{
let mut items = vec![];
for (_alias, repo_id) in self.repos() {
for run in self.runs(repo_id) {
if pred(run) {
items.push(Self::rss_item_from_run(run)?);
}
}
}
items.sort_by_key(|(ts, _)| *ts);
items.reverse();
Ok(items.iter().map(|(_, item)| item.clone()).collect())
}
/// Converts a run into an RSS item, paired with the run's parsed timestamp
/// so that callers can sort items chronologically.
///
/// The item's GUID is the broker run ID (not a permalink), its title names
/// the outcome, repository alias, and run ID, its content is an HTML
/// fragment describing the run, and its link is the adapter info URL when
/// one is known.
///
/// # Errors
///
/// Returns `PageError::RssTimestamp` if the run's timestamp cannot be parsed
/// or cannot be formatted as an RFC 822 timestamp.
fn rss_item_from_run(run: &Run) -> Result<(OffsetDateTime, Item), PageError> {
    let mut guid = Guid::default();
    guid.set_value(run.broker_run_id().to_string());
    guid.permalink = false;

    // A finished run is labelled by its result, any other run by its state.
    let label = match (run.state(), run.result()) {
        (RunState::Finished, Some(result)) => result.to_string(),
        (RunState::Finished, None) => "unknown".to_string(),
        (other, _) => other.to_string(),
    };
    let title = format!("{label}: {} run {}", run.repo_alias(), run.broker_run_id());

    let raw = run.timestamp().to_string();
    let parsed = match parse_timestamp(&raw) {
        Ok(parsed) => parsed,
        Err(err) => return Err(PageError::RssTimestamp(raw, err)),
    };
    let pub_date = match rfc822_timestamp(&parsed) {
        Ok(text) => text,
        Err(err) => return Err(PageError::RssTimestamp(raw, err)),
    };

    let commit = match run.whence() {
        Whence::Branch { commit, .. } | Whence::Patch { commit, .. } => *commit,
    };
    let entry = RssEntry {
        repoid: run.repo_id(),
        commit,
        info_url: run.adapter_info_url().map(String::from),
        status: run.state(),
        result: run.result().cloned(),
    };

    let mut builder = ItemBuilder::default();
    builder
        .title(Some(title))
        .guid(Some(guid))
        .pub_date(Some(pub_date))
        .content(entry.to_html().serialize());
    let mut item = builder.build();

    if let Some(url) = run.adapter_info_url() {
        item.set_link(Some(String::from(url)));
    }

    Ok((parsed, item))
}
}
/// Thin wrapper around an `HtmlPage` that sets up the common page head and
/// title and offers helpers for appending content to the body.
struct ReportPage {
// The page under construction.
page: HtmlPage,
}
impl ReportPage {
fn new(node_alias: &str) -> Self {
let mut page = HtmlPage::default();
let title = format!("CI for Radicle node {node_alias}");
page.push_to_head(Element::new(Tag::Title).with_text(&title));
page.push_to_head(Element::new(Tag::Style).with_text(CSS));
page.push_to_head(
Element::new(Tag::Meta)
.with_attribute("http-equiv", "refresh")
.with_attribute("content", REFERESH_INTERVAL),
);
page.push_to_body(Element::new(Tag::H1).with_text(&title));
Self { page }
}
fn desc(&mut self, snippet: &str) {
let mut desc = Element::new(Tag::Div);
desc.push_html(snippet);
self.page.push_to_body(desc);
}
fn h2(&mut self, heading: &str) {
self.push(Element::new(Tag::H2).with_text(heading));
}
fn push(&mut self, e: Element) {
self.page.push_to_body(e);
}
fn rss_feeds(&mut self) {
self.h2("RSS feeds");
self.push(
Element::new(Tag::P)
.with_child(
Element::new(Tag::A)
.with_text("all")
.with_attribute("href", BROKER_RSS),
)
.with_text(" ")
.with_child(
Element::new(Tag::A)
.with_text("failed")
.with_attribute("href", FAILURE_RSS),
),
);
}
fn page(self) -> HtmlPage {
self.page
}
}
/// Details of one CI run, rendered as the HTML content of an RSS item.
struct RssEntry {
// Repository the run belongs to.
repoid: RepoId,
// Commit the run was made for.
commit: Oid,
// Adapter info URL, if the run has one.
info_url: Option<String>,
// State of the run.
status: RunState,
// Result of the run, if it has one.
result: Option<RunResult>,
}
impl RssEntry {
    /// Renders the entry as a `<div class="ci_run">` containing one classed
    /// `<span>` per field: repository ID, commit, info URL (empty text when
    /// absent), status, and result.
    fn to_html(&self) -> Element {
        // Each field becomes a span with a class naming the field.
        let span = |class: &str, text: &str| {
            Element::new(Tag::Span).with_class(class).with_text(text)
        };

        let result = match &self.result {
            None => "undetermined",
            Some(RunResult::Success) => "success",
            Some(RunResult::Failure) => "failure",
        };

        Element::new(Tag::Div)
            .with_class("ci_run")
            .with_child(span("repoid", self.repoid.to_string().as_str()))
            .with_child(span("commit", self.commit.to_string().as_str()))
            .with_child(span("info_url", self.info_url.as_deref().unwrap_or("")))
            .with_child(span("status", self.status.to_string().as_str()))
            .with_child(span("result", result))
    }
}
/// Worker that regenerates the status pages (HTML, JSON, RSS) in an output
/// directory, initially and again after each notification or timeout.
pub struct StatusPage {
// Alias used in the front page title; initialised to the empty string by `new`.
// NOTE(review): nothing in the visible part of this file assigns another value.
node_alias: String,
// Output directory; nothing is written while this is `None`.
dirname: Option<PathBuf>,
// Remaining inputs needed to produce the pages.
args: PageArgs,
}
/// Inputs that `StatusPage` needs in order to produce the pages.
struct PageArgs {
// Source of notifications that prompt a page update.
run_rx: NotificationReceiver,
// Radicle profile; its storage is consulted to check repository visibility.
profile: Profile,
// Database holding the CI runs and the queued CI events.
db: Db,
// When true, the pages are written once and the update loop returns.
once: bool,
// Optional HTML snippet shown near the top of the front page.
desc_snippet: Option<String>,
}
impl StatusPage {
/// Sets the directory that the generated files are written to.
pub fn set_output_dir(&mut self, dirname: &Path) {
    let target = dirname.to_path_buf();
    self.dirname = Some(target);
}
/// Creates a status page worker with no output directory, no description,
/// and an empty node alias.
///
/// * `run_rx` - source of notifications that prompt a page update.
/// * `profile` - Radicle profile used to check repository visibility.
/// * `db` - database holding CI runs and queued CI events.
/// * `once` - when true, write the pages once instead of looping.
pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
    let args = PageArgs {
        run_rx,
        profile,
        db,
        once,
        desc_snippet: None,
    };
    Self {
        node_alias: String::new(),
        dirname: None,
        args,
    }
}
/// Sets the HTML snippet shown near the top of the front page.
pub fn set_description(&mut self, desc: &str) {
    let snippet = String::from(desc);
    self.args.desc_snippet = Some(snippet);
}
/// Writes the pages, then rewrites them after every notification or
/// timeout until the notification channel is disconnected.
///
/// In `once` mode the pages are written a single time and the function
/// returns. After a disconnect the pages are written one final time.
///
/// # Errors
///
/// Propagates the first error from `update_and_write`.
fn update_loop(&mut self) -> Result<(), PageError> {
    loop {
        self.update_and_write()?;
        if self.args.once {
            return Ok(());
        }

        // A notification and a timeout both just lead to another update;
        // only a disconnected channel ends the loop.
        if let Err(RecvTimeoutError::Disconnected) = self.args.run_rx.wait_for_notification() {
            logger::pages_disconnected();
            break;
        }
    }

    self.update_and_write()
}
fn update_and_write(&mut self) -> Result<(), PageError> {
if let Some(dirname) = &self.dirname {
if dirname.exists() {
let runs = self.args.db.get_all_runs()?;
let events: Result<Vec<QueuedCiEvent>, PageError> = self
.args
.db
.queued_ci_events()?
.iter()
.filter_map(|id| match self.args.db.get_queued_ci_event(id) {
Ok(Some(event)) => match event.event() {
CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
| CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
| CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
| CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
if Self::is_public_repo(&self.args.profile, repo) {
Some(Ok(event))
} else {
None
}
}
_ => None,
},
Ok(None) => None, Err(_) => None, })
.collect();
let mut events = events?;
events.sort_by_cached_key(|e| e.timestamp().to_string());
let data = PageData {
timestamp: now()?,
ci_broker_version: env!("VERSION"),
ci_broker_git_commit: env!("GIT_HEAD"),
node_alias: self.node_alias.clone(),
runs: HashMap::from_iter(
runs.iter()
.map(|run| (run.broker_run_id().clone(), run.clone())),
),
events,
broker_event_counter: 0,
latest_broker_event: None,
latest_ci_run: None,
desc_snippet: self.args.desc_snippet.clone(),
};
let nameless = String::from("nameless repo");
let (status, repos) = {
let status = data.status_page_as_html()?.to_string();
let mut repos = vec![];
for (_, rid) in data.repos() {
let basename = rid_to_basename(rid);
let filename = dirname.join(format!("{basename}.html"));
let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
repos.push((filename, repopage.to_string()));
}
(status, repos)
};
let filename = dirname.join("index.html");
Self::write_file(&filename, &status)?;
for (filename, repopage) in repos {
Self::write_file(&filename, &repopage)?;
}
let filename = dirname.join(STATUS_JSON);
let json = data.status_page_as_json()?;
Self::write_file(&filename, &json)?;
let filename = dirname.join(BROKER_RSS);
let channel = data.status_as_rss()?;
let rss = channel.to_string();
Self::write_file(&filename, &rss)?;
let filename = dirname.join(FAILURE_RSS);
let channel = data.failed_as_rss()?;
let rss = channel.to_string();
Self::write_file(&filename, &rss)?;
let filename = dirname.join(UNFINISHED_RSS);
let channel = data.unfinished_as_rss()?;
let rss = channel.to_string();
Self::write_file(&filename, &rss)?;
}
}
Ok(())
}
/// Reports whether the repository is public according to its canonical
/// identity document. Returns `false` when the repository or its identity
/// document cannot be loaded.
fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
    profile
        .storage
        .repository(*rid)
        .ok()
        .and_then(|repo| repo.canonical_identity_doc().ok())
        .map(|id_doc| id_doc.doc.visibility().is_public())
        .unwrap_or(false)
}
/// Writes `text` to `filename` using `safely_overwrite`.
///
/// # Errors
///
/// Returns `PageError::Write` carrying the file name and the underlying
/// error if the write fails.
fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
    match safely_overwrite(filename, text.as_bytes()) {
        Ok(_) => Ok(()),
        Err(err) => Err(PageError::Write(filename.into(), err)),
    }
}
}
impl Worker for StatusPage {
    const NAME: &str = "status-page";
    type Error = PageError;

    /// Logs a message if the output directory is unset or does not exist,
    /// then runs the update loop regardless.
    fn work(&mut self) -> Result<(), PageError> {
        if let Some(report_dir) = &self.dirname {
            if !report_dir.exists() {
                logger::pages_directory_does_not_exist(report_dir);
            }
        } else {
            logger::pages_directory_unset();
        }

        self.update_loop()
    }
}
/// Summary of the broker's status, serialized as the JSON status document
/// and shown in the "Recent status" section of the front page.
#[derive(Debug, Clone, Serialize)]
struct StatusData {
// Time the underlying page data snapshot was taken.
timestamp: String,
// Copied from `PageData::broker_event_counter`.
broker_event_counter: usize,
// Version of the CI broker.
ci_broker_version: &'static str,
// Git commit the CI broker was built from.
ci_broker_git_commit: &'static str,
// Copied from `PageData::latest_broker_event`.
latest_broker_event: Option<CiEvent>,
// Copied from `PageData::latest_ci_run`.
latest_ci_run: Option<Run>,
// Number of events in `PageData::events`.
event_queue_length: usize,
}
impl StatusData {
    /// Serializes the status as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns `PageError::StatusToJson` if serialization fails.
    fn as_json(&self) -> Result<String, PageError> {
        match serde_json::to_string_pretty(self) {
            Ok(json) => Ok(json),
            Err(err) => Err(PageError::StatusToJson(err)),
        }
    }
}
impl From<&PageData> for StatusData {
    /// Copies the status-related fields out of the page data and records
    /// the number of queued events.
    fn from(page: &PageData) -> Self {
        let event_queue_length = page.events.len();
        let timestamp = page.timestamp.clone();
        let latest_broker_event = page.latest_broker_event.clone();
        let latest_ci_run = page.latest_ci_run.clone();

        Self {
            timestamp,
            broker_event_counter: page.broker_event_counter,
            ci_broker_version: page.ci_broker_version,
            ci_broker_git_commit: page.ci_broker_git_commit,
            latest_broker_event,
            latest_ci_run,
            event_queue_length,
        }
    }
}
/// Returns the textual repository ID without its leading `rad:` prefix, for
/// use as the base name of the repository's HTML file.
///
/// # Panics
///
/// Panics if the textual form of the repository ID does not start with `rad:`.
fn rid_to_basename(repoid: RepoId) -> String {
    const PREFIX: &str = "rad:";
    let basename = repoid.to_string();
    assert!(basename.starts_with("rad:"));
    basename[PREFIX.len()..].to_owned()
}