1use std::{
11 collections::{HashMap, HashSet},
12 path::{Path, PathBuf},
13 sync::mpsc::RecvTimeoutError,
14};
15
16use html_page::{Element, HtmlPage, Tag};
17use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
18use serde::Serialize;
19use time::{macros::format_description, OffsetDateTime};
20
21use radicle::{
22 git::Oid,
23 prelude::RepoId,
24 storage::{ReadRepository, ReadStorage},
25 Profile,
26};
27
28use crate::{
29 ci_event::{CiEvent, CiEventV1},
30 db::{Db, DbError, QueuedCiEvent},
31 logger,
32 msg::{RunId, RunResult},
33 notif::NotificationReceiver,
34 run::{Run, RunState, Whence},
35 util::{parse_timestamp, rfc822_timestamp, safely_overwrite},
36 worker::Worker,
37};
38
/// Maximum number of items put into each generated RSS feed.
const MAX_RSS_ENTRIES: usize = 10;
/// File name of the RSS feed that the front page links to as "all".
const BROKER_RSS: &str = "index.rss";
/// File name of the RSS feed that the front page links to as "failed".
const FAILURE_RSS: &str = "failed.rss";
/// File name of the RSS feed of runs that are triggered or still running.
const UNFINISHED_RSS: &str = "unfinished.rss";
/// Style sheet text, embedded at compile time and inlined into every page's
/// `<style>` element.
const CSS: &str = include_str!("radicle-ci.css");
/// Value of the `content` attribute of the `<meta http-equiv="refresh">`
/// element added to every page.
// NOTE(review): the name is misspelled ("REFERESH"); kept because other code
// in this file refers to it by this name.
const REFERESH_INTERVAL: &str = "60";
/// File name of the machine-readable status document.
const STATUS_JSON: &str = "status.json";
46
/// Errors that can happen while generating or writing the status pages.
#[derive(Debug, thiserror::Error)]
pub enum PageError {
    /// Formatting a time stamp failed.
    #[error(transparent)]
    Timeformat(#[from] time::error::Format),

    /// Writing an output file failed; the path is the file that could not
    /// be written.
    #[error("failed to write status page to {0}")]
    Write(PathBuf, #[source] crate::util::UtilError),

    /// No node alias was provided.
    // NOTE(review): not constructed anywhere in the code visible here.
    #[error("no node alias has been set for builder")]
    NoAlias,

    /// No status data was provided.
    // NOTE(review): not constructed anywhere in the code visible here.
    #[error("no status data has been set for builder")]
    NoStatusData,

    /// A database operation failed.
    #[error(transparent)]
    Db(#[from] DbError),

    /// Locking the page data failed.
    // NOTE(review): not constructed anywhere in the code visible here.
    #[error("failed to lock page data structure")]
    Lock(&'static str),

    /// Serializing the status data to JSON failed.
    #[error("failed to represent status page data as JSON")]
    StatusToJson(#[source] serde_json::Error),

    /// A CI run's time stamp could not be parsed or re-formatted for RSS;
    /// the string is the time stamp as stored in the run.
    #[error("failed to create RSS time stamp from CI run timestamp: {0}")]
    RssTimestamp(String, #[source] crate::util::UtilError),
}
75
76fn now() -> Result<String, time::error::Format> {
77 let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
78 OffsetDateTime::now_utc().format(fmt)
79}
80
/// Snapshot of everything needed to render the HTML pages, the status JSON,
/// and the RSS feeds.
struct PageData {
    /// Time the snapshot was taken; shown as "Last updated" on the pages.
    timestamp: String,
    /// Version string of the CI broker, shown on the front page.
    ci_broker_version: &'static str,
    /// Git commit of the CI broker build, shown on the front page.
    ci_broker_git_commit: &'static str,
    /// Alias of the node, used in the front page title.
    node_alias: String,
    /// All known CI runs, keyed by broker run ID.
    runs: HashMap<RunId, Run>,
    /// Queued CI events to list on the front page.
    events: Vec<QueuedCiEvent>,
    /// Copied verbatim into the status JSON.
    broker_event_counter: usize,
    /// Copied verbatim into the status JSON.
    latest_broker_event: Option<CiEvent>,
    /// Copied verbatim into the status JSON.
    latest_ci_run: Option<Run>,
}
92
93impl PageData {
94 fn status_page_as_json(&self) -> Result<String, PageError> {
95 StatusData::from(self).as_json()
96 }
97
98 fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
99 let mut doc = HtmlPage::default();
100
101 let title = format!("CI for Radicle node {}", self.node_alias);
102 Self::head(&mut doc, &title);
103
104 doc.push_to_body(Element::new(Tag::H1).with_text(&title));
105
106 doc.push_to_body(
107 Element::new(Tag::P)
108 .with_text("RSS feeds: ")
109 .with_child(
110 Element::new(Tag::A)
111 .with_text("all")
112 .with_attribute("href", BROKER_RSS),
113 )
114 .with_text(" ")
115 .with_child(
116 Element::new(Tag::A)
117 .with_text("failed")
118 .with_attribute("href", FAILURE_RSS),
119 ),
120 );
121
122 Self::h1(&mut doc, "Broker status");
123 doc.push_to_body(
124 Element::new(Tag::P)
125 .with_text("Last updated: ")
126 .with_text(&self.timestamp)
127 .with_child(Element::new(Tag::Br))
128 .with_text("CI broker version: ")
129 .with_text(self.ci_broker_version)
130 .with_text(" (commit ")
131 .with_child(Element::new(Tag::Code).with_text(self.ci_broker_git_commit))
132 .with_text(")"),
133 );
134
135 Self::h1(&mut doc, "Repositories");
136 Self::p_text(&mut doc, "Latest CI run for each repository.");
137
138 let total = self.runs.len();
139 let failed = self
140 .runs
141 .values()
142 .filter(|run| run.result() == Some(&RunResult::Failure))
143 .count();
144 Self::p_text(
145 &mut doc,
146 &format!("Total {total} CI runs recorded, of which {failed} failed."),
147 );
148
149 let mut table = Element::new(Tag::Table).with_class("repolist").with_child(
150 Element::new(Tag::Tr)
151 .with_child(Element::new(Tag::Th).with_text("Repository"))
152 .with_child(Element::new(Tag::Th).with_text("Run ID"))
153 .with_child(Element::new(Tag::Th).with_text("Status"))
154 .with_child(Element::new(Tag::Th).with_text("Info")),
155 );
156
157 for (alias, rid) in self.repos() {
158 let (run_ids, status, info_url) = {
159 let run = self.latest_run(rid);
160 match run {
161 Some(run) => (
162 Self::run_ids(Some(run)),
163 Self::run_state(run),
164 Self::info_url(Some(run)),
165 ),
166 None => (
167 Self::run_ids(None),
168 Self::run_state_unknown(),
169 Self::info_url(None),
170 ),
171 }
172 };
173
174 let runs = self.runs(rid);
175
176 table.push_child(
177 Element::new(Tag::Tr)
178 .with_child(
179 Element::new(Tag::Td).with_child(Self::repository(rid, &alias, runs)),
180 )
181 .with_child(Element::new(Tag::Td).with_child(run_ids))
182 .with_child(
183 Element::new(Tag::Td).with_child(
184 Element::new(Tag::Span)
185 .with_class("run-status")
186 .with_child(status),
187 ),
188 )
189 .with_child(Element::new(Tag::Td).with_child(info_url)),
190 );
191 }
192 doc.push_to_body(table);
193
194 Self::h1(&mut doc, "Event queue");
195 let mut table = Element::new(Tag::Table)
196 .with_class("event-queue")
197 .with_child(
198 Element::new(Tag::Tr)
199 .with_child(Element::new(Tag::Th).with_text("Queue id"))
200 .with_child(Element::new(Tag::Th).with_text("Timestamp"))
201 .with_child(Element::new(Tag::Th).with_text("Event")),
202 );
203 for event in self.events.iter() {
204 fn render_event(
205 repo: &RepoId,
206 alias: Option<String>,
207 refname: &str,
208 commit: &Oid,
209 ) -> Element {
210 let alias = if let Some(alias) = alias {
211 Element::new(Tag::Span).with_child(
212 Element::new(Tag::Span)
213 .with_class("alias")
214 .with_text(&alias),
215 )
216 } else {
217 Element::new(Tag::Span)
218 };
219 Element::new(Tag::Span)
220 .with_child(alias)
221 .with_child(Element::new(Tag::Br))
222 .with_child(
223 Element::new(Tag::Span)
224 .with_class("repoid")
225 .with_text(&repo.to_string()),
226 )
227 .with_child(Element::new(Tag::Br))
228 .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
229 .with_child(Element::new(Tag::Br))
230 .with_child(
231 Element::new(Tag::Span)
232 .with_class("commit")
233 .with_text(&commit.to_string()),
234 )
235 }
236
237 let event_element = match event.event() {
238 CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
239 CiEvent::V1(CiEventV1::BranchCreated {
240 from_node: _,
241 repo,
242 branch,
243 tip,
244 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
245 CiEvent::V1(CiEventV1::BranchUpdated {
246 from_node: _,
247 repo,
248 branch,
249 tip,
250 old_tip: _,
251 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
252 CiEvent::V1(CiEventV1::BranchDeleted {
253 repo, branch, tip, ..
254 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
255 CiEvent::V1(CiEventV1::TagCreated {
256 from_node: _,
257 repo,
258 tag,
259 tip,
260 }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
261 CiEvent::V1(CiEventV1::TagUpdated {
262 from_node: _,
263 repo,
264 tag,
265 tip,
266 old_tip: _,
267 }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
268 CiEvent::V1(CiEventV1::TagDeleted { repo, tag, tip, .. }) => {
269 render_event(repo, self.repo_alias(*repo), tag.as_str(), tip)
270 }
271 CiEvent::V1(CiEventV1::PatchCreated {
272 from_node: _,
273 repo,
274 patch,
275 new_tip,
276 }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
277 CiEvent::V1(CiEventV1::PatchUpdated {
278 from_node: _,
279 repo,
280 patch,
281 new_tip,
282 }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
283 CiEvent::V1(CiEventV1::CanonicalRefUpdated {
284 from_node: _,
285 repo,
286 refname,
287 target,
288 }) => render_event(repo, self.repo_alias(*repo), refname.as_str(), target),
289 };
290
291 table.push_child(
292 Element::new(Tag::Tr)
293 .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
294 .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
295 .with_child(Element::new(Tag::Td).with_child(event_element)),
296 );
297 }
298 doc.push_to_body(table);
299
300 Self::h1(&mut doc, "Recent status");
301 let status = StatusData::from(self).as_json()?;
302 doc.push_to_body(
303 Element::new(Tag::P)
304 .with_text("See also as a separate file ")
305 .with_child(
306 Element::new(Tag::A)
307 .with_attribute("href", STATUS_JSON)
308 .with_child(Element::new(Tag::Code).with_text(STATUS_JSON)),
309 )
310 .with_text(": "),
311 );
312 doc.push_to_body(
313 Element::new(Tag::Blockquote).with_child(Element::new(Tag::Pre).with_text(&status)),
314 );
315
316 Ok(doc)
317 }
318
319 fn run_state(run: &Run) -> Element {
320 let status = match run.state() {
321 RunState::Finished => {
322 if let Some(result) = run.result() {
323 format!("{}, {}", run.state(), result)
324 } else {
325 format!("{} with unknown result", run.state())
326 }
327 }
328 _ => run.state().to_string(),
329 };
330
331 Element::new(Tag::Span)
332 .with_class(&status)
333 .with_text(&status.to_string())
334 }
335
336 fn run_state_unknown() -> Element {
337 Element::new(Tag::Span)
338 .with_class("run-status")
339 .with_text("unknown")
340 }
341
342 fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
343 let mut doc = HtmlPage::default();
344
345 let title = format!("CI runs for repository {alias}");
346 Self::head(&mut doc, &title);
347
348 doc.push_to_body(
349 Element::new(Tag::P).with_child(
350 Element::new(Tag::A)
351 .with_attribute("href", "./index.html")
352 .with_text("Front page"),
353 ),
354 );
355
356 Self::h1(&mut doc, &title);
357
358 doc.push_to_body(
359 Element::new(Tag::P).with_text("Repository ID ").with_child(
360 Element::new(Tag::Code)
361 .with_class("repoid")
362 .with_text(&rid.to_string()),
363 ),
364 );
365 Self::p_text(&mut doc, &format!("Last updated: {timestamp}"));
366
367 let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
368 Element::new(Tag::Tr)
369 .with_child(Element::new(Tag::Th).with_text("Run ID"))
370 .with_child(Element::new(Tag::Th).with_text("Whence"))
371 .with_child(Element::new(Tag::Th).with_text("Status"))
372 .with_child(Element::new(Tag::Th).with_text("Info")),
373 );
374
375 let mut runs = self.runs(rid);
376 runs.sort_by_cached_key(|run| run.timestamp());
377 runs.reverse();
378
379 for run in runs {
380 let result = if let Some(result) = run.result() {
381 result.to_string()
382 } else {
383 "unknown".into()
384 };
385 let mut status = Element::new(Tag::Span)
386 .with_class(&result)
387 .with_text(&result);
388 if run.timed_out() {
389 status.push_text(" (timed out)");
390 }
391
392 let current = match run.state() {
393 RunState::Triggered => Element::new(Tag::Span)
394 .with_attribute("state", "triggered")
395 .with_text("triggered"),
396 RunState::Running => Element::new(Tag::Span)
397 .with_class("running)")
398 .with_text("running"),
399 RunState::Finished => status,
400 };
401
402 table.push_child(
403 Element::new(Tag::Tr)
404 .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
405 .with_child(
406 Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
407 )
408 .with_child(Element::new(Tag::Td).with_child(current))
409 .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
410 );
411 }
412
413 doc.push_to_body(table);
414
415 doc
416 }
417
418 fn head(page: &mut HtmlPage, title: &str) {
419 page.push_to_head(Element::new(Tag::Title).with_text(title));
420 page.push_to_head(Element::new(Tag::Style).with_text(CSS));
421 page.push_to_head(
422 Element::new(Tag::Meta)
423 .with_attribute("http-equiv", "refresh")
424 .with_attribute("content", REFERESH_INTERVAL),
425 );
426 }
427
428 fn h1(page: &mut HtmlPage, text: &str) {
429 page.push_to_body(Element::new(Tag::H2).with_text(text));
430 }
431
432 fn p_text(page: &mut HtmlPage, text: &str) {
433 page.push_to_body(Element::new(Tag::P).with_text(text));
434 }
435
436 fn repository(repo_id: RepoId, alias: &str, runs: Vec<&Run>) -> Element {
437 let failed = runs
438 .iter()
439 .filter(|run| run.result() == Some(&RunResult::Failure))
440 .count();
441 let total = runs.len();
442
443 fn failed_recently(runs: &[&Run], n: usize) -> usize {
444 let recent = if runs.len() >= n {
445 &runs[runs.len() - N..]
446 } else {
447 runs
448 };
449 recent
450 .iter()
451 .filter(|run| run.result() == Some(&RunResult::Failure))
452 .count()
453 }
454
455 const N: usize = 5;
456
457 Element::new(Tag::Span)
458 .with_child(
459 Element::new(Tag::A)
460 .with_child(
461 Element::new(Tag::Code)
462 .with_class("alias)")
463 .with_text(alias),
464 )
465 .with_attribute("href", &format!("{}.html", rid_to_basename(repo_id))),
466 )
467 .with_child(Element::new(Tag::Br))
468 .with_child(Element::new(Tag::Span).with_text(&format!(
469 "{failed} failed runs ({} recently)",
470 failed_recently(&runs, N)
471 )))
472 .with_child(Element::new(Tag::Br))
473 .with_child(Element::new(Tag::Span).with_text(&format!("{total} total runs")))
474 }
475
476 fn run_ids(run: Option<&Run>) -> Element {
477 if let Some(run) = run {
478 let adapter_run_id = if let Some(x) = run.adapter_run_id() {
479 Element::new(Tag::Span).with_text(x.as_str())
480 } else {
481 Element::new(Tag::Span)
482 };
483
484 Element::new(Tag::Span)
485 .with_child(
486 Element::new(Tag::Span)
487 .with_class("adapter-run-id")
488 .with_text("Adapter: ")
489 .with_child(adapter_run_id)
490 .with_child(Element::new(Tag::Br)),
491 )
492 .with_child(
493 Element::new(Tag::Span)
494 .with_class("broker-run-id")
495 .with_text("Broker: ")
496 .with_text(&run.broker_run_id().to_string()),
497 )
498 .with_child(Element::new(Tag::Br))
499 .with_child(
500 Element::new(Tag::Span)
501 .with_class("timestamp")
502 .with_text("Started: ")
503 .with_text(run.timestamp()),
504 )
505 .with_child(Element::new(Tag::Br))
506 .with_child(
507 Element::new(Tag::Span)
508 .with_class("job-cob-id")
509 .with_text("Job: ")
510 .with_text(&run.job_id().map(|id| id.to_string()).unwrap_or("".into())),
511 )
512 } else {
513 Element::new(Tag::Span)
514 }
515 }
516
517 fn info_url(run: Option<&Run>) -> Element {
518 if let Some(run) = run {
519 if let Some(url) = run.adapter_info_url() {
520 return Element::new(Tag::A)
521 .with_attribute("href", url)
522 .with_text("info");
523 }
524 }
525 Element::new(Tag::Span)
526 }
527
528 fn whence_as_html(whence: &Whence) -> Element {
529 match whence {
530 Whence::Branch {
531 name,
532 commit,
533 who: _,
534 } => Element::new(Tag::Span)
535 .with_text("branch ")
536 .with_child(
537 Element::new(Tag::Code)
538 .with_class("branch)")
539 .with_text(name),
540 )
541 .with_text(", commit ")
542 .with_child(
543 Element::new(Tag::Code)
544 .with_class("commit)")
545 .with_text(&commit.to_string()),
546 )
547 .with_child(Element::new(Tag::Br))
548 .with_text("from ")
549 .with_child(
550 Element::new(Tag::Span)
551 .with_class("who")
552 .with_text(whence.who().unwrap_or("<commit author not known>")),
553 ),
554 Whence::Patch {
555 patch,
556 commit,
557 revision,
558 who: _,
559 } => Element::new(Tag::Span)
560 .with_text("patch ")
561 .with_child(
562 Element::new(Tag::Code)
563 .with_class("branch")
564 .with_text(&patch.to_string()),
565 )
566 .with_child(Element::new(Tag::Br))
567 .with_text("revision ")
568 .with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
569 if let Some(rev) = &revision {
570 rev.to_string()
571 } else {
572 "<unknown patch revision>".to_string()
573 }
574 }))
575 .with_child(Element::new(Tag::Br))
576 .with_text("commit ")
577 .with_child(
578 Element::new(Tag::Code)
579 .with_class("commit)")
580 .with_text(&commit.to_string()),
581 )
582 .with_child(Element::new(Tag::Br))
583 .with_text("from ")
584 .with_child(
585 Element::new(Tag::Span)
586 .with_class("who")
587 .with_text(whence.who().unwrap_or("<patch author not known>")),
588 ),
589 }
590 }
591
592 fn repos(&self) -> Vec<(String, RepoId)> {
593 let rids: HashSet<(String, RepoId)> = self
594 .runs
595 .values()
596 .map(|run| (run.repo_alias().to_string(), run.repo_id()))
597 .collect();
598 let mut repos: Vec<(String, RepoId)> = rids.iter().cloned().collect();
599 repos.sort();
600 repos
601 }
602
603 fn repo_alias(&self, wanted: RepoId) -> Option<String> {
604 self.repos().iter().find_map(|(alias, rid)| {
605 if *rid == wanted {
606 Some(alias.into())
607 } else {
608 None
609 }
610 })
611 }
612
613 fn runs(&self, repoid: RepoId) -> Vec<&Run> {
614 self.runs
615 .iter()
616 .filter_map(|(_, run)| {
617 if run.repo_id() == repoid {
618 Some(run)
619 } else {
620 None
621 }
622 })
623 .collect()
624 }
625
626 fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
627 let mut value: Option<&Run> = None;
628 for run in self.runs(repoid) {
629 if let Some(latest) = value {
630 if run.timestamp() > latest.timestamp() {
631 value = Some(run);
632 }
633 } else {
634 value = Some(run);
635 }
636 }
637 value
638 }
639
640 fn status_as_rss(&self) -> Result<Channel, PageError> {
641 let mut channel = ChannelBuilder::default();
642 channel
643 .title("Radicle CI broker run information")
644 .description("Latest CI runs known on this instance of the Radicle CI broker.")
645 .link("FIXME:link");
646
647 for item in self.sorted_items()?.iter().take(MAX_RSS_ENTRIES) {
648 channel.item((*item).clone());
649 }
650 Ok(channel.build())
651 }
652
653 fn failed_as_rss(&self) -> Result<Channel, PageError> {
654 let mut channel = ChannelBuilder::default();
655 channel
656 .title("Radicle CI broker run information")
657 .description("Latest FAILED CI runs on this instance of the Radicle CI broker.")
658 .link("FIXME:link");
659
660 for item in self
661 .sorted_items_for_runs(|run| {
662 run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure)
663 })?
664 .iter()
665 .take(MAX_RSS_ENTRIES)
666 {
667 channel.item(item.clone());
668 }
669
670 Ok(channel.build())
671 }
672
673 fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
674 let mut channel = ChannelBuilder::default();
675 channel
676 .title("Radicle CI broker run information")
677 .description("Latest UNFINISHED CI runs on this instance of the Radicle CI broker.")
678 .link("FIXME:link");
679 for item in self
680 .sorted_items_for_runs(|run| {
681 run.state() == RunState::Triggered || run.state() == RunState::Running
682 })?
683 .iter()
684 .take(MAX_RSS_ENTRIES)
685 {
686 channel.item(item.clone());
687 }
688 Ok(channel.build())
689 }
690
691 fn sorted_items(&self) -> Result<Vec<Item>, PageError> {
692 let mut items = vec![];
693 for (_alias, repo_id) in self.repos() {
694 for run in self.runs(repo_id) {
695 if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
696 items.push(Self::rss_item_from_run(run)?);
697 }
698 }
699 }
700 items.sort_by_key(|(ts, _)| *ts);
701 items.reverse();
702 Ok(items.iter().map(|(_, item)| item.clone()).collect())
703 }
704
705 fn sorted_items_for_runs<F>(&self, pred: F) -> Result<Vec<Item>, PageError>
706 where
707 F: Fn(&Run) -> bool,
708 {
709 let mut items = vec![];
710 for (_alias, repo_id) in self.repos() {
711 for run in self.runs(repo_id) {
712 if pred(run) {
713 items.push(Self::rss_item_from_run(run)?);
714 }
715 }
716 }
717 items.sort_by_key(|(ts, _)| *ts);
718 items.reverse();
719 Ok(items.iter().map(|(_, item)| item.clone()).collect())
720 }
721
722 fn rss_item_from_run(run: &Run) -> Result<(OffsetDateTime, Item), PageError> {
723 let mut guid = Guid::default();
724 guid.set_value(run.broker_run_id().to_string());
725 guid.permalink = false; let state = if run.state() == RunState::Finished {
728 match run.result() {
729 Some(result) => result.to_string(),
730 None => "unknown".to_string(),
731 }
732 } else {
733 run.state().to_string()
734 };
735 let title = format!("{state}: {} run {}", run.repo_alias(), run.broker_run_id());
736
737 let ts = run.timestamp().to_string();
738 let parsed =
739 parse_timestamp(&ts).map_err(|err| PageError::RssTimestamp(ts.clone(), err))?;
740 let ts = rfc822_timestamp(&parsed).map_err(|err| PageError::RssTimestamp(ts, err))?;
741
742 let entry = RssEntry {
743 repoid: run.repo_id(),
744 commit: match run.whence() {
745 Whence::Branch { commit, .. } => *commit,
746 Whence::Patch { commit, .. } => *commit,
747 },
748 info_url: run.adapter_info_url().map(String::from),
749 status: run.state(),
750 result: run.result().cloned(),
751 };
752
753 let mut item = ItemBuilder::default()
754 .title(Some(title))
755 .guid(Some(guid))
756 .pub_date(Some(ts))
757 .content(entry.to_html().serialize())
758 .build();
759
760 if let Some(url) = run.adapter_info_url() {
761 item.set_link(Some(url.into()));
762 };
763
764 Ok((parsed, item))
765 }
766}
767
/// Data about one CI run that is rendered as the HTML content of an RSS
/// item.
struct RssEntry {
    /// Repository the run belongs to.
    repoid: RepoId,
    /// Commit the run was for.
    commit: Oid,
    /// Adapter's info URL for the run, if any.
    info_url: Option<String>,
    /// State of the run.
    status: RunState,
    /// Result of the run, if any.
    result: Option<RunResult>,
}
775
776impl RssEntry {
777 fn to_html(&self) -> Element {
778 Element::new(Tag::Div)
779 .with_class("ci_run")
780 .with_child(
781 Element::new(Tag::Span)
782 .with_class("repoid")
783 .with_text(&self.repoid.to_string()),
784 )
785 .with_child(
786 Element::new(Tag::Span)
787 .with_class("commit")
788 .with_text(&self.commit.to_string()),
789 )
790 .with_child(
791 Element::new(Tag::Span)
792 .with_class("info_url")
793 .with_text(self.info_url.as_deref().unwrap_or("")),
794 )
795 .with_child(
796 Element::new(Tag::Span)
797 .with_class("status")
798 .with_text(&self.status.to_string()),
799 )
800 .with_child(Element::new(Tag::Span).with_class("result").with_text(
801 match &self.result {
802 None => "undetermined",
803 Some(RunResult::Success) => "success",
804 Some(RunResult::Failure) => "failure",
805 },
806 ))
807 }
808}
809
/// Worker that writes the HTML status pages, the status JSON file, and the
/// RSS feeds into an output directory.
pub struct StatusPage {
    /// Node alias used in the front page title.
    // NOTE(review): `new` sets this to an empty string and nothing in the
    // code visible here changes it afterwards.
    node_alias: String,
    /// Output directory; nothing is written while this is unset or the
    /// directory does not exist.
    dirname: Option<PathBuf>,
    /// Everything else the worker needs to do its job.
    args: PageArgs,
}
821
/// Inputs handed to `StatusPage::new`.
struct PageArgs {
    /// Source of the notifications the update loop waits for.
    run_rx: NotificationReceiver,
    /// Radicle profile; its storage is used to check repository visibility.
    profile: Profile,
    /// Database holding CI runs and queued CI events.
    db: Db,
    /// When true, the pages are written once and the update loop returns.
    once: bool,
}
828
829impl StatusPage {
830 pub fn set_output_dir(&mut self, dirname: &Path) {
831 self.dirname = Some(dirname.into());
832 }
833
834 pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
835 Self {
836 node_alias: "".into(),
837 dirname: None,
838 args: PageArgs {
839 run_rx,
840 profile,
841 db,
842 once,
843 },
844 }
845 }
846
847 fn update_loop(&mut self) -> Result<(), PageError> {
848 'processing_loop: loop {
849 self.update_and_write()?;
850 if self.args.once {
851 return Ok(());
852 }
853
854 match self.args.run_rx.wait_for_notification() {
855 Ok(_) => (),
856 Err(RecvTimeoutError::Timeout) => (),
857 Err(RecvTimeoutError::Disconnected) => {
858 logger::pages_disconnected();
859 break 'processing_loop;
860 }
861 }
862 }
863
864 self.update_and_write()?;
866
867 Ok(())
868 }
869
870 fn update_and_write(&mut self) -> Result<(), PageError> {
871 if let Some(dirname) = &self.dirname {
872 if dirname.exists() {
873 let runs = self.args.db.get_all_runs()?;
874
875 let events: Result<Vec<QueuedCiEvent>, PageError> = self
878 .args
879 .db
880 .queued_ci_events()?
881 .iter()
882 .filter_map(|id| match self.args.db.get_queued_ci_event(id) {
883 Ok(Some(event)) => match event.event() {
884 CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
885 CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
886 | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
887 | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
888 | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
889 if Self::is_public_repo(&self.args.profile, repo) {
890 Some(Ok(event))
891 } else {
892 None
893 }
894 }
895 _ => None,
896 },
897 Ok(None) => None, Err(_) => None, })
900 .collect();
901 let mut events = events?;
902 events.sort_by_cached_key(|e| e.timestamp().to_string());
903
904 let data = PageData {
905 timestamp: now()?,
906 ci_broker_version: env!("VERSION"),
907 ci_broker_git_commit: env!("GIT_HEAD"),
908 node_alias: self.node_alias.clone(),
909 runs: HashMap::from_iter(
910 runs.iter()
911 .map(|run| (run.broker_run_id().clone(), run.clone())),
912 ),
913 events,
914 broker_event_counter: 0,
915 latest_broker_event: None,
916 latest_ci_run: None,
917 };
918
919 let nameless = String::from("nameless repo");
920
921 let (status, repos) = {
924 let status = data.status_page_as_html()?.to_string();
925
926 let mut repos = vec![];
927 for (_, rid) in data.repos() {
928 let basename = rid_to_basename(rid);
929 let filename = dirname.join(format!("{basename}.html"));
930 let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
931 let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
932 repos.push((filename, repopage.to_string()));
933 }
934
935 (status, repos)
936 };
937
938 let filename = dirname.join("index.html");
939 Self::write_file(&filename, &status)?;
940
941 for (filename, repopage) in repos {
942 Self::write_file(&filename, &repopage)?;
943 }
944
945 let filename = dirname.join(STATUS_JSON);
946 let json = data.status_page_as_json()?;
947 Self::write_file(&filename, &json)?;
948
949 let filename = dirname.join(BROKER_RSS);
950 let channel = data.status_as_rss()?;
951 let rss = channel.to_string();
952 Self::write_file(&filename, &rss)?;
953
954 let filename = dirname.join(FAILURE_RSS);
955 let channel = data.failed_as_rss()?;
956 let rss = channel.to_string();
957 Self::write_file(&filename, &rss)?;
958
959 let filename = dirname.join(UNFINISHED_RSS);
960 let channel = data.unfinished_as_rss()?;
961 let rss = channel.to_string();
962 Self::write_file(&filename, &rss)?;
963 }
964 }
965 Ok(())
966 }
967
968 fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
969 if let Ok(repo) = profile.storage.repository(*rid) {
970 if let Ok(id_doc) = repo.canonical_identity_doc() {
971 if id_doc.doc.visibility().is_public() {
972 return true;
973 }
974 }
975 }
976 false
977 }
978
979 fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
980 safely_overwrite(filename, text.as_bytes())
981 .map_err(|err| PageError::Write(filename.into(), err))?;
982 Ok(())
983 }
984}
985
986impl Worker for StatusPage {
987 const NAME: &str = "status-page";
988 type Error = PageError;
989 fn work(&mut self) -> Result<(), PageError> {
990 match &self.dirname {
991 None => logger::pages_directory_unset(),
992 Some(report_dir) if !report_dir.exists() => {
993 logger::pages_directory_does_not_exist(report_dir)
994 }
995 Some(_) => (),
996 }
997
998 self.update_loop()
999 }
1000}
1001
/// Status summary that is serialized as the status JSON document.
#[derive(Debug, Clone, Serialize)]
struct StatusData {
    /// Time the page data was generated.
    timestamp: String,
    /// Copied from the page data.
    broker_event_counter: usize,
    /// Version string of the CI broker.
    ci_broker_version: &'static str,
    /// Git commit of the CI broker build.
    ci_broker_git_commit: &'static str,
    /// Copied from the page data.
    latest_broker_event: Option<CiEvent>,
    /// Copied from the page data.
    latest_ci_run: Option<Run>,
    /// Number of queued events in the page data.
    event_queue_length: usize,
}
1012
1013impl StatusData {
1014 fn as_json(&self) -> Result<String, PageError> {
1015 serde_json::to_string_pretty(self).map_err(PageError::StatusToJson)
1016 }
1017}
1018
1019impl From<&PageData> for StatusData {
1020 fn from(page: &PageData) -> Self {
1021 Self {
1022 timestamp: page.timestamp.clone(),
1023 broker_event_counter: page.broker_event_counter,
1024 ci_broker_version: page.ci_broker_version,
1025 ci_broker_git_commit: page.ci_broker_git_commit,
1026 latest_broker_event: page.latest_broker_event.clone(),
1027 latest_ci_run: page.latest_ci_run.clone(),
1028 event_queue_length: page.events.len(),
1029 }
1030 }
1031}
1032
1033fn rid_to_basename(repoid: RepoId) -> String {
1034 let mut basename = repoid.to_string();
1035 assert!(basename.starts_with("rad:"));
1036 basename.drain(..4);
1037 basename
1038}