1use std::{
11 collections::{HashMap, HashSet},
12 fs::write,
13 path::{Path, PathBuf},
14 sync::mpsc::RecvTimeoutError,
15 thread::{spawn, JoinHandle},
16 time::Duration,
17};
18
19use html_page::{Element, HtmlPage, Tag};
20use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
21use serde::Serialize;
22use time::{macros::format_description, OffsetDateTime};
23
24use radicle::{
25 git::Oid,
26 prelude::RepoId,
27 storage::{ReadRepository, ReadStorage},
28 Profile,
29};
30
31use crate::{
32 ci_event::{CiEvent, CiEventV1},
33 db::{Db, DbError, QueuedCiEvent},
34 logger,
35 msg::{RunId, RunResult},
36 notif::NotificationReceiver,
37 run::{Run, RunState, Whence},
38 util::{parse_timestamp, rfc822_timestamp},
39};
40
/// Maximum number of items put into the "all runs" RSS feed.
const MAX_RSS_ENTRIES: usize = 10;
/// File name of the RSS feed built from all CI runs.
const BROKER_RSS: &str = "index.rss";
/// File name of the RSS feed built from failed CI runs.
const FAILURE_RSS: &str = "failed.rss";
/// File name of the RSS feed built from triggered or running CI runs.
const UNFINISHED_RSS: &str = "unfinished.rss";
/// Style sheet text embedded into the `<style>` element of every HTML page.
const CSS: &str = include_str!("radicle-ci.css");
/// Value of the `content` attribute of the `http-equiv="refresh"` meta
/// element added to every HTML page.
const REFERESH_INTERVAL: &str = "300";
/// Interval passed to the logger when the page updater is started.
// NOTE(review): in this file the value is only logged; the actual wait is
// presumably implemented by `NotificationReceiver` — confirm.
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
/// File name of the JSON status file.
const STATUS_JSON: &str = "status.json";
49
/// All errors that can occur while generating or writing status pages.
#[derive(Debug, thiserror::Error)]
pub enum PageError {
    /// Formatting a time stamp failed.
    #[error(transparent)]
    Timeformat(#[from] time::error::Format),

    /// Writing an output file failed; carries the file name and the I/O error.
    #[error("failed to write status page to {0}")]
    Write(PathBuf, #[source] std::io::Error),

    /// No node alias was available.
    #[error("no node alias has been set for builder")]
    NoAlias,

    /// No status data was available.
    #[error("no status data has been set for builder")]
    NoStatusData,

    /// A database operation failed.
    #[error(transparent)]
    Db(#[from] DbError),

    /// Locking the page data failed. The payload is not included in the
    /// error message.
    #[error("failed to lock page data structure")]
    Lock(&'static str),

    /// Serializing the status data as JSON failed.
    #[error("failed to represent status page data as JSON")]
    StatusToJson(#[source] serde_json::Error),

    /// A CI run time stamp could not be converted for use in an RSS feed;
    /// carries the offending time stamp text.
    #[error("failed to create RSS time stamp from CI run timestamp: {0}")]
    RssTimestamp(String, #[source] crate::util::UtilError),
}
78
79fn now() -> Result<String, time::error::Format> {
80 let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
81 OffsetDateTime::now_utc().format(fmt)
82}
83
/// Snapshot of everything needed to render the HTML, JSON and RSS outputs.
struct PageData {
    /// Time stamp shown as "Last updated" on the pages.
    timestamp: String,
    /// Version of the CI broker, shown on the front page.
    ci_broker_version: &'static str,
    /// Git commit of the CI broker, shown on the front page.
    ci_broker_git_commit: &'static str,
    /// Alias of the node, used in the front page title.
    node_alias: String,
    /// All known CI runs, keyed by broker run ID.
    runs: HashMap<RunId, Run>,
    /// Queued CI events listed in the "Event queue" table.
    events: Vec<QueuedCiEvent>,
    /// Copied verbatim into the JSON status data.
    broker_event_counter: usize,
    /// Copied verbatim into the JSON status data.
    latest_broker_event: Option<CiEvent>,
    /// Copied verbatim into the JSON status data.
    latest_ci_run: Option<Run>,
}
95
96impl PageData {
97 fn status_page_as_json(&self) -> Result<String, PageError> {
98 StatusData::from(self).as_json()
99 }
100
101 fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
102 let mut doc = HtmlPage::default();
103
104 let title = format!("CI for Radicle node {}", self.node_alias);
105 Self::head(&mut doc, &title);
106
107 doc.push_to_body(Element::new(Tag::H1).with_text(&title));
108
109 doc.push_to_body(
110 Element::new(Tag::P)
111 .with_text("RSS feeds: ")
112 .with_child(
113 Element::new(Tag::A)
114 .with_text("all")
115 .with_attribute("href", BROKER_RSS),
116 )
117 .with_text(" ")
118 .with_child(
119 Element::new(Tag::A)
120 .with_text("failed")
121 .with_attribute("href", FAILURE_RSS),
122 ),
123 );
124
125 Self::h1(&mut doc, "Broker status");
126 doc.push_to_body(
127 Element::new(Tag::P)
128 .with_text("Last updated: ")
129 .with_text(&self.timestamp)
130 .with_child(Element::new(Tag::Br))
131 .with_text("CI broker version: ")
132 .with_text(self.ci_broker_version)
133 .with_text(" (commit ")
134 .with_child(Element::new(Tag::Code).with_text(self.ci_broker_git_commit))
135 .with_text(")"),
136 );
137
138 Self::h1(&mut doc, "Repositories");
139 Self::p_text(&mut doc, "Latest CI run for each repository.");
140
141 let total = self.runs.len();
142 let failed = self
143 .runs
144 .values()
145 .filter(|run| run.result() == Some(&RunResult::Failure))
146 .count();
147 Self::p_text(
148 &mut doc,
149 &format!("Total {total} CI runs recorded, of which {failed} failed."),
150 );
151
152 let mut table = Element::new(Tag::Table).with_class("repolist").with_child(
153 Element::new(Tag::Tr)
154 .with_child(Element::new(Tag::Th).with_text("Repository"))
155 .with_child(Element::new(Tag::Th).with_text("Run ID"))
156 .with_child(Element::new(Tag::Th).with_text("Status"))
157 .with_child(Element::new(Tag::Th).with_text("Info")),
158 );
159
160 for (alias, rid) in self.repos() {
161 let (run_ids, status, info_url) = {
162 let run = self.latest_run(rid);
163 match run {
164 Some(run) => (
165 Self::run_ids(Some(run)),
166 Self::run_state(run),
167 Self::info_url(Some(run)),
168 ),
169 None => (
170 Self::run_ids(None),
171 Self::run_state_unknown(),
172 Self::info_url(None),
173 ),
174 }
175 };
176
177 let runs = self.runs(rid);
178
179 table.push_child(
180 Element::new(Tag::Tr)
181 .with_child(
182 Element::new(Tag::Td).with_child(Self::repository(rid, &alias, runs)),
183 )
184 .with_child(Element::new(Tag::Td).with_child(run_ids))
185 .with_child(
186 Element::new(Tag::Td).with_child(
187 Element::new(Tag::Span)
188 .with_class("run-status")
189 .with_child(status),
190 ),
191 )
192 .with_child(Element::new(Tag::Td).with_child(info_url)),
193 );
194 }
195 doc.push_to_body(table);
196
197 Self::h1(&mut doc, "Event queue");
198 let mut table = Element::new(Tag::Table)
199 .with_class("event-queue")
200 .with_child(
201 Element::new(Tag::Tr)
202 .with_child(Element::new(Tag::Th).with_text("Queue id"))
203 .with_child(Element::new(Tag::Th).with_text("Timestamp"))
204 .with_child(Element::new(Tag::Th).with_text("Event")),
205 );
206 for event in self.events.iter() {
207 fn render_event(
208 repo: &RepoId,
209 alias: Option<String>,
210 refname: &str,
211 commit: &Oid,
212 ) -> Element {
213 let alias = if let Some(alias) = alias {
214 Element::new(Tag::Span).with_child(
215 Element::new(Tag::Span)
216 .with_class("alias")
217 .with_text(&alias),
218 )
219 } else {
220 Element::new(Tag::Span)
221 };
222 Element::new(Tag::Span)
223 .with_child(alias)
224 .with_child(Element::new(Tag::Br))
225 .with_child(
226 Element::new(Tag::Span)
227 .with_class("repoid")
228 .with_text(&repo.to_string()),
229 )
230 .with_child(Element::new(Tag::Br))
231 .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
232 .with_child(Element::new(Tag::Br))
233 .with_child(
234 Element::new(Tag::Span)
235 .with_class("commit")
236 .with_text(&commit.to_string()),
237 )
238 }
239
240 let event_element = match event.event() {
241 CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
242 CiEvent::V1(CiEventV1::BranchCreated {
243 from_node: _,
244 repo,
245 branch,
246 tip,
247 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
248 CiEvent::V1(CiEventV1::BranchUpdated {
249 from_node: _,
250 repo,
251 branch,
252 tip,
253 old_tip: _,
254 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
255 CiEvent::V1(CiEventV1::BranchDeleted {
256 repo, branch, tip, ..
257 }) => render_event(repo, self.repo_alias(*repo), branch, tip),
258 CiEvent::V1(CiEventV1::PatchCreated {
259 from_node: _,
260 repo,
261 patch,
262 new_tip,
263 }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
264 CiEvent::V1(CiEventV1::PatchUpdated {
265 from_node: _,
266 repo,
267 patch,
268 new_tip,
269 }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
270 };
271
272 table.push_child(
273 Element::new(Tag::Tr)
274 .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
275 .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
276 .with_child(Element::new(Tag::Td).with_child(event_element)),
277 );
278 }
279 doc.push_to_body(table);
280
281 Self::h1(&mut doc, "Recent status");
282 let status = StatusData::from(self).as_json()?;
283 doc.push_to_body(
284 Element::new(Tag::P)
285 .with_text("See also as a separate file ")
286 .with_child(
287 Element::new(Tag::A)
288 .with_attribute("href", STATUS_JSON)
289 .with_child(Element::new(Tag::Code).with_text(STATUS_JSON)),
290 )
291 .with_text(": "),
292 );
293 doc.push_to_body(
294 Element::new(Tag::Blockquote).with_child(Element::new(Tag::Pre).with_text(&status)),
295 );
296
297 Ok(doc)
298 }
299
300 fn run_state(run: &Run) -> Element {
301 let status = match run.state() {
302 RunState::Finished => {
303 if let Some(result) = run.result() {
304 format!("{}, {}", run.state(), result)
305 } else {
306 format!("{} with unknown result", run.state())
307 }
308 }
309 _ => run.state().to_string(),
310 };
311
312 Element::new(Tag::Span)
313 .with_class(&status)
314 .with_text(&status.to_string())
315 }
316
317 fn run_state_unknown() -> Element {
318 Element::new(Tag::Span)
319 .with_class("run-status")
320 .with_text("unknown")
321 }
322
323 fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
324 let mut doc = HtmlPage::default();
325
326 let title = format!("CI runs for repository {}", alias);
327 Self::head(&mut doc, &title);
328
329 doc.push_to_body(
330 Element::new(Tag::P).with_child(
331 Element::new(Tag::A)
332 .with_attribute("href", "../index.html")
333 .with_text("Front page"),
334 ),
335 );
336
337 Self::h1(&mut doc, &title);
338
339 doc.push_to_body(
340 Element::new(Tag::P).with_text("Repository ID ").with_child(
341 Element::new(Tag::Code)
342 .with_class("repoid")
343 .with_text(&rid.to_string()),
344 ),
345 );
346 Self::p_text(&mut doc, &format!("Last updated: {timestamp}"));
347
348 let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
349 Element::new(Tag::Tr)
350 .with_child(Element::new(Tag::Th).with_text("Run ID"))
351 .with_child(Element::new(Tag::Th).with_text("Whence"))
352 .with_child(Element::new(Tag::Th).with_text("Status"))
353 .with_child(Element::new(Tag::Th).with_text("Info")),
354 );
355
356 let mut runs = self.runs(rid);
357 runs.sort_by_cached_key(|run| run.timestamp());
358 runs.reverse();
359 for run in runs {
360 let current = match run.state() {
361 RunState::Triggered => Element::new(Tag::Span)
362 .with_attribute("state", "triggered")
363 .with_text("triggered"),
364 RunState::Running => Element::new(Tag::Span)
365 .with_class("running)")
366 .with_text("running"),
367 RunState::Finished => {
368 let result = if let Some(result) = run.result() {
369 result.to_string()
370 } else {
371 "unknown".into()
372 };
373 Element::new(Tag::Span)
374 .with_class(&result)
375 .with_text(&result)
376 }
377 };
378
379 table.push_child(
380 Element::new(Tag::Tr)
381 .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
382 .with_child(
383 Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
384 )
385 .with_child(Element::new(Tag::Td).with_child(current))
386 .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
387 );
388 }
389
390 doc.push_to_body(table);
391
392 doc
393 }
394
395 fn head(page: &mut HtmlPage, title: &str) {
396 page.push_to_head(Element::new(Tag::Title).with_text(title));
397 page.push_to_head(Element::new(Tag::Style).with_text(CSS));
398 page.push_to_head(
399 Element::new(Tag::Meta)
400 .with_attribute("http-equiv", "refresh")
401 .with_attribute("content", REFERESH_INTERVAL),
402 );
403 }
404
405 fn h1(page: &mut HtmlPage, text: &str) {
406 page.push_to_body(Element::new(Tag::H2).with_text(text));
407 }
408
409 fn p_text(page: &mut HtmlPage, text: &str) {
410 page.push_to_body(Element::new(Tag::P).with_text(text));
411 }
412
413 fn repository(repo_id: RepoId, alias: &str, runs: Vec<&Run>) -> Element {
414 let failed = runs
415 .iter()
416 .filter(|run| run.result() == Some(&RunResult::Failure))
417 .count();
418 let total = runs.len();
419
420 fn failed_recently(runs: &[&Run], n: usize) -> usize {
421 let recent = if runs.len() >= n {
422 &runs[runs.len() - N..]
423 } else {
424 runs
425 };
426 recent
427 .iter()
428 .filter(|run| run.result() == Some(&RunResult::Failure))
429 .count()
430 }
431
432 const N: usize = 5;
433
434 Element::new(Tag::Span)
435 .with_child(
436 Element::new(Tag::A)
437 .with_child(
438 Element::new(Tag::Code)
439 .with_class("alias)")
440 .with_text(alias),
441 )
442 .with_attribute("href", &format!("{}.html", rid_to_basename(repo_id))),
443 )
444 .with_child(Element::new(Tag::Br))
445 .with_child(Element::new(Tag::Span).with_text(&format!(
446 "{failed} failed runs ({} recently)",
447 failed_recently(&runs, N)
448 )))
449 .with_child(Element::new(Tag::Br))
450 .with_child(Element::new(Tag::Span).with_text(&format!("{total} total runs")))
451 }
452
453 fn run_ids(run: Option<&Run>) -> Element {
454 if let Some(run) = run {
455 let adapter_run_id = if let Some(x) = run.adapter_run_id() {
456 Element::new(Tag::Span).with_text(x.as_str())
457 } else {
458 Element::new(Tag::Span)
459 };
460
461 Element::new(Tag::Span).with_child(
462 Element::new(Tag::Span)
463 .with_class("adapter-run-id")
464 .with_child(adapter_run_id)
465 .with_child(Element::new(Tag::Br))
466 .with_child(
467 Element::new(Tag::Span)
468 .with_class("broker-run-id")
469 .with_text(&run.broker_run_id().to_string()),
470 )
471 .with_child(Element::new(Tag::Br))
472 .with_text(run.timestamp()),
473 )
474 } else {
475 Element::new(Tag::Span)
476 }
477 }
478
479 fn info_url(run: Option<&Run>) -> Element {
480 if let Some(run) = run {
481 if let Some(url) = run.adapter_info_url() {
482 return Element::new(Tag::A)
483 .with_attribute("href", url)
484 .with_text("info");
485 }
486 }
487 Element::new(Tag::Span)
488 }
489
490 fn whence_as_html(whence: &Whence) -> Element {
491 match whence {
492 Whence::Branch {
493 name,
494 commit,
495 who: _,
496 } => Element::new(Tag::Span)
497 .with_text("branch ")
498 .with_child(
499 Element::new(Tag::Code)
500 .with_class("branch)")
501 .with_text(name),
502 )
503 .with_text(", commit ")
504 .with_child(
505 Element::new(Tag::Code)
506 .with_class("commit)")
507 .with_text(&commit.to_string()),
508 )
509 .with_child(Element::new(Tag::Br))
510 .with_text("from ")
511 .with_child(
512 Element::new(Tag::Span)
513 .with_class("who")
514 .with_text(whence.who().unwrap_or("<commit author not known>")),
515 ),
516 Whence::Patch {
517 patch,
518 commit,
519 revision,
520 who: _,
521 } => Element::new(Tag::Span)
522 .with_text("patch ")
523 .with_child(
524 Element::new(Tag::Code)
525 .with_class("branch")
526 .with_text(&patch.to_string()),
527 )
528 .with_child(Element::new(Tag::Br))
529 .with_text("revision ")
530 .with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
531 if let Some(rev) = &revision {
532 rev.to_string()
533 } else {
534 "<unknown patch revision>".to_string()
535 }
536 }))
537 .with_child(Element::new(Tag::Br))
538 .with_text("commit ")
539 .with_child(
540 Element::new(Tag::Code)
541 .with_class("commit)")
542 .with_text(&commit.to_string()),
543 )
544 .with_child(Element::new(Tag::Br))
545 .with_text("from ")
546 .with_child(
547 Element::new(Tag::Span)
548 .with_class("who")
549 .with_text(whence.who().unwrap_or("<patch author not known>")),
550 ),
551 }
552 }
553
554 fn repos(&self) -> Vec<(String, RepoId)> {
555 let rids: HashSet<(String, RepoId)> = self
556 .runs
557 .values()
558 .map(|run| (run.repo_alias().to_string(), run.repo_id()))
559 .collect();
560 let mut repos: Vec<(String, RepoId)> = rids.iter().cloned().collect();
561 repos.sort();
562 repos
563 }
564
565 fn repo_alias(&self, wanted: RepoId) -> Option<String> {
566 self.repos().iter().find_map(|(alias, rid)| {
567 if *rid == wanted {
568 Some(alias.into())
569 } else {
570 None
571 }
572 })
573 }
574
575 fn runs(&self, repoid: RepoId) -> Vec<&Run> {
576 self.runs
577 .iter()
578 .filter_map(|(_, run)| {
579 if run.repo_id() == repoid {
580 Some(run)
581 } else {
582 None
583 }
584 })
585 .collect()
586 }
587
588 fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
589 let mut value: Option<&Run> = None;
590 for run in self.runs(repoid) {
591 if let Some(latest) = value {
592 if run.timestamp() > latest.timestamp() {
593 value = Some(run);
594 }
595 } else {
596 value = Some(run);
597 }
598 }
599 value
600 }
601
602 fn status_as_rss(&self) -> Result<Channel, PageError> {
603 let mut channel = ChannelBuilder::default();
604 channel
605 .title("Radicle CI broker run information")
606 .description("All CI runs known to this instance of the Radicle CI broker.")
607 .link("FIXME:link");
608 let mut items = vec![];
609 for (_alias, repo_id) in self.repos() {
610 for run in self.runs(repo_id) {
611 items.push(Self::rss_item_from_run(run)?);
612 }
613 }
614 items.sort_by_key(|item| item.pub_date().map(|s| s.to_string()));
615 items.reverse();
616 for item in items.iter().take(MAX_RSS_ENTRIES).cloned() {
617 channel.item(item);
618 }
619 Ok(channel.build())
620 }
621
622 fn failed_as_rss(&self) -> Result<Channel, PageError> {
623 let mut channel = ChannelBuilder::default();
624 channel
625 .title("Radicle CI broker run information")
626 .description("All CI runs known to this instance of the Radicle CI broker.")
627 .link("FIXME:link");
628 for (_alias, repo_id) in self.repos() {
629 for run in self.runs(repo_id) {
630 if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
631 channel.item(Self::rss_item_from_run(run)?);
632 }
633 }
634 }
635 Ok(channel.build())
636 }
637
638 fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
639 let mut channel = ChannelBuilder::default();
640 channel
641 .title("Radicle CI broker run information")
642 .description("All CI runs known to this instance of the Radicle CI broker.")
643 .link("FIXME:link");
644 for (_alias, repo_id) in self.repos() {
645 for run in self.runs(repo_id) {
646 if run.state() == RunState::Triggered || run.state() == RunState::Running {
647 channel.item(Self::rss_item_from_run(run)?);
648 }
649 }
650 }
651 Ok(channel.build())
652 }
653
654 fn rss_item_from_run(run: &Run) -> Result<Item, PageError> {
655 let mut guid = Guid::default();
656 guid.set_value(run.broker_run_id().to_string());
657
658 let state = if run.state() == RunState::Finished {
659 match run.result() {
660 Some(result) => result.to_string(),
661 None => "unknown".to_string(),
662 }
663 } else {
664 run.state().to_string()
665 };
666 let title = format!("{state}: {} run {}", run.repo_alias(), run.broker_run_id());
667
668 let ts = run.timestamp().to_string();
669 let parsed =
670 parse_timestamp(&ts).map_err(|err| PageError::RssTimestamp(ts.clone(), err))?;
671 let ts = rfc822_timestamp(parsed).map_err(|err| PageError::RssTimestamp(ts, err))?;
672
673 let entry = RssEntry {
674 repoid: run.repo_id(),
675 commit: match run.whence() {
676 Whence::Branch { commit, .. } => *commit,
677 Whence::Patch { commit, .. } => *commit,
678 },
679 info_url: run.adapter_info_url().map(String::from),
680 status: run.state(),
681 result: run.result().cloned(),
682 };
683
684 let mut item = ItemBuilder::default()
685 .title(Some(title))
686 .guid(Some(guid))
687 .pub_date(Some(ts))
688 .content(entry.to_html().serialize())
689 .build();
690
691 if let Some(url) = run.adapter_info_url() {
692 item.set_link(Some(url.into()));
693 };
694
695 Ok(item)
696 }
697}
698
/// The facts about one CI run that are embedded, as HTML, in the content of
/// its RSS item.
struct RssEntry {
    /// Repository the run was for.
    repoid: RepoId,
    /// Commit the run was for.
    commit: Oid,
    /// Info URL reported by the adapter, if any.
    info_url: Option<String>,
    /// State of the run.
    status: RunState,
    /// Result of the run, if one is recorded.
    result: Option<RunResult>,
}
706
707impl RssEntry {
708 fn to_html(&self) -> Element {
709 Element::new(Tag::Div)
710 .with_class("ci_run")
711 .with_child(
712 Element::new(Tag::Span)
713 .with_class("repoid")
714 .with_text(&self.repoid.to_string()),
715 )
716 .with_child(
717 Element::new(Tag::Span)
718 .with_class("commit")
719 .with_text(&self.commit.to_string()),
720 )
721 .with_child(
722 Element::new(Tag::Span)
723 .with_class("info_url")
724 .with_text(self.info_url.as_deref().unwrap_or("")),
725 )
726 .with_child(
727 Element::new(Tag::Span)
728 .with_class("status")
729 .with_text(&self.status.to_string()),
730 )
731 .with_child(Element::new(Tag::Span).with_class("result").with_text(
732 match &self.result {
733 None => "undetermined",
734 Some(RunResult::Success) => "success",
735 Some(RunResult::Failure) => "failure",
736 },
737 ))
738 }
739}
740
/// Generator of the status pages (HTML, JSON and RSS) written to an output
/// directory.
#[derive(Default)]
pub struct StatusPage {
    /// Alias of the node, taken from the profile when the updater starts.
    node_alias: String,
    /// Directory to write output files to; nothing is written while unset.
    dirname: Option<PathBuf>,
}
752
753impl StatusPage {
754 pub fn set_output_dir(&mut self, dirname: &Path) {
755 self.dirname = Some(dirname.into());
756 }
757
758 pub fn update_in_thread(
759 mut self,
760 run_rx: NotificationReceiver,
761 profile: Profile,
762 db: Db,
763 once: bool,
764 ) -> JoinHandle<Result<(), PageError>> {
765 match &self.dirname {
766 None => logger::pages_directory_unset(),
767 Some(report_dir) if !report_dir.exists() => {
768 logger::pages_directory_does_not_exist(report_dir)
769 }
770 Some(_) => (),
771 }
772
773 self.node_alias = profile.config.alias().to_string();
774 logger::pages_interval(UPDATE_INTERVAL);
775
776 spawn(move || {
777 logger::pages_start();
778 let result = self.update_loop(run_rx, profile, db, once);
779 logger::pages_end(&result);
780 result
781 })
782 }
783
784 fn update_loop(
785 mut self,
786 run_rx: NotificationReceiver,
787 profile: Profile,
788 db: Db,
789 once: bool,
790 ) -> Result<(), PageError> {
791 'processing_loop: loop {
792 self.update_and_write(&profile, &db)?;
793 if once {
794 return Ok(());
795 }
796
797 match run_rx.wait_for_notification() {
798 Ok(_) => (),
799 Err(RecvTimeoutError::Timeout) => (),
800 Err(RecvTimeoutError::Disconnected) => {
801 logger::pages_disconnected();
802 break 'processing_loop;
803 }
804 }
805 }
806
807 self.update_and_write(&profile, &db)?;
809
810 Ok(())
811 }
812
813 fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
814 if let Some(dirname) = &self.dirname {
815 if dirname.exists() {
816 let runs = db.get_all_runs()?;
817
818 let events: Result<Vec<QueuedCiEvent>, PageError> = db
821 .queued_ci_events()?
822 .iter()
823 .filter_map(|id| match db.get_queued_ci_event(id) {
824 Ok(Some(event)) => match event.event() {
825 CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
826 CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
827 | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
828 | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
829 | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
830 if Self::is_public_repo(profile, repo) {
831 Some(Ok(event))
832 } else {
833 None
834 }
835 }
836 _ => None,
837 },
838 Ok(None) => None, Err(_) => None, })
841 .collect();
842 let mut events = events?;
843 events.sort_by_cached_key(|e| e.timestamp().to_string());
844
845 let data = PageData {
846 timestamp: now()?,
847 ci_broker_version: env!("CARGO_PKG_VERSION"),
848 ci_broker_git_commit: env!("GIT_HEAD"),
849 node_alias: self.node_alias.clone(),
850 runs: HashMap::from_iter(
851 runs.iter()
852 .map(|run| (run.broker_run_id().clone(), run.clone())),
853 ),
854 events,
855 broker_event_counter: 0,
856 latest_broker_event: None,
857 latest_ci_run: None,
858 };
859
860 let nameless = String::from("nameless repo");
861
862 let (status, repos) = {
865 let status = data.status_page_as_html()?.to_string();
866
867 let mut repos = vec![];
868 for (_, rid) in data.repos() {
869 let basename = rid_to_basename(rid);
870 let filename = dirname.join(format!("{basename}.html"));
871 let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
872 let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
873 repos.push((filename, repopage.to_string()));
874 }
875
876 (status, repos)
877 };
878
879 let filename = dirname.join("index.html");
880 Self::write_file(&filename, &status)?;
881
882 for (filename, repopage) in repos {
883 Self::write_file(&filename, &repopage)?;
884 }
885
886 let filename = dirname.join(STATUS_JSON);
887 let json = data.status_page_as_json()?;
888 Self::write_file(&filename, &json)?;
889
890 let filename = dirname.join(BROKER_RSS);
891 let channel = data.status_as_rss()?;
892 let rss = channel.to_string();
893 Self::write_file(&filename, &rss)?;
894
895 let filename = dirname.join(FAILURE_RSS);
896 let channel = data.failed_as_rss()?;
897 let rss = channel.to_string();
898 Self::write_file(&filename, &rss)?;
899
900 let filename = dirname.join(UNFINISHED_RSS);
901 let channel = data.unfinished_as_rss()?;
902 let rss = channel.to_string();
903 Self::write_file(&filename, &rss)?;
904 }
905 }
906 Ok(())
907 }
908
909 fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
910 if let Ok(repo) = profile.storage.repository(*rid) {
911 if let Ok(id_doc) = repo.canonical_identity_doc() {
912 if id_doc.doc.visibility().is_public() {
913 return true;
914 }
915 }
916 }
917 false
918 }
919
920 fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
921 write(filename, text).map_err(|e| PageError::Write(filename.into(), e))?;
922 Ok(())
923 }
924}
925
/// The status summary serialized to JSON, both for the separate status file
/// and for the "Recent status" section of the front page.
#[derive(Debug, Clone, Serialize)]
struct StatusData {
    /// Time stamp of the page data this summary was taken from.
    timestamp: String,
    /// Copied from the page data.
    broker_event_counter: usize,
    /// Version of the CI broker.
    ci_broker_version: &'static str,
    /// Git commit of the CI broker.
    ci_broker_git_commit: &'static str,
    /// Copied from the page data.
    latest_broker_event: Option<CiEvent>,
    /// Copied from the page data.
    latest_ci_run: Option<Run>,
    /// Number of events in the page data's event list.
    event_queue_length: usize,
}
936
937impl StatusData {
938 fn as_json(&self) -> Result<String, PageError> {
939 serde_json::to_string_pretty(self).map_err(PageError::StatusToJson)
940 }
941}
942
943impl From<&PageData> for StatusData {
944 fn from(page: &PageData) -> Self {
945 Self {
946 timestamp: page.timestamp.clone(),
947 broker_event_counter: page.broker_event_counter,
948 ci_broker_version: page.ci_broker_version,
949 ci_broker_git_commit: page.ci_broker_git_commit,
950 latest_broker_event: page.latest_broker_event.clone(),
951 latest_ci_run: page.latest_ci_run.clone(),
952 event_queue_length: page.events.len(),
953 }
954 }
955}
956
957fn rid_to_basename(repoid: RepoId) -> String {
958 let mut basename = repoid.to_string();
959 assert!(basename.starts_with("rad:"));
960 basename.drain(..4);
961 basename
962}