tag2upload_service_manager/
global.rs1
2use crate::prelude::*;
3
4use crate::o2m_support::OracleSeq;
5
6use std::sync::Mutex;
7
8const USER_AGENT: &str =
9 "tag2upload-service-manager https://salsa.debian.org/dgit-team";
10
11#[derive(Debug)]
12pub struct Globals {
13 pub cli_options: CliOptions,
14 pub config: Config,
15 pub computed_config: ComputedConfig,
16
17 pub state: watch::Sender<State>,
23
24 pub worker_tracker: Arc<WorkerTracker>,
25
26 pub db_trigger: watch::Sender<DbAssocState>,
43
44 pub http_client: reqwest::Client,
45 pub dns_resolver: dns::Resolver,
46
47 pub running: watch::Receiver<Option<Running>>,
53
54 pub scratch_dir: String,
55 pub temp_dir_retain: Option<tempfile::TempDir>,
56 pub tera: Tera,
57 pub version_info: crate::version::Info,
58 pub test_suppl: test::GlobalSupplement,
59
60 pub next_seq: Mutex<OracleSeq>,
67
68 pub oldest_norestart_seq: watch::Sender<OracleSeq>,
73}
74
75#[derive(Debug)]
76pub struct Unchecked<T> { pub unchecked: T }
77
78#[derive(Clone, Debug)]
79pub struct Running {
80 pub port: u16,
81}
82
83pub struct Started {
84 pub rocket: RocketIgnite,
85}
86
87#[derive(Default, Debug)]
91pub struct DbAssocState {
92 pub paused: Option<IsPaused>,
93
94 pub release_jobs: HashMap<JobId, WorkerFidelity>,
95}
96
97#[derive(Debug, Clone, Copy)] pub struct IsPaused;
98#[derive(Debug, Clone, Copy)] pub struct IsThrottled;
99
100#[cfg(not(test))]
101mod imp_globals {
102 use super::*;
103 static GLOBALS: OnceLock<Arc<Globals>> = OnceLock::new();
104
105 pub fn globals() -> Arc<Globals> {
115 GLOBALS.get()
116 .expect("Using globals() before set")
117 .clone()
118 }
119
120 pub(crate) fn set_globals(g: Arc<Globals>) {
122 GLOBALS.set(g)
123 .expect("set_globals called more than once?");
124 }
125}
126#[cfg(test)]
127mod imp_globals {
128 use super::*;
135 thread_local! {
136 static GLOBALS: RefCell<Option<Arc<Globals>>> =
137 const { RefCell::new(None) };
138 }
139 pub fn globals() -> Arc<Globals> {
140 GLOBALS.with_borrow(|gl| gl.clone())
141 .expect("Using globals() before set")
142 }
143 pub(crate) fn set_globals(g: Arc<Globals>) {
144 if let Some(was) = GLOBALS.replace(Some(g)) {
145 panic!("set_globals called more than once, previously {was:?}");
146 }
147 }
148}
149pub use imp_globals::*;
150
151#[derive(Debug)]
157pub struct ShuttingDown;
158
159#[derive(Debug)]
160pub struct State {
161 pub shutdown_reason: Option<Result<ShuttingDown, InternalError>>,
162
163 pub test_suppl: test::StateSupplement,
164}
165
166impl State {
167 pub fn new(test_suppl: test::StateSupplement) -> Self {
168 State {
169 shutdown_reason: None,
170 test_suppl,
171 }
172 }
173
174 pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
176 match &self.shutdown_reason {
177 Some(Ok(ShuttingDown)) => Err(ShuttingDown),
178 Some(Err(_)) => Err(ShuttingDown),
179 None => Ok(()),
180 }
181 }
182}
183
184impl Globals {
185 pub async fn await_shutdown(&self) -> ShuttingDown {
189 self.state.subscribe()
190 .wait_for_then(|state| {
191 state.shutdown_reason
192 .as_ref()
193 .map(|_: &Result<ShuttingDown, IE>| ())
194 })
195 .await
196 .unwrap_or_else(|e| {
197 error!("shutdown handler task recv failed {e}");
198 });
199
200 ShuttingDown
201 }
202}
203
204pub async fn test_hook<S: Display>(
207 #[allow(unused_variables)]
208 point: impl FnOnce() -> S + Send + Sync,
209) {
210 #[cfg(test)]
211 test::hook_point(point().to_string()).await;
212}
213
214
215impl Globals {
216 pub fn spawn_task_running(
218 self: &Arc<Self>,
219 what: impl Display,
220 fut: impl Future<Output = TaskResult> + Send + 'static,
221 ) {
222 let gl = self.clone();
223 self.spawn_task_immediate(what, async move {
224 let Running { .. } = gl.await_running().await?;
225 fut.await
226 })
227 }
228
229 pub fn spawn_task_immediate(
231 self: &Arc<Self>,
232 what: impl Display,
233 fut: impl Future<Output = TaskResult> + Send + 'static,
234 ) {
235 tokio::spawn({
236 let what = what.to_string();
237 async move {
238 let _: TaskResult = AssertUnwindSafe(fut)
239 .catch_unwind()
240 .await
241 .unwrap_or_else(|_: Box<dyn Any + Send>| {
242 Err(internal!("task {what} panicked!").into())
243 });
244 }
245 });
246 }
247
248 pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
250 self.state.borrow().check_shutdown()
251 }
252
253 pub async fn await_running(&self) -> Result<Running, ShuttingDown> {
254 match self.running.clone().wait_for_then(|p| p.clone()).await {
255 Ok(y) => Ok(y),
256 Err(e) => {
257 debug!("shutting down, no port: {e}");
258 Err(ShuttingDown)
259 }
260 }
261 }
262
263 pub async fn http_fetch_json<T: serde::de::DeserializeOwned>(
264 &self,
265 url: Url,
266 ) -> Result<T, AE> {
267 if let Some(fake) = &self.config.testing.fake_https_dir {
268 let url = url.to_string();
269 let url = url.strip_prefix("https://")
270 .ok_or_else(|| anyhow!(
271 "failed to strip https:// prefix from url {url:?}"
272 ))?;
273 let fake_file = format!("{fake}/{url}");
274 let data = fs::read_to_string(&fake_file)
275 .with_context(|| format!("{fake_file:?}"))
276 .context("failed to read fake file")?;
277 let data = serde_json::from_str(&data)
278 .with_context(|| format!("{fake_file:?}"))
279 .context("failed to deser fake file")?;
280 return Ok(data);
281 }
282
283 self.http_client.get(url)
284 .send().await.context("send")?
285 .error_for_status().context("status")?
286 .json().await.context("response")
287 }
288}
289
290macro_rules! test_hook_url { { $url:ident } => {
291 #[cfg(test)]
292 let $url = crate::test::UrlMappable::map(&$url);
293} }
294
295pub fn shutdown_start_tasks(
296 globals: &Arc<Globals>,
297) -> Result<(), StartupError> {
298 use tokio::signal::unix::{signal, SignalKind as SK};
299
300 #[cfg(test)]
301 match globals.t_shutdown_handlers() {
302 Ok(()) => {},
303 Err(()) => return Ok(()),
304 };
305
306 let mut terminate = signal(SK::terminate())
307 .into_internal("failed to set up SIGTERM handler")?;
308
309 globals.spawn_task_immediate("shutdown SIGTEREM watch", {
310 let globals = globals.clone();
311 async move {
312
313 let () = terminate.recv().await
314 .ok_or_else(|| internal!("no more SIGTERM reception?!"))?;
315
316 globals.state.send_modify(|state| {
317 match state.shutdown_reason {
318 None => {
319 info!("received SIGTERM, shutting down...");
320 state.shutdown_reason = Some(Ok(ShuttingDown));
321 }
322 Some(Err(_)) => {
323 info!("SIGTERM, but already crashing!");
324 },
325 Some(Ok(ShuttingDown)) => {
326 info!("SIGTERM, but already shutting down");
327 }
328 }
329 });
330
331 Ok(TaskWorkComplete {})
332 }
333 });
334
335 globals.spawn_task_immediate("shutdown handler", {
336 let globals = globals.clone();
337 async move {
338
339 let _: ShuttingDown = globals.await_shutdown().await;
340
341 let mut subscription = globals.db_trigger.subscribe();
342
343 match loop {
344 let job = find_job_deferring_shutdown()?;
345
346 let Some::<JobRow>(job) = job
347 else { break Ok(()); };
348
349 info!(jid=%job.jid, "shutdown awaits completion of build");
350
351 match subscription.changed().await {
352 Ok(()) => {}, Err(e) => break Err(e),
354 }
355 } {
356 Ok(()) => info!("clean shutdown complete."),
357 Err(e) => error!("shutdown terminating early, watch {}", e),
358 };
359
360 unsafe {
361 libc::kill(0, libc::SIGHUP);
362 error!("SIGHUP didn't kill us!!");
363 std::process::abort();
364 }
365 }
366 });
367
368 Ok(())
369}
370
371pub fn find_job_deferring_shutdown() -> Result<Option<JobRow>, IE> {
372 db_transaction(TN::Readonly, |dbt| {
373 dbt.bsql_query_01(bsql!("
374 SELECT * FROM jobs
375 WHERE processing != ''
376 AND status = " (JobStatus::Building) "
377 ORDER BY last_update ASC
378 "))
379 })?
380}
381
382fn write_port_report_file(gl: &Arc<Globals>, port: u16) {
383 if let Some(file) = gl.config.files.port_report_file.clone() {
384 trace!(?file, "writing port");
385 (|| {
386 let f = fs::File::create(&file).context("open")?;
387 let mut f = io::BufWriter::new(f);
388 writeln!(f, "{port}").context("write")?;
389 f.flush().context("write (flush)")?;
390 Ok::<_, AE>(())
391 })()
392 .with_context(|| format!("{file:?}"))
393 .unwrap_or_else(|ae| IE::new_without_backtrace(ae).note_only());
394 }
395}
396
397pub fn resolve_config(
398 cli_options: CliOptions,
399 base_config: Figment,
400) -> Result<Unchecked<WholeConfig>, StartupError> {
401 use StartupError as SE;
402
403 let rocket_base_config = {
404 use rocket::config::*;
405 rocket::Config {
406 shutdown: Shutdown {
407 ctrlc: false,
408 signals: HashSet::new(),
409 ..Default::default()
410 },
411 ..rocket::Config::release_default()
412 }
413 };
414
415 let config = {
416 let mut config = base_config;
417 for file in &cli_options.config {
418 let fig = figment::providers::Toml::file_exact(&file);
419 config = config.merge(fig);
420 }
421 config
422 }
423 .join(figment::providers::Serialized::default(
425 "log.schedule",
426 logging::default_schedule(),
427 ))
428 .join(figment::providers::Serialized::default(
431 "rocket",
432 rocket_base_config,
433 ))
434 .join(figment::providers::Serialized::default(
435 "rocket",
436 json! {{
440 }},
442 ));
443
444 let config = {
445 let mut c = config;
446
447 for s in &cli_options.config_toml {
448 c = c.merge(figment::providers::Toml::string(s));
449 }
450
451 c
452 };
453
454 let rocket_config = config
455 .focus("rocket");
456
457 let config: Config = config
458 .extract()
459 .map_err(SE::ParseConfig)?;
460
461 let computed_config = ComputedConfig::try_from(&config)?;
462
463 let unchecked =
464 WholeConfig { cli_options, config, computed_config, rocket_config };
465 Ok(Unchecked { unchecked })
466}
467
468impl Unchecked<WholeConfig> {
469 pub fn check(self) -> Result<WholeConfig, StartupError> {
470 self.unchecked.config.check()?;
471 Ok(self.unchecked)
472 }
473}
474
475pub async fn startup(
476 config: WholeConfig,
477 test_global_suppl: test::GlobalSupplement,
478 test_state_suppl: test::StateSupplement,
479 rocket_hook: impl FnOnce(RocketBuild) -> RocketBuild,
480) -> Result<Started, StartupError> {
481 use StartupError as SE;
482
483 let WholeConfig { cli_options, config, computed_config, rocket_config }
484 = config;
485
486 logging::setup(&config)?;
487
488 let scratch_dir;
489 let temp_dir_retain;
490 match &config.files.scratch_dir {
491 Some(s) => {
492 scratch_dir = s.clone();
493 temp_dir_retain = None;
494 }
495 None => {
496 let td = tempfile::TempDir::new()
497 .context("create temp dir")
498 .map_err(SE::TempDir)?;
499 scratch_dir = td.path().to_str()
500 .ok_or_else(|| anyhow!("not utf-8"))
501 .map_err(SE::TempDir)?
502 .to_owned();
503 temp_dir_retain = Some(td);
504 }
505 };
506
507 remove_dir_all::remove_dir_contents(&scratch_dir)
508 .context("clean out old contents of scratch directory")
509 .map_err(SE::TempDir)?;
510
511 let version_info = crate::version::calculate_describe(&config.files);
512
513 let http_client = reqwest::Client::builder()
514 .user_agent(USER_AGENT)
515 .timeout(*config.timeouts.http_request)
516 .build()?;
517
518 let dns_resolver = dns::Resolver::builder_tokio()?.build();
519
520 let tera = ui_render::tera_templates(&config)?;
524
525 let (running_tx, running) = watch::channel(None);
526
527 let globals = Arc::new(Globals {
528 cli_options,
529 config,
530 computed_config,
531 db_trigger: watch::Sender::new(DbAssocState::default()),
532 state: watch::Sender::new(State::new(test_state_suppl)),
533 worker_tracker: Default::default(),
534 http_client,
535 dns_resolver,
536 scratch_dir,
537 running,
538 tera,
539 version_info,
540 temp_dir_retain,
541 test_suppl: test_global_suppl,
542 next_seq: Mutex::new(1),
543 oldest_norestart_seq: watch::Sender::new(1),
544 });
545 set_globals(globals.clone());
546
547 db_support::initialise(&globals)?;
548
549 let listener = o2m_listener::Listener::new(&globals)?;
550
551 shutdown_start_tasks(&globals)?;
552
553 expire::start_task(&globals);
554
555 globals.spawn_task_running("unpause", {
556 let globals = globals.clone();
557 db_workflow::unpause_task(globals)
558 });
559
560 let rocket = rocket::custom(&rocket_config);
561 let rocket = rocket_hook(rocket);
562 let rocket = ui_routes::mount_all(rocket);
563
564 let rocket = rocket.attach({
565 let globals = globals.clone();
566
567 rocket::fairing::AdHoc::on_liftoff(
568 "spawn workers",
569 |rocket: &rocket::Rocket<_>| Box::pin(
570 async move {
571 if globals.state.borrow().shutdown_reason.is_some() {
572 trace!(
574 "shutdown triggered during startup, not continuing"
575 );
576 return;
577 }
578 fetcher::start_tasks(&globals);
579 listener.start_task();
580
581 let port = rocket.config().port;
582 write_port_report_file(&globals, port);
587
588 let running = Running {
589 port
590 };
591 running_tx.send(Some(running.clone()))
592 .expect("no-one wanted our port");
593
594 info!(?running, "running");
595 }
596 )
597 )
598 });
599
600 let rocket = rocket.ignite().await?;
601
602 Ok(Started {
603 rocket,
604 })
605}