// tag2upload_service_manager/global.rs

//! Global state
//!
//! # Lock hierarchy
//!
//! In order of acquisition:
//!
//!  1. sqlite database lock
//!  2. `state`, `running`, `db_trigger`,
//!     `host_retry_state` - only one of these may be held at a time
10
11use crate::prelude::*;
12
13use crate::o2m_support::OracleSeq;
14
15use std::sync::Mutex;
16
/// `User-Agent` value installed on our shared outgoing HTTP client
/// (see `startup`, where the `reqwest::Client` is built).
const USER_AGENT: &str = "tag2upload-service-manager https://salsa.debian.org/dgit-team";
19
/// Process-wide shared state
///
/// Built once by [`startup`] and installed with `set_globals`;
/// thereafter obtained with `globals()`.  (In test builds the
/// installed value is per-thread, ie per-test.)
#[derive(Debug)]
pub struct Globals {
    /// Command-line options we were invoked with
    pub cli_options: CliOptions,
    /// The parsed configuration
    pub config: Config,
    /// Values derived from `config` (via `ComputedConfig::try_from`)
    pub computed_config: ComputedConfig,

    /// Shared global state *excluding the db*
    ///
    /// ### Lock hierarchy
    ///
    /// See the [docs for the `global` module](crate::global)
    /// for the rules for when you may mutate or borrow this.
    pub state: watch::Sender<State>,

    /// Worker tracking (starts out as `Default::default()`)
    ///
    /// NOTE(review): presumably records the connected workers - confirm.
    pub worker_tracker: Arc<WorkerTracker>,

    /// Trigger watch for database updates
    ///
    ///  * When a db transaction modifies the DB such that a
    ///    job may become processable that wasn't previously,
    ///    this watch will be signaled (even though the
    ///    contained `DbAssocState` may not have changed).
    ///
    ///    `db_support.rs` functions taking a [`TransactionNature`]
    ///    ensure this.
    ///
    ///  * Additionally `DbAssocState` contains some information
    ///    about pausing mirrored from the database.
    ///
    /// ### Lock hierarchy
    ///
    /// See the [docs for the `global` module](crate::global)
    /// for the rules for when you may mutate or borrow this.
    pub db_trigger: watch::Sender<DbAssocState>,

    /// Set iff we have run the db initialisation (including schema)
    ///
    /// This is done from the Rocket liftoff fairing callback, while the
    /// rest of the program is (supposedly) waiting.  We want to avoid
    /// running any normal db transactions before then.  This is a booby
    /// trap which would cause us to crash (or fail tests) instead.
    pub db_initialised: OnceLock<Result<(), Arc<AE>>>,

    /// Shared HTTP client
    ///
    /// Sends [`USER_AGENT`], and uses `config.timeouts.http_request`
    /// as its request timeout.
    pub http_client: reqwest::Client,
    /// DNS resolver (built with `dns::Resolver::builder_tokio`)
    pub dns_resolver: dns::Resolver,

    /// Signal for when we have started up
    ///
    /// `None` until the Rocket liftoff fairing in [`startup`]
    /// sends `Some(Running { .. })`.
    ///
    /// ### Lock hierarchy
    ///
    /// See the [docs for the `global` module](crate::global)
    /// for the rules for when you may mutate or borrow this.
    pub running: watch::Receiver<Option<Running>>,

    /// Ephemeral per-host retry/success records
    ///
    /// ### Lock hierarchy
    ///
    /// See the [docs for the `global` module](crate::global)
    /// for the rules for when you may lock this.
    pub host_retry_state: Mutex<retry::HostStates>,

    /// Scratch directory
    ///
    /// `config.files.scratch_dir` if that is set, otherwise a fresh
    /// temporary directory.  Its contents are removed during startup.
    pub scratch_dir: String,
    /// Keeps our temporary scratch directory (if we made one) alive
    ///
    /// `None` when `config.files.scratch_dir` is configured.
    /// (A `TempDir` deletes its directory when dropped.)
    pub temp_dir_retain: Option<tempfile::TempDir>,
    /// Templates (from `ui_render::tera_templates`)
    pub tera: Tera,
    /// Version information (from `crate::version::calculate_describe`)
    pub version_info: crate::version::Info,
    /// Test-harness supplement (see `test::GlobalSupplement`)
    pub test_suppl: test::GlobalSupplement,
    /// Initialised at startup from `Globals::now_systemtime_inner`
    ///
    /// NOTE(review): presumably the time of the most recent worker
    /// restart - confirm against the code that writes it.
    pub last_worker_restart: RwLock<SystemTime>,

    /// Connection sequence number for the next connection to come in.
    ///
    /// This is a global so that we can determine which worker
    /// connections should be restarted across, and independently of,
    /// all active listeners (even though for now we have only one
    /// listener).
    ///
    /// Starts at 1.
    pub next_seq: Mutex<OracleSeq>,

    /// Lowest connection sequence number of a connection to an Oracle
    /// worker that doesn't need restarting.  Connections to workers
    /// with sequence numbers strictly less than this should issue a
    /// 'restart-worker' instruction and disconnect.
    ///
    /// Starts at 1.
    pub oldest_norestart_seq: watch::Sender<OracleSeq>,
}
103
/// A value that has not yet been validated
///
/// [`resolve_config`] returns its result wrapped in this.  Use the
/// `check` method to validate it and obtain the inner value.
#[derive(Debug)]
pub struct Unchecked<T> {
    /// The not-yet-validated value
    pub unchecked: T,
}
106
/// Information that becomes available once the web server has lifted off
///
/// Published (as `Some`) on `Globals::running` by the liftoff fairing.
#[derive(Clone, Debug)]
pub struct Running {
    /// The port from Rocket's configuration at liftoff
    pub port: u16,
}
111
112pub struct Started {
113    pub rocket: RocketIgnite,
114}
115
116/// State that is maintained in tandem with db transactions
117///
118/// But not stored permenantly.
119#[derive(Default, Debug)]
120pub struct DbAssocState {
121    pub paused: Option<IsPaused>,
122
123    pub release_jobs: HashMap<JobId, WorkerFidelity>,
124}
125
/// Token type meaning "paused" (see `DbAssocState::paused`)
#[derive(Debug, Clone, Copy)]
pub struct IsPaused;

/// Token type meaning "throttled"
///
/// NOTE(review): not used in this file; presumably used elsewhere.
#[derive(Debug, Clone, Copy)]
pub struct IsThrottled;
128
129#[cfg(not(test))]
130mod imp_globals {
131    use super::*;
132    static GLOBALS: OnceLock<Arc<Globals>> = OnceLock::new();
133
134    /// Obtain the global variables - **only for use after startup**
135    ///
136    /// # Panics
137    ///
138    /// Panics if called when globals have not yet been set,
139    /// during [`startup`].
140    /// 
141    /// This is only allowed after [`startup`]
142    /// (in `mod globals`, after `set_globals`).
143    pub fn globals() -> Arc<Globals> {
144        GLOBALS.get()
145            .expect("Using globals() before set")
146            .clone()
147    }
148
149    /// Set the global variables - call precisely once
150    pub(crate) fn set_globals(g: Arc<Globals>) {
151        GLOBALS.set(g)
152            .expect("set_globals called more than once?");
153    }
154}
#[cfg(test)]
mod imp_globals {
    // Test builds: each `#[tokio::test]` runs on its own single-threaded
    // executor, and we don't spawn threads, so every task of a given
    // test runs on that test's thread.  A thread-local is therefore
    // sufficient, and it keeps each test's globals separate.

    use super::*;

    thread_local! {
        static GLOBALS: RefCell<Option<Arc<Globals>>> =
            const { RefCell::new(None) };
    }

    /// Obtain this thread's (ie this test's) globals
    ///
    /// Panics if `set_globals` has not been called on this thread.
    pub fn globals() -> Arc<Globals> {
        let current = GLOBALS.with(|cell| cell.borrow().clone());
        current.expect("Using globals() before set")
    }

    /// Install this thread's globals - call precisely once per thread
    ///
    /// If globals were already installed, the new value still replaces
    /// them, and then we panic, showing the previous value.
    pub(crate) fn set_globals(g: Arc<Globals>) {
        let previous = GLOBALS.with(|cell| cell.replace(Some(g)));
        match previous {
            None => {}
            Some(was) => {
                panic!("set_globals called more than once, previously {was:?}");
            }
        }
    }
}
178pub use imp_globals::*;
179
/// Token indicating that we are shutting down
///
/// This is not an error.
/// If an error causes shutdown, `State.shutdown_reason` is `Err`,
/// but functions such as `State::check_shutdown` still return
/// [`ShuttingDown`] in that case too.
#[derive(Debug)]
pub struct ShuttingDown;
187
188#[derive(Debug)]
189pub struct State {
190    pub shutdown_reason: Option<Result<ShuttingDown, InternalError>>,
191
192    pub test_suppl: test::StateSupplement,
193}
194
195impl State {
196    pub fn new(test_suppl: test::StateSupplement) -> Self {
197        State {
198            shutdown_reason: None,
199            test_suppl,
200        }
201    }
202
203    /// Checks for shutdown (but doesn't do anything with errors)
204    pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
205        match &self.shutdown_reason {
206            Some(Ok(ShuttingDown)) => Err(ShuttingDown),
207            Some(Err(_)) => Err(ShuttingDown),
208            None => Ok(()),
209        }
210    }
211}
212
213impl Globals {
214    /// Waits for shutdown (possibly indefinitely)
215    ///
216    /// Like `check_shutdown` this doesn't do anything with errors
217    pub async fn await_shutdown(&self) -> ShuttingDown {
218        self.state.subscribe()
219            .wait_for_then(|state| {
220                state.shutdown_reason
221                    .as_ref()
222                    .map(|_: &Result<ShuttingDown, IE>| ())
223            })
224            .await
225            .unwrap_or_else(|e| {
226                error!("shutdown handler task recv failed {e}");
227            });
228
229       ShuttingDown
230    }
231}
232
/// Test hook point: when running tests, submit a test webhook
///
/// In a normal (non-test) build this does nothing, and `point`
/// (which produces the hook's label) is never called.
pub async fn test_hook<S: Display>(
    #[allow(unused_variables)] // `point` is only used under cfg(test)
    point: impl FnOnce() -> S + Send + Sync,
) {
    #[cfg(test)]
    {
        let label: String = point().to_string();
        test::hook_point(label).await;
    }
}
242
243#[derive(Debug)]
244pub enum HttpFetchedRaw {
245    Success(reqwest::Response),
246    SuccessFile { file: String, data: String },
247    NotFound(HttpNotFound),
248}
249
impl Globals {
    /// Start this task when we have entered the Running state
    ///
    /// The task is spawned straight away (via `spawn_task_immediate`),
    /// but first waits in `await_running`.  `fut` is only awaited once
    /// that succeeds.  If it yields [`ShuttingDown`] instead, the `?`
    /// ends the task without ever polling `fut`.
    pub fn spawn_task_running(
        self: &Arc<Self>,
        what: impl Display,
        fut: impl Future<Output = TaskResult> + Send + 'static,
    ) {
        let gl = self.clone();
        self.spawn_task_immediate(what, async move {
            let Running { .. } = gl.await_running().await?;
            fut.await
        })
    }

    /// Start this task immediately
    ///
    /// `what` names the task, for tracing and for the panic report.
    /// A panic in `fut` is caught and turned into an internal error.
    /// The `JoinHandle` is not kept, so the task runs detached.
    pub fn spawn_task_immediate(
        self: &Arc<Self>,
        what: impl Display,
        fut: impl Future<Output = TaskResult> + Send + 'static,
    ) {
        trace!("spawning task {what}");
        tokio::spawn({
            // Stringify before spawning: the spawned future must be
            // `Send + 'static`, which `impl Display` is not required to be.
            let what = what.to_string();
            async move {
                // NOTE(review): the task's result is discarded here;
                // presumably error reporting happens when the error is
                // created (eg inside `internal!`) - confirm.
                let _: TaskResult = AssertUnwindSafe(fut)
                    .catch_unwind()
                    .await
                    .unwrap_or_else(|_: Box<dyn Any + Send>| {
                        Err(internal!("task {what} panicked!").into())
                    });
            }
        });
    }

    /// Checks for shutdown (but doesn't do anything with errors)
    ///
    /// Borrows `state` only for the duration of the check.
    pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
        self.state.borrow().check_shutdown()
    }

    /// Wait until we are Running, returning the [`Running`] info
    ///
    /// If the `running` watch fails (its sender has gone away), that is
    /// logged at debug level and treated as [`ShuttingDown`].
    pub async fn await_running(&self) -> Result<Running, ShuttingDown> {
        match self.running.clone().wait_for_then(|p| p.clone()).await {
            Ok(y) => Ok(y),
            Err(e) => {
                debug!("shutting down, no port: {e}");
                Err(ShuttingDown)
            }
        }
    }

    /// Fetch `url`, returning the raw outcome
    ///
    /// Normally this is a GET using `http_client`.  A 404 becomes
    /// [`HttpFetchedRaw::NotFound`], any other error status is an error,
    /// and otherwise we return the response.
    ///
    /// Testing configurations read local files instead:
    ///
    ///  * `file://` URLs are served only if their path starts with one of
    ///    the allowed entries: `config.testing.fake_https_dir`, and (in test
    ///    builds) the test harness's URL-map target files.
    ///  * If `config.testing.fake_https_dir` is set, other URLs must be
    ///    `https://`, and `https://REST` is read from `FAKE_HTTPS_DIR/REST`.
    ///
    /// For file reads:
    ///
    ///  * Paths with an empty or `..` component are rejected.
    ///  * A missing file yields `NotFound`.
    ///  * A directory yields the body `DIRECTORY LISTING`.
    pub async fn http_fetch_raw(
        &self,
        url: Url,
    ) -> Result<HttpFetchedRaw, AE> {
        use HttpFetchedRaw as H;

        // Read `file` as the body for `url`.  `file` must be absolute
        // (we panic otherwise).  This closure captures the *original*
        // `url` (a `Url`), even where `url` is later shadowed by a String.
        let via_file = |file: String| {
            if {
                file
                    .strip_prefix('/').expect("allow not absolute?")
                    .split('/')
                    .any(|ent| ent == "" || ent == "..")
            } {
                return Err(anyhow!("file url {url:?} path traversal"));
            }
            let data = match fs::read_to_string(&file) {
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    return Ok(H::NotFound(HttpNotFound))
                },
                Err(e) if e.kind() == io::ErrorKind::IsADirectory => {
                    let data = "DIRECTORY LISTING".into();
                    return Ok(H::SuccessFile { file, data })
                },
                other => {
                    other.with_context(|| format!("{file:?}"))
                        .context("failed to read file")?
                },
            };
            Ok(H::SuccessFile { file, data })
        };

        // Handle certain file:/// URLs.
        // We only want to serve URLs that are expected by our test
        // harness.  Otherwise other processes on the machine could
        // suborn our test runs into accessing arbitrary files!
        let file_allows = [
            self.config.testing.fake_https_dir.clone()
                .into_iter().collect_vec(),
            #[cfg(test)]
            self.test_suppl.url_map_target_files()
        ];
        let file_allows = file_allows.iter().flatten();

        if file_allows.clone().next().is_some() {
            // Shadows `url`: the "not allowed" message below shows this
            // String, whereas `via_file`'s messages show the `Url`.
            let url = url.to_string();

            if let Some(path) = url.strip_prefix("file://") {
                // NOTE(review): this is a plain string-prefix test, so an
                // allowed `/a/b` also admits `/a/bc/...`.  Confirm that is
                // acceptable (an allowed entry ending in `/` avoids it).
                if !file_allows.clone().any(|allow| path.starts_with(allow)) {
                    return Err(anyhow!("file url {url:?} not allowed ({:?})",
                                       file_allows.clone().collect_vec()));
                }
                return via_file(path.to_owned());
            }
        }

        if let Some(fake) = &self.config.testing.fake_https_dir {
            let url = url.to_string();
            let url = url.strip_prefix("https://")
                .ok_or_else(|| anyhow!(
                    "failed to strip https:// prefix from url {url:?}"
                ))?;
            let file = format!("{fake}/{url}");
            return via_file(file);
        }

        let response = self.http_client.get(url)
            .send().await.context("send")?;
        if response.status() == reqwest::StatusCode::NOT_FOUND {
            return Ok(H::NotFound(HttpNotFound))
        }
        let response = response
            .error_for_status().context("status")?;

        Ok(H::Success(response))
    }

    /// Fetch `url` and deserialise the body as JSON
    ///
    /// The outer `Err` is a fetch or deserialisation failure.  The inner
    /// `Err(HttpNotFound)` means the resource does not exist.
    pub async fn http_fetch_json_maybe<T: serde::de::DeserializeOwned>(
        &self,
        url: Url,
    ) -> Result<Result<T, HttpNotFound>, AE> {
        use HttpFetchedRaw as H;

        let r = match self.http_fetch_raw(url.clone()).await? {
            H::Success(response) => {
                response.json().await.context("response")?
            },
            H::SuccessFile { file, data } => {
                serde_json::from_str(&data)
                    .with_context(|| format!("{file:?}"))
                    .context("failed to deser file")?
            },
            H::NotFound(e) => {
                return Ok(Err(e))
            },
        };

        Ok(Ok(r))
    }

    /// Like `http_fetch_json_maybe`, but "not found" is an error too
    pub async fn http_fetch_json<T: serde::de::DeserializeOwned>(
        &self,
        url: Url,
    ) -> Result<T, AE> {
        Ok(self.http_fetch_json_maybe(url).await??)
    }
}
405
/// In test builds, rebind the local variable `$url` to its test mapping
///
/// Under `cfg(test)` this expands to
/// `let $url = crate::test::UrlMappable::map(&$url);`.
/// Otherwise it expands to nothing.
macro_rules! test_hook_url {
    ($url:ident) => {
        #[cfg(test)]
        let $url = crate::test::UrlMappable::map(&$url);
    };
}
410
411pub fn shutdown_start_tasks(
412    globals: &Arc<Globals>,
413) -> Result<(), StartupError> {
414    use tokio::signal::unix::{signal, SignalKind as SK};
415
416    #[cfg(test)]
417    match globals.t_shutdown_handlers() {
418        Ok(()) => {},
419        Err(()) => return Ok(()),
420    };
421
422    let mut terminate = signal(SK::terminate())
423        .into_internal("failed to set up SIGTERM handler")?;
424
425    globals.spawn_task_immediate("shutdown SIGTEREM watch", {
426        let globals = globals.clone();
427        async move {
428
429            let () = terminate.recv().await
430                .ok_or_else(|| internal!("no more SIGTERM reception?!"))?;
431
432            globals.state.send_modify(|state| {
433                match state.shutdown_reason {
434                    None => {
435                        info!("received SIGTERM, shutting down...");
436                        state.shutdown_reason = Some(Ok(ShuttingDown));
437                    }
438                    Some(Err(_)) => {
439                        info!("SIGTERM, but already crashing!");
440                    },
441                    Some(Ok(ShuttingDown)) => {
442                        info!("SIGTERM, but already shutting down");
443                    }
444                }
445            });
446
447            Ok(TaskWorkComplete {})
448        }
449    });
450
451    globals.spawn_task_immediate("shutdown handler", {
452        let globals = globals.clone();
453        async move {
454
455            let _: ShuttingDown = globals.await_shutdown().await;
456
457            let mut subscription = globals.db_trigger.subscribe();
458
459            match loop {
460                let job = find_job_deferring_shutdown()?;
461
462                let Some::<JobRow>(job) = job
463                else { break Ok(()); };
464
465                info!(jid=%job.jid, "shutdown awaits completion of build");
466
467                match subscription.changed().await {
468                    Ok(()) => {}, // go round again
469                    Err(e) => break Err(e),
470                }
471            } {
472                Ok(()) => info!("clean shutdown complete."),
473                Err(e) => error!("shutdown terminating early, watch {}", e),
474            };
475
476            unsafe {
477                libc::kill(0, libc::SIGHUP);
478                error!("SIGHUP didn't kill us!!");
479                std::process::abort();
480            }
481        }
482    });
483
484    Ok(())
485}
486
/// Find a job whose build should delay shutdown, if there is one
///
/// Queries the `jobs` table, in a read-only db transaction, for rows
/// where `processing` is non-empty and `status` is `Building`, ordered
/// by `last_update` (oldest first).
///
/// The shutdown handler started by [`shutdown_start_tasks`] keeps
/// waiting while this returns `Some`.
///
/// NOTE(review): this assumes `bsql_query_01` yields the first matching
/// row.  Confirm it does not instead reject more than one match, since
/// several builds might be in progress at once.
pub fn find_job_deferring_shutdown() -> Result<Option<JobRow>, IE> {
    db_transaction(TN::Readonly, |dbt| {
        dbt.bsql_query_01(&bsql!("
                        SELECT * FROM jobs
                         WHERE processing != ''
                           AND status = " (JobStatus::Building) "
                      ORDER BY last_update ASC
                    "))
    })
}
497
498fn handle_errors_during_rocket_liftoff(
499    context: impl Display,
500    f: impl FnOnce() -> Result<(), AE>,
501) -> Result<(), ShuttingDown> {
502    f()
503        .with_context(|| context.to_string())
504        .map_err(|ae| {
505            IE::new_without_backtrace(ae).note_only();
506            ShuttingDown
507        })
508}
509
510fn write_port_report_file(gl: &Arc<Globals>, port: u16)
511                          -> Result<(), ShuttingDown>
512{
513    if let Some(file) = gl.config.files.port_report_file.clone() {
514        trace!(?file, "writing port");
515
516        handle_errors_during_rocket_liftoff(format_args!("{file:?}"), || {
517            let f = fs::File::create(&file).context("open")?;
518            let mut f = io::BufWriter::new(f);
519            writeln!(f, "{port}").context("write")?;
520            f.flush().context("write (flush)")?;
521            Ok(())
522        })?;
523    }
524    Ok(())
525}
526
527pub fn resolve_config(
528    cli_options: CliOptions,
529    base_config: Figment,
530) -> Result<Unchecked<WholeConfig>, StartupError> {
531    use StartupError as SE;
532
533    let rocket_base_config = {
534        use rocket::config::*;
535        rocket::Config {
536            shutdown: Shutdown {
537                ctrlc: false,
538                signals: HashSet::new(),
539                ..Default::default()
540            },
541            ..rocket::Config::release_default()
542        }
543    };
544
545    let config = {
546        let mut config = base_config;
547        for file in &cli_options.config {
548            let fig = figment::providers::Toml::file_exact(&file);
549            config = config.merge(fig);
550        }
551        config
552    }
553        // we want to merge the default logging schedule with the config
554        .join(figment::providers::Serialized::default(
555            "log.schedule",
556            logging::default_schedule(),
557        ))
558        // rocket::Config's Deserialize impl doesn't include any of
559        // the usual defaults, so we must inject them here.
560        .join(figment::providers::Serialized::default(
561            "rocket",
562            rocket_base_config,
563        ))
564        .join(figment::providers::Serialized::default(
565            "rocket",
566            // configuration for rocket addons that expect to find their
567            // information in the configuration passed to rocket::custom,
568            // but which *aren't* part of rocket::Config.
569            json! {{
570                // Currently there are none of these.
571            }},
572        ));
573
574    let config = {
575        let mut c = config;
576
577        for s in &cli_options.config_toml {
578            c = c.merge(figment::providers::Toml::string(s));
579        }
580
581        c
582    };
583
584    let rocket_config = config
585        .focus("rocket");
586
587    let config: Config = config
588        .extract()
589        .map_err(SE::ParseConfig)?;
590
591    let computed_config = ComputedConfig::try_from(&config)?;
592
593    let unchecked = 
594        WholeConfig { cli_options, config, computed_config, rocket_config };
595    Ok(Unchecked { unchecked })
596}
597
598impl Unchecked<WholeConfig> {
599    pub fn check(self) -> Result<WholeConfig, StartupError> {
600        self.unchecked.config.check()?;
601        Ok(self.unchecked)
602    }
603}
604
/// Start the service manager, returning an ignited (unlaunched) Rocket
///
/// In order, this:
///
///  * sets up logging;
///  * chooses the scratch directory (the configured one, or a fresh
///    temporary directory) and empties it;
///  * builds the HTTP client, DNS resolver and templates;
///  * builds the [`Globals`] and installs them with `set_globals`;
///  * creates the o2m listener;
///  * starts the shutdown and expiry tasks, and the "unpause" task
///    (which waits until we are running);
///  * builds and ignites Rocket, with a liftoff fairing.
///
/// The liftoff fairing:
///
///  * initialises the db;
///  * starts the fetcher and listener tasks;
///  * writes the port report file;
///  * finally publishes [`Running`].
///
/// `rocket_hook` is applied before our own routes and catchers
/// are mounted.
///
/// If the liftoff fairing fails, the process exits with status 16
/// (outside of tests).
pub async fn startup(
    config: WholeConfig,
    test_global_suppl: test::GlobalSupplement,
    test_state_suppl: test::StateSupplement,
    rocket_hook: impl FnOnce(RocketBuild) -> RocketBuild,
) -> Result<Started, StartupError> {
    use StartupError as SE;

    let WholeConfig { cli_options, config, computed_config, rocket_config }
        = config;

    logging::setup(&config)?;

    debug!(?config, ?cli_options, ?computed_config, "starting");

    // Use the configured scratch directory if there is one.  Otherwise
    // make a temporary directory, kept alive by `temp_dir_retain`.
    let scratch_dir;
    let temp_dir_retain;
    match &config.files.scratch_dir {
        Some(s) => {
            scratch_dir = s.clone();
            temp_dir_retain = None;
        }
        None => {
            let td = tempfile::TempDir::new()
                .context("create temp dir")
                .map_err(SE::TempDir)?;
            scratch_dir = td.path().to_str()
                .ok_or_else(|| anyhow!("not utf-8"))
                .map_err(SE::TempDir)?
                .to_owned();
            temp_dir_retain = Some(td);
        }
    };

    // Start with an empty scratch directory.
    remove_dir_all::remove_dir_contents(&scratch_dir)
        .context("clean out old contents of scratch directory")
        .map_err(SE::TempDir)?;

    let version_info = crate::version::calculate_describe(&config.files);

    let http_client = reqwest::Client::builder()
        .user_agent(USER_AGENT)
        .timeout(*config.timeouts.http_request)
        .build()?;

    let dns_resolver = dns::Resolver::builder_tokio()?.build();

    // We could use rocket_dyn_templates, but it insists on reloading
    // the /dev/null we have to supply as template_dir, whenever anyone
    // on the whole system touches it.
    let tera = ui_render::tera_templates(&config)?;

    // `running` (stored in the Globals) remains `None` until the
    // liftoff fairing below sends `Some(Running { .. })`.
    let (running_tx, running) = watch::channel(None);

    let last_worker_restart = Globals::now_systemtime_inner(
        &config,
        &test_global_suppl,
    ).into();

    let globals = Arc::new(Globals {
        cli_options,
        config,
        computed_config,
        db_trigger: watch::Sender::new(DbAssocState::default()),
        db_initialised: OnceLock::new(),
        state: watch::Sender::new(State::new(test_state_suppl)),
        worker_tracker: Default::default(),
        http_client,
        dns_resolver,
        scratch_dir,
        running,
        tera,
        last_worker_restart,
        host_retry_state: Default::default(),
        version_info,
        temp_dir_retain,
        test_suppl: test_global_suppl,
        next_seq: Mutex::new(1),
        oldest_norestart_seq: watch::Sender::new(1),
    });
    set_globals(globals.clone());

    let listener = o2m_listener::Listener::new(&globals)?;

    shutdown_start_tasks(&globals)?;

    expire::start_task(&globals);

    globals.spawn_task_running("unpause", {
        let globals = globals.clone();
        db_workflow::unpause_task(globals)
    });

    // The caller's hook sees the Rocket before our routes and catchers
    // are mounted.
    let rocket = rocket::custom(&rocket_config);
    let rocket = rocket_hook(rocket);
    let rocket = api_routes::mount_api(rocket);
    let rocket = ui_routes::mount_ui_catchers(rocket);

    let rocket = rocket.attach({
        let globals = globals.clone();

        rocket::fairing::AdHoc::on_liftoff(
            "spawn workers",
            |rocket: &rocket::Rocket<_>| Box::pin(
                async move {
                    handle_errors_during_rocket_liftoff("initialise db", || {
                        db_support::initialise(&globals)?;
                        Ok(())
                    })?;

                    if globals.state.borrow().shutdown_reason.is_some() {
                        // Typically, if this happens, it is because an
                        // internal error has already triggered shutdown.
                        trace!(
                         "shutdown triggered during startup, not continuing"
                        );
                        return Err(ShuttingDown);
                    }
                    fetcher::start_tasks(&globals);
                    listener.start_task();

                    let port = rocket.config().port;
                    // We do this here, directly, rather than in a task
                    // hanging off the channel, because our tests check
                    // that this file has been written and use the channel
                    // as a signal to see that it has been done.
                    write_port_report_file(&globals, port)?;

                    let running = Running {
                        port
                    };
                    running_tx.send(Some(running.clone()))
                        .expect("no-one wanted our port");

                    info!(?running, "running");

                    Ok::<_, ShuttingDown>(())
                }.map(
                    // Err(ShuttingDown) means we cancelled the startup
                    // early.  We've reported it, but we can't return it
                    // to Rocket.
                    |r| r.unwrap_or_else(|ShuttingDown| {
                        eprintln!("exiting due to error during startup!");
                        error!("exiting due to error during startup!");

                        #[cfg(not(test))]
                        std::process::exit(16);
                        // Outside tests, `exit` above diverges, making
                        // this unreachable; in tests it is what we hit.
                        #[allow(unreachable_code)]
                        { unreachable!(); }
                    })
                )
            )
        )
    });

    let rocket = rocket.ignite().await?;

    Ok(Started {
        rocket,
    })
}