tag2upload_service_manager/
global.rs1use crate::prelude::*;
12
13use crate::o2m_support::OracleSeq;
14
15use std::sync::Mutex;
16
/// Value of the `User-Agent` header sent on outbound HTTP requests.
///
/// Installed on the shared `reqwest::Client` built in `startup`.
const USER_AGENT: &str =
    "tag2upload-service-manager https://salsa.debian.org/dgit-team";
19
/// Process-wide shared state of the service manager.
///
/// Built once in `startup`, installed with `set_globals`, and obtained
/// elsewhere via `globals()`.
#[derive(Debug)]
pub struct Globals {
    /// Command-line options the process was started with.
    pub cli_options: CliOptions,
    /// Parsed configuration (see `resolve_config`).
    pub config: Config,
    /// Values derived from `config` via `ComputedConfig::try_from`.
    pub computed_config: ComputedConfig,

    /// Mutable process state.  `await_shutdown` waits for its
    /// `shutdown_reason` to become `Some`.
    pub state: watch::Sender<State>,

    /// Default-constructed at startup.
    pub worker_tracker: Arc<WorkerTracker>,

    /// The shutdown handler subscribes to this and re-checks for jobs
    /// still building each time it changes.
    pub db_trigger: watch::Sender<DbAssocState>,

    /// Empty at startup.  NOTE(review): presumably filled in by
    /// `db_support::initialise` during liftoff — confirm.
    pub db_initialised: OnceLock<Result<(), Arc<AE>>>,

    /// HTTP client with `USER_AGENT` and the configured request timeout.
    pub http_client: reqwest::Client,
    /// Built with `dns::Resolver::builder_tokio()`.
    pub dns_resolver: dns::Resolver,

    /// `None` until the Rocket liftoff fairing has completed startup, then
    /// `Some(Running)`; see `await_running`.
    pub running: watch::Receiver<Option<Running>>,

    /// Retry state; default-initialised at startup.
    pub host_retry_state: Mutex<retry::HostStates>,

    /// Configured `files.scratch_dir`, or a fresh temporary directory.
    /// Its old contents are removed during startup.
    pub scratch_dir: String,
    /// Holds the temporary directory backing `scratch_dir`, if one was
    /// created, so it stays alive as long as `Globals` does.
    pub temp_dir_retain: Option<tempfile::TempDir>,
    /// Templates from `ui_render::tera_templates`.
    pub tera: Tera,
    /// From `crate::version::calculate_describe`.
    pub version_info: crate::version::Info,
    pub test_suppl: test::GlobalSupplement,
    /// Initialised from `Globals::now_systemtime_inner` at startup.
    pub last_worker_restart: RwLock<SystemTime>,

    /// Initialised to 1 at startup.
    pub next_seq: Mutex<OracleSeq>,

    /// Initialised to 1 at startup.
    pub oldest_norestart_seq: watch::Sender<OracleSeq>,
}
103
/// Wrapper marking a value that has not been validated yet.
///
/// For `WholeConfig`, `Unchecked::check` runs `config.check()` and unwraps.
#[derive(Debug)]
pub struct Unchecked<T> { pub unchecked: T }
106
/// Published (via `Globals::running`) once the server is up.
#[derive(Clone, Debug)]
pub struct Running {
    /// Port the Rocket server is using (`rocket.config().port`).
    pub port: u16,
}
111
/// Result of a successful `startup`.
pub struct Started {
    /// The ignited Rocket.  `startup` only calls `ignite`;
    /// NOTE(review): launching is presumably left to the caller.
    pub rocket: RocketIgnite,
}
115
/// State associated with the database, published via `Globals::db_trigger`.
///
/// The shutdown handler is woken by every change to this value.
#[derive(Default, Debug)]
pub struct DbAssocState {
    /// `Some(IsPaused)` when paused, `None` otherwise.
    pub paused: Option<IsPaused>,

    pub release_jobs: HashMap<JobId, WorkerFidelity>,
}
125
/// Marker type: `Some(IsPaused)` denotes a paused state.
#[derive(Debug, Clone, Copy)] pub struct IsPaused;
/// Marker type denoting a throttled state.
#[derive(Debug, Clone, Copy)] pub struct IsThrottled;
128
129#[cfg(not(test))]
130mod imp_globals {
131 use super::*;
132 static GLOBALS: OnceLock<Arc<Globals>> = OnceLock::new();
133
134 pub fn globals() -> Arc<Globals> {
144 GLOBALS.get()
145 .expect("Using globals() before set")
146 .clone()
147 }
148
149 pub(crate) fn set_globals(g: Arc<Globals>) {
151 GLOBALS.set(g)
152 .expect("set_globals called more than once?");
153 }
154}
155#[cfg(test)]
156mod imp_globals {
157 use super::*;
164 thread_local! {
165 static GLOBALS: RefCell<Option<Arc<Globals>>> =
166 const { RefCell::new(None) };
167 }
168 pub fn globals() -> Arc<Globals> {
169 GLOBALS.with_borrow(|gl| gl.clone())
170 .expect("Using globals() before set")
171 }
172 pub(crate) fn set_globals(g: Arc<Globals>) {
173 if let Some(was) = GLOBALS.replace(Some(g)) {
174 panic!("set_globals called more than once, previously {was:?}");
175 }
176 }
177}
178pub use imp_globals::*;
179
/// Token meaning "the process is shutting down".
///
/// Returned as the error of `check_shutdown` and `await_running`, and by
/// the Rocket liftoff helpers when startup cannot continue.
#[derive(Debug)]
pub struct ShuttingDown;
187
/// Mutable process state, held in `Globals::state`.
#[derive(Debug)]
pub struct State {
    /// `None` while running normally.  `Some(Ok(ShuttingDown))` for an
    /// orderly shutdown (set on SIGTERM); `Some(Err(_))` when crashing due
    /// to an internal error.
    pub shutdown_reason: Option<Result<ShuttingDown, InternalError>>,

    pub test_suppl: test::StateSupplement,
}
194
195impl State {
196 pub fn new(test_suppl: test::StateSupplement) -> Self {
197 State {
198 shutdown_reason: None,
199 test_suppl,
200 }
201 }
202
203 pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
205 match &self.shutdown_reason {
206 Some(Ok(ShuttingDown)) => Err(ShuttingDown),
207 Some(Err(_)) => Err(ShuttingDown),
208 None => Ok(()),
209 }
210 }
211}
212
213impl Globals {
214 pub async fn await_shutdown(&self) -> ShuttingDown {
218 self.state.subscribe()
219 .wait_for_then(|state| {
220 state.shutdown_reason
221 .as_ref()
222 .map(|_: &Result<ShuttingDown, IE>| ())
223 })
224 .await
225 .unwrap_or_else(|e| {
226 error!("shutdown handler task recv failed {e}");
227 });
228
229 ShuttingDown
230 }
231}
232
/// Test hook point.
///
/// In test builds, `point` is called and its rendering is passed to
/// `test::hook_point`.  In other builds this does nothing and `point` is
/// never called.
pub async fn test_hook<S: Display>(
    // `point` is only consumed under `cfg(test)`.
    #[allow(unused_variables)]
    point: impl FnOnce() -> S + Send + Sync,
) {
    #[cfg(test)]
    {
        let label = point().to_string();
        test::hook_point(label).await;
    }
}
242
/// Raw outcome of `Globals::http_fetch_raw`.
#[derive(Debug)]
pub enum HttpFetchedRaw {
    /// A real HTTP response that was neither 404 nor an error status.
    Success(reqwest::Response),
    /// A local file served in place of the URL (testing: `file://` URLs or
    /// `testing.fake_https_dir`).  For a directory, `data` is the
    /// placeholder `"DIRECTORY LISTING"`.
    SuccessFile { file: String, data: String },
    /// HTTP 404, or the local file does not exist.
    NotFound(HttpNotFound),
}
249
250impl Globals {
251 pub fn spawn_task_running(
253 self: &Arc<Self>,
254 what: impl Display,
255 fut: impl Future<Output = TaskResult> + Send + 'static,
256 ) {
257 let gl = self.clone();
258 self.spawn_task_immediate(what, async move {
259 let Running { .. } = gl.await_running().await?;
260 fut.await
261 })
262 }
263
264 pub fn spawn_task_immediate(
266 self: &Arc<Self>,
267 what: impl Display,
268 fut: impl Future<Output = TaskResult> + Send + 'static,
269 ) {
270 trace!("spawning task {what}");
271 tokio::spawn({
272 let what = what.to_string();
273 async move {
274 let _: TaskResult = AssertUnwindSafe(fut)
275 .catch_unwind()
276 .await
277 .unwrap_or_else(|_: Box<dyn Any + Send>| {
278 Err(internal!("task {what} panicked!").into())
279 });
280 }
281 });
282 }
283
284 pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
286 self.state.borrow().check_shutdown()
287 }
288
289 pub async fn await_running(&self) -> Result<Running, ShuttingDown> {
290 match self.running.clone().wait_for_then(|p| p.clone()).await {
291 Ok(y) => Ok(y),
292 Err(e) => {
293 debug!("shutting down, no port: {e}");
294 Err(ShuttingDown)
295 }
296 }
297 }
298
299 pub async fn http_fetch_raw(
300 &self,
301 url: Url,
302 ) -> Result<HttpFetchedRaw, AE> {
303 use HttpFetchedRaw as H;
304
305 let via_file = |file: String| {
306 if {
307 file
308 .strip_prefix('/').expect("allow not absolute?")
309 .split('/')
310 .any(|ent| ent == "" || ent == "..")
311 } {
312 return Err(anyhow!("file url {url:?} path traversal"));
313 }
314 let data = match fs::read_to_string(&file) {
315 Err(e) if e.kind() == io::ErrorKind::NotFound => {
316 return Ok(H::NotFound(HttpNotFound))
317 },
318 Err(e) if e.kind() == io::ErrorKind::IsADirectory => {
319 let data = "DIRECTORY LISTING".into();
320 return Ok(H::SuccessFile { file, data })
321 },
322 other => {
323 other.with_context(|| format!("{file:?}"))
324 .context("failed to read file")?
325 },
326 };
327 Ok(H::SuccessFile { file, data })
328 };
329
330 let file_allows = [
335 self.config.testing.fake_https_dir.clone()
336 .into_iter().collect_vec(),
337 #[cfg(test)]
338 self.test_suppl.url_map_target_files()
339 ];
340 let file_allows = file_allows.iter().flatten();
341
342 if file_allows.clone().next().is_some() {
343 let url = url.to_string();
344
345 if let Some(path) = url.strip_prefix("file://") {
346 if !file_allows.clone().any(|allow| path.starts_with(allow)) {
347 return Err(anyhow!("file url {url:?} not allowed ({:?})",
348 file_allows.clone().collect_vec()));
349 }
350 return via_file(path.to_owned());
351 }
352 }
353
354 if let Some(fake) = &self.config.testing.fake_https_dir {
355 let url = url.to_string();
356 let url = url.strip_prefix("https://")
357 .ok_or_else(|| anyhow!(
358 "failed to strip https:// prefix from url {url:?}"
359 ))?;
360 let file = format!("{fake}/{url}");
361 return via_file(file);
362 }
363
364 let response = self.http_client.get(url)
365 .send().await.context("send")?;
366 if response.status() == reqwest::StatusCode::NOT_FOUND {
367 return Ok(H::NotFound(HttpNotFound))
368 }
369 let response = response
370 .error_for_status().context("status")?;
371
372 Ok(H::Success(response))
373 }
374
375 pub async fn http_fetch_json_maybe<T: serde::de::DeserializeOwned>(
376 &self,
377 url: Url,
378 ) -> Result<Result<T, HttpNotFound>, AE> {
379 use HttpFetchedRaw as H;
380
381 let r = match self.http_fetch_raw(url.clone()).await? {
382 H::Success(response) => {
383 response.json().await.context("response")?
384 },
385 H::SuccessFile { file, data } => {
386 serde_json::from_str(&data)
387 .with_context(|| format!("{file:?}"))
388 .context("failed to deser file")?
389 },
390 H::NotFound(e) => {
391 return Ok(Err(e))
392 },
393 };
394
395 Ok(Ok(r))
396 }
397
398 pub async fn http_fetch_json<T: serde::de::DeserializeOwned>(
399 &self,
400 url: Url,
401 ) -> Result<T, AE> {
402 Ok(self.http_fetch_json_maybe(url).await??)
403 }
404}
405
/// In test builds, rebinds the local `$url` to its test-mapped form
/// (`crate::test::UrlMappable::map`).  Expands to nothing otherwise.
macro_rules! test_hook_url {
    ($url:ident) => {
        #[cfg(test)]
        let $url = crate::test::UrlMappable::map(&$url);
    };
}
410
411pub fn shutdown_start_tasks(
412 globals: &Arc<Globals>,
413) -> Result<(), StartupError> {
414 use tokio::signal::unix::{signal, SignalKind as SK};
415
416 #[cfg(test)]
417 match globals.t_shutdown_handlers() {
418 Ok(()) => {},
419 Err(()) => return Ok(()),
420 };
421
422 let mut terminate = signal(SK::terminate())
423 .into_internal("failed to set up SIGTERM handler")?;
424
425 globals.spawn_task_immediate("shutdown SIGTEREM watch", {
426 let globals = globals.clone();
427 async move {
428
429 let () = terminate.recv().await
430 .ok_or_else(|| internal!("no more SIGTERM reception?!"))?;
431
432 globals.state.send_modify(|state| {
433 match state.shutdown_reason {
434 None => {
435 info!("received SIGTERM, shutting down...");
436 state.shutdown_reason = Some(Ok(ShuttingDown));
437 }
438 Some(Err(_)) => {
439 info!("SIGTERM, but already crashing!");
440 },
441 Some(Ok(ShuttingDown)) => {
442 info!("SIGTERM, but already shutting down");
443 }
444 }
445 });
446
447 Ok(TaskWorkComplete {})
448 }
449 });
450
451 globals.spawn_task_immediate("shutdown handler", {
452 let globals = globals.clone();
453 async move {
454
455 let _: ShuttingDown = globals.await_shutdown().await;
456
457 let mut subscription = globals.db_trigger.subscribe();
458
459 match loop {
460 let job = find_job_deferring_shutdown()?;
461
462 let Some::<JobRow>(job) = job
463 else { break Ok(()); };
464
465 info!(jid=%job.jid, "shutdown awaits completion of build");
466
467 match subscription.changed().await {
468 Ok(()) => {}, Err(e) => break Err(e),
470 }
471 } {
472 Ok(()) => info!("clean shutdown complete."),
473 Err(e) => error!("shutdown terminating early, watch {}", e),
474 };
475
476 unsafe {
477 libc::kill(0, libc::SIGHUP);
478 error!("SIGHUP didn't kill us!!");
479 std::process::abort();
480 }
481 }
482 });
483
484 Ok(())
485}
486
/// Returns a job that should hold up shutdown, if there is one.
///
/// Runs, in a read-only transaction, a query for jobs whose `processing`
/// column is non-empty and whose status is `JobStatus::Building`, ordered
/// by `last_update` ascending.  The shutdown handler calls this repeatedly
/// until it yields `None`.
///
/// NOTE(review): the query has no `LIMIT 1`.  Confirm that
/// `bsql_query_01` accepts (rather than rejects) multiple matching rows,
/// otherwise shutdown with two builds in flight would error out.
pub fn find_job_deferring_shutdown() -> Result<Option<JobRow>, IE> {
    db_transaction(TN::Readonly, |dbt| {
        dbt.bsql_query_01(&bsql!("
            SELECT * FROM jobs
             WHERE processing != ''
               AND status = " (JobStatus::Building) "
             ORDER BY last_update ASC
        "))
    })
}
497
498fn handle_errors_during_rocket_liftoff(
499 context: impl Display,
500 f: impl FnOnce() -> Result<(), AE>,
501) -> Result<(), ShuttingDown> {
502 f()
503 .with_context(|| context.to_string())
504 .map_err(|ae| {
505 IE::new_without_backtrace(ae).note_only();
506 ShuttingDown
507 })
508}
509
510fn write_port_report_file(gl: &Arc<Globals>, port: u16)
511 -> Result<(), ShuttingDown>
512{
513 if let Some(file) = gl.config.files.port_report_file.clone() {
514 trace!(?file, "writing port");
515
516 handle_errors_during_rocket_liftoff(format_args!("{file:?}"), || {
517 let f = fs::File::create(&file).context("open")?;
518 let mut f = io::BufWriter::new(f);
519 writeln!(f, "{port}").context("write")?;
520 f.flush().context("write (flush)")?;
521 Ok(())
522 })?;
523 }
524 Ok(())
525}
526
527pub fn resolve_config(
528 cli_options: CliOptions,
529 base_config: Figment,
530) -> Result<Unchecked<WholeConfig>, StartupError> {
531 use StartupError as SE;
532
533 let rocket_base_config = {
534 use rocket::config::*;
535 rocket::Config {
536 shutdown: Shutdown {
537 ctrlc: false,
538 signals: HashSet::new(),
539 ..Default::default()
540 },
541 ..rocket::Config::release_default()
542 }
543 };
544
545 let config = {
546 let mut config = base_config;
547 for file in &cli_options.config {
548 let fig = figment::providers::Toml::file_exact(&file);
549 config = config.merge(fig);
550 }
551 config
552 }
553 .join(figment::providers::Serialized::default(
555 "log.schedule",
556 logging::default_schedule(),
557 ))
558 .join(figment::providers::Serialized::default(
561 "rocket",
562 rocket_base_config,
563 ))
564 .join(figment::providers::Serialized::default(
565 "rocket",
566 json! {{
570 }},
572 ));
573
574 let config = {
575 let mut c = config;
576
577 for s in &cli_options.config_toml {
578 c = c.merge(figment::providers::Toml::string(s));
579 }
580
581 c
582 };
583
584 let rocket_config = config
585 .focus("rocket");
586
587 let config: Config = config
588 .extract()
589 .map_err(SE::ParseConfig)?;
590
591 let computed_config = ComputedConfig::try_from(&config)?;
592
593 let unchecked =
594 WholeConfig { cli_options, config, computed_config, rocket_config };
595 Ok(Unchecked { unchecked })
596}
597
598impl Unchecked<WholeConfig> {
599 pub fn check(self) -> Result<WholeConfig, StartupError> {
600 self.unchecked.config.check()?;
601 Ok(self.unchecked)
602 }
603}
604
/// Starts the service manager.
///
/// In order, this:
///  * sets up logging;
///  * prepares and empties the scratch directory;
///  * builds the HTTP client, DNS resolver and templates;
///  * builds and installs `Globals`;
///  * creates the o2m listener;
///  * spawns the shutdown, expiry and (once running) unpause tasks;
///  * builds and ignites Rocket.
///
/// `rocket_hook` may adjust the Rocket builder before routes are mounted.
///
/// Database initialisation, the fetcher tasks, the listener task and the
/// port report all happen in a Rocket liftoff fairing.  If that fairing
/// fails, the process exits with status 16 (in test builds it panics).
///
/// # Errors
///
/// `StartupError` for failures before or during ignition.
pub async fn startup(
    config: WholeConfig,
    test_global_suppl: test::GlobalSupplement,
    test_state_suppl: test::StateSupplement,
    rocket_hook: impl FnOnce(RocketBuild) -> RocketBuild,
) -> Result<Started, StartupError> {
    use StartupError as SE;

    let WholeConfig { cli_options, config, computed_config, rocket_config }
        = config;

    logging::setup(&config)?;

    debug!(?config, ?cli_options, ?computed_config, "starting");

    // Use the configured scratch directory, or else a fresh temporary one.
    // In the latter case the `TempDir` is kept in `temp_dir_retain` so it
    // stays alive.
    let scratch_dir;
    let temp_dir_retain;
    match &config.files.scratch_dir {
        Some(s) => {
            scratch_dir = s.clone();
            temp_dir_retain = None;
        }
        None => {
            let td = tempfile::TempDir::new()
                .context("create temp dir")
                .map_err(SE::TempDir)?;
            scratch_dir = td.path().to_str()
                .ok_or_else(|| anyhow!("not utf-8"))
                .map_err(SE::TempDir)?
                .to_owned();
            temp_dir_retain = Some(td);
        }
    };

    // Discard anything left in the scratch directory from earlier.
    remove_dir_all::remove_dir_contents(&scratch_dir)
        .context("clean out old contents of scratch directory")
        .map_err(SE::TempDir)?;

    let version_info = crate::version::calculate_describe(&config.files);

    let http_client = reqwest::Client::builder()
        .user_agent(USER_AGENT)
        .timeout(*config.timeouts.http_request)
        .build()?;

    let dns_resolver = dns::Resolver::builder_tokio()?.build();

    let tera = ui_render::tera_templates(&config)?;

    // The receiver goes into `Globals::running`.  The sender is moved into
    // the liftoff fairing below, which publishes the port once up.
    let (running_tx, running) = watch::channel(None);

    let last_worker_restart = Globals::now_systemtime_inner(
        &config,
        &test_global_suppl,
    ).into();

    let globals = Arc::new(Globals {
        cli_options,
        config,
        computed_config,
        db_trigger: watch::Sender::new(DbAssocState::default()),
        db_initialised: OnceLock::new(),
        state: watch::Sender::new(State::new(test_state_suppl)),
        worker_tracker: Default::default(),
        http_client,
        dns_resolver,
        scratch_dir,
        running,
        tera,
        last_worker_restart,
        host_retry_state: Default::default(),
        version_info,
        temp_dir_retain,
        test_suppl: test_global_suppl,
        next_seq: Mutex::new(1),
        oldest_norestart_seq: watch::Sender::new(1),
    });
    // Installed before anything below runs, since later steps may use
    // `globals()`.  NOTE(review): confirm which callees rely on this.
    set_globals(globals.clone());

    let listener = o2m_listener::Listener::new(&globals)?;

    shutdown_start_tasks(&globals)?;

    expire::start_task(&globals);

    // Deferred until the server is running (see `spawn_task_running`).
    globals.spawn_task_running("unpause", {
        let globals = globals.clone();
        db_workflow::unpause_task(globals)
    });

    let rocket = rocket::custom(&rocket_config);
    let rocket = rocket_hook(rocket);
    let rocket = api_routes::mount_api(rocket);
    let rocket = ui_routes::mount_ui_catchers(rocket);

    // Remaining startup runs on Rocket liftoff, when the real port is known.
    let rocket = rocket.attach({
        let globals = globals.clone();

        rocket::fairing::AdHoc::on_liftoff(
            "spawn workers",
            |rocket: &rocket::Rocket<_>| Box::pin(
                async move {
                    handle_errors_during_rocket_liftoff("initialise db", || {
                        db_support::initialise(&globals)?;
                        Ok(())
                    })?;

                    // NOTE(review): a shutdown requested during startup
                    // (e.g. SIGTERM) also ends up in the "error during
                    // startup" exit(16) path below.
                    if globals.state.borrow().shutdown_reason.is_some() {
                        trace!(
                            "shutdown triggered during startup, not continuing"
                        );
                        return Err(ShuttingDown);
                    }
                    fetcher::start_tasks(&globals);
                    listener.start_task();

                    let port = rocket.config().port;
                    write_port_report_file(&globals, port)?;

                    let running = Running {
                        port
                    };
                    // `globals.running` holds a receiver, and `globals` is
                    // captured here, so presumably this cannot fail.
                    running_tx.send(Some(running.clone()))
                        .expect("no-one wanted our port");

                    info!(?running, "running");

                    Ok::<_, ShuttingDown>(())
                }.map(
                    |r| r.unwrap_or_else(|ShuttingDown| {
                        eprintln!("exiting due to error during startup!");
                        error!("exiting due to error during startup!");

                        // Real builds exit.  Test builds skip the exit and
                        // panic via `unreachable!` instead.
                        #[cfg(not(test))]
                        std::process::exit(16);
                        #[allow(unreachable_code)] { unreachable!(); }
                    })
                )
            )
        )
    });

    let rocket = rocket.ignite().await?;

    Ok(Started {
        rocket,
    })
}