tag2upload_service_manager/
global.rs

1
2use crate::prelude::*;
3
4use crate::o2m_support::OracleSeq;
5
6use std::sync::Mutex;
7
/// `User-Agent` header value used by the HTTP client built in `startup`.
const USER_AGENT: &str = "tag2upload-service-manager https://salsa.debian.org/dgit-team";
10
/// Process-wide shared state
///
/// Constructed once, in `startup`, and then published with `set_globals`
/// so that it can be obtained anywhere via `globals()`.
#[derive(Debug)]
pub struct Globals {
    /// Command line options, as passed to `startup` in the `WholeConfig`
    pub cli_options: CliOptions,
    /// Configuration, as passed to `startup` in the `WholeConfig`
    pub config: Config,
    /// Values derived from `config` (by `ComputedConfig::try_from`,
    /// in `resolve_config`)
    pub computed_config: ComputedConfig,

    /// Shared global state *excluding the db*
    ///
    /// ### Lock hierarchy
    ///
    /// You may not mutate this at the same time as `db_trigger` or `running`
    pub state: watch::Sender<State>,

    /// Shared worker tracker; default-initialised in `startup`
    pub worker_tracker: Arc<WorkerTracker>,

    /// Trigger watch for database updates
    ///
    ///  * When a db transaction modifies the DB such that a
    ///    job may become processable that wasn't previously,
    ///    this watch will be signaled (even though the
    ///    contained `DbAssocState` may not have changed).
    ///
    ///    `db_support.rs` functions taking a [`TransactionNature`]
    ///    ensure this.
    ///
    ///  * Additionally `DbAssocState` contains some information
    ///    about pausing mirrored from the database.
    ///
    /// ### Lock hierarchy
    ///
    /// You may not mutate this at the same time as `state` or `running`
    pub db_trigger: watch::Sender<DbAssocState>,

    /// HTTP client, built in `startup` with our `USER_AGENT`
    /// and the configured `timeouts.http_request`
    pub http_client: reqwest::Client,
    /// DNS resolver, built in `startup` via `dns::Resolver::builder_tokio`
    pub dns_resolver: dns::Resolver,

    /// Signal for when we have started up
    ///
    /// Holds `None` until the rocket liftoff fairing set up in `startup`
    /// sends `Some(Running { .. })`.
    ///
    /// ### Lock hierarchy
    ///
    /// You may not mutate this at the same time as `db_trigger` or `state`
    pub running: watch::Receiver<Option<Running>>,

    /// Path of the scratch directory
    ///
    /// Either `config.files.scratch_dir`, or, if that is not configured,
    /// the path of the temporary directory held in `temp_dir_retain`.
    /// `startup` removes any previous contents.
    pub scratch_dir: String,
    /// The temporary directory backing `scratch_dir`, if we created one
    ///
    /// `Some` only when no scratch directory was configured.
    /// Held here so that the `TempDir` is not dropped
    /// (dropping a `tempfile::TempDir` deletes the directory).
    pub temp_dir_retain: Option<tempfile::TempDir>,
    /// Templates, as loaded by `ui_render::tera_templates` in `startup`
    pub tera: Tera,
    /// Version information, from `crate::version::calculate_describe`
    pub version_info: crate::version::Info,
    /// Supplement supplied by the caller of `startup`
    /// (type from the `test` module)
    pub test_suppl: test::GlobalSupplement,

    /// Connection sequence number for the next connection to come in.
    ///
    /// This is a global so that we can determine which worker
    /// connections should be restarted across, and independently of,
    /// all active listeners (even though for now we have only one
    /// listener).
    pub next_seq: Mutex<OracleSeq>,

    /// Lowest connection sequence number of a connection to an Oracle
    /// worker that doesn't need restarting.  Connections to workers
    /// with sequence numbers strictly less than this should issue a
    /// 'restart-worker' instruction and disconnect.
    pub oldest_norestart_seq: watch::Sender<OracleSeq>,
}
74
/// Wrapper around a value that has not been validated yet
///
/// For `Unchecked<WholeConfig>`, call `check` to validate and unwrap.
#[derive(Debug)]
pub struct Unchecked<T> {
    pub unchecked: T,
}
77
/// Information available once the service is up and running
///
/// Sent on the `running` watch channel by the liftoff fairing in `startup`.
#[derive(Debug, Clone)]
pub struct Running {
    /// Port number, taken from the rocket configuration at liftoff
    pub port: u16,
}
82
/// Successful result of `startup`
pub struct Started {
    /// The rocket instance, as returned by `ignite()` in `startup`
    pub rocket: RocketIgnite,
}
86
/// State that is maintained in tandem with db transactions
///
/// But not stored permanently.
#[derive(Default, Debug)]
pub struct DbAssocState {
    /// Pause information (per the `Globals::db_trigger` docs,
    /// this is mirrored from the database)
    pub paused: Option<IsPaused>,

    // NOTE(review): the meaning of this map is not evident from this file;
    // presumably jobs that are to be released, keyed by job id - confirm.
    pub release_jobs: HashMap<JobId, WorkerFidelity>,
}
96
/// Token type: we are paused
#[derive(Debug, Clone, Copy)]
pub struct IsPaused;

/// Token type: we are throttled
#[derive(Debug, Clone, Copy)]
pub struct IsThrottled;
99
100#[cfg(not(test))]
101mod imp_globals {
102    use super::*;
103    static GLOBALS: OnceLock<Arc<Globals>> = OnceLock::new();
104
105    /// Obtain the global variables - **only for use after startup**
106    ///
107    /// # Panics
108    ///
109    /// Panics if called when globals have not yet been set,
110    /// during [`startup`].
111    /// 
112    /// This is only allowed after [`startup`]
113    /// (in `mod globals`, after `set_globals`).
114    pub fn globals() -> Arc<Globals> {
115        GLOBALS.get()
116            .expect("Using globals() before set")
117            .clone()
118    }
119
120    /// Set the global variables - call precisely once
121    pub(crate) fn set_globals(g: Arc<Globals>) {
122        GLOBALS.set(g)
123            .expect("set_globals called more than once?");
124    }
125}
#[cfg(test)]
mod imp_globals {
    // In testing, we use a single-threaded tokio executor
    // provided by `#[tokio::test]`.
    //
    // We don't spawn threads, and tokio relies on the async executor.
    // So we can use thread locals (which isolates different tests).

    use super::*;

    thread_local! {
        static GLOBALS: RefCell<Option<Arc<Globals>>> =
            const { RefCell::new(None) };
    }

    /// Obtain this thread's globals
    ///
    /// # Panics
    ///
    /// Panics if `set_globals` has not been called on this thread.
    pub fn globals() -> Arc<Globals> {
        let current = GLOBALS.with_borrow(|slot| slot.clone());
        current.expect("Using globals() before set")
    }

    /// Set this thread's globals - call precisely once per thread
    ///
    /// # Panics
    ///
    /// Panics if this thread's globals had already been set.
    pub(crate) fn set_globals(g: Arc<Globals>) {
        let previous = GLOBALS.replace(Some(g));
        if let Some(was) = previous {
            panic!("set_globals called more than once, previously {was:?}");
        }
    }
}
149pub use imp_globals::*;
150
/// Token indicating that we are shutting down
///
/// This is not an error.
/// If an error causes shutdown, `State.shutdown_reason` is `Err`,
/// but other functions may still return a plain [`ShuttingDown`].
#[derive(Debug)]
pub struct ShuttingDown;
158
/// Shared mutable state, held in the `Globals::state` watch channel
#[derive(Debug)]
pub struct State {
    /// Why we are shutting down, if we are
    ///
    ///  * `None`: not shutting down.
    ///  * `Some(Ok(ShuttingDown))`: orderly shutdown (eg, after SIGTERM).
    ///  * `Some(Err(..))`: an error is causing the shutdown.
    pub shutdown_reason: Option<Result<ShuttingDown, InternalError>>,

    /// Supplement supplied by the caller of `startup`, via `State::new`
    /// (type from the `test` module)
    pub test_suppl: test::StateSupplement,
}
165
166impl State {
167    pub fn new(test_suppl: test::StateSupplement) -> Self {
168        State {
169            shutdown_reason: None,
170            test_suppl,
171        }
172    }
173
174    /// Checks for shutdown (but doesn't do anything with errors)
175    pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
176        match &self.shutdown_reason {
177            Some(Ok(ShuttingDown)) => Err(ShuttingDown),
178            Some(Err(_)) => Err(ShuttingDown),
179            None => Ok(()),
180        }
181    }
182}
183
184impl Globals {
185    /// Waits for shutdown (possibly indefinitely)
186    ///
187    /// Like `check_shutdown` this doesn't do anything with errors
188    pub async fn await_shutdown(&self) -> ShuttingDown {
189        self.state.subscribe()
190            .wait_for_then(|state| {
191                state.shutdown_reason
192                    .as_ref()
193                    .map(|_: &Result<ShuttingDown, IE>| ())
194            })
195            .await
196            .unwrap_or_else(|e| {
197                error!("shutdown handler task recv failed {e}");
198            });
199
200       ShuttingDown
201    }
202}
203
/// If we are running tests, invoke the test hook point.
/// Otherwise do nothing.
///
/// In tests, `point` is called and its result, converted to a string,
/// is passed to `test::hook_point`.
/// Outside tests `point` is never called.
pub async fn test_hook<S: Display>(
    // Not used at all outside tests.
    #[allow(unused_variables)]
    point: impl FnOnce() -> S + Send + Sync,
) {
    #[cfg(test)]
    {
        let label = point().to_string();
        test::hook_point(label).await;
    }
}
213
214
215impl Globals {
216    /// Start this task when we have entered the Running state
217    pub fn spawn_task_running(
218        self: &Arc<Self>,
219        what: impl Display,
220        fut: impl Future<Output = TaskResult> + Send + 'static,
221    ) {
222        let gl = self.clone();
223        self.spawn_task_immediate(what, async move {
224            let Running { .. } = gl.await_running().await?;
225            fut.await
226        })
227    }
228
229    /// Start this task immediately
230    pub fn spawn_task_immediate(
231        self: &Arc<Self>,
232        what: impl Display,
233        fut: impl Future<Output = TaskResult> + Send + 'static,
234    ) {
235        tokio::spawn({
236            let what = what.to_string();
237            async move {
238                let _: TaskResult = AssertUnwindSafe(fut)
239                    .catch_unwind()
240                    .await
241                    .unwrap_or_else(|_: Box<dyn Any + Send>| {
242                        Err(internal!("task {what} panicked!").into())
243                    });
244            }
245        });
246    }
247
248    /// Checks for shutdown (but doesn't do anything with errors)
249    pub fn check_shutdown(&self) -> Result<(), ShuttingDown> {
250        self.state.borrow().check_shutdown()
251    }
252
253    pub async fn await_running(&self) -> Result<Running, ShuttingDown> {
254        match self.running.clone().wait_for_then(|p| p.clone()).await {
255            Ok(y) => Ok(y),
256            Err(e) => {
257                debug!("shutting down, no port: {e}");
258                Err(ShuttingDown)
259            }
260        }
261    }
262
263    pub async fn http_fetch_json<T: serde::de::DeserializeOwned>(
264        &self,
265        url: Url,
266    ) -> Result<T, AE> {
267        if let Some(fake) = &self.config.testing.fake_https_dir {
268            let url = url.to_string();
269            let url = url.strip_prefix("https://")
270                .ok_or_else(|| anyhow!(
271    "failed to strip https:// prefix from url {url:?}"
272                ))?;
273            let fake_file = format!("{fake}/{url}");
274            let data = fs::read_to_string(&fake_file)
275                .with_context(|| format!("{fake_file:?}"))
276                .context("failed to read fake file")?;
277            let data = serde_json::from_str(&data)
278                .with_context(|| format!("{fake_file:?}"))
279                .context("failed to deser fake file")?;
280            return Ok(data);
281        }
282
283        self.http_client.get(url)
284            .send().await.context("send")?
285            .error_for_status().context("status")?
286            .json().await.context("response")
287    }
288}
289
/// In tests, rebinds the variable `$url` to the result of passing it
/// through `crate::test::UrlMappable::map`.
/// Outside tests, expands to nothing.
macro_rules! test_hook_url {
    ($url:ident) => {
        #[cfg(test)]
        let $url = crate::test::UrlMappable::map(&$url);
    };
}
294
295pub fn shutdown_start_tasks(
296    globals: &Arc<Globals>,
297) -> Result<(), StartupError> {
298    use tokio::signal::unix::{signal, SignalKind as SK};
299
300    #[cfg(test)]
301    match globals.t_shutdown_handlers() {
302        Ok(()) => {},
303        Err(()) => return Ok(()),
304    };
305
306    let mut terminate = signal(SK::terminate())
307        .into_internal("failed to set up SIGTERM handler")?;
308
309    globals.spawn_task_immediate("shutdown SIGTEREM watch", {
310        let globals = globals.clone();
311        async move {
312
313            let () = terminate.recv().await
314                .ok_or_else(|| internal!("no more SIGTERM reception?!"))?;
315
316            globals.state.send_modify(|state| {
317                match state.shutdown_reason {
318                    None => {
319                        info!("received SIGTERM, shutting down...");
320                        state.shutdown_reason = Some(Ok(ShuttingDown));
321                    }
322                    Some(Err(_)) => {
323                        info!("SIGTERM, but already crashing!");
324                    },
325                    Some(Ok(ShuttingDown)) => {
326                        info!("SIGTERM, but already shutting down");
327                    }
328                }
329            });
330
331            Ok(TaskWorkComplete {})
332        }
333    });
334
335    globals.spawn_task_immediate("shutdown handler", {
336        let globals = globals.clone();
337        async move {
338
339            let _: ShuttingDown = globals.await_shutdown().await;
340
341            let mut subscription = globals.db_trigger.subscribe();
342
343            match loop {
344                let job = find_job_deferring_shutdown()?;
345
346                let Some::<JobRow>(job) = job
347                else { break Ok(()); };
348
349                info!(jid=%job.jid, "shutdown awaits completion of build");
350
351                match subscription.changed().await {
352                    Ok(()) => {}, // go round again
353                    Err(e) => break Err(e),
354                }
355            } {
356                Ok(()) => info!("clean shutdown complete."),
357                Err(e) => error!("shutdown terminating early, watch {}", e),
358            };
359
360            unsafe {
361                libc::kill(0, libc::SIGHUP);
362                error!("SIGHUP didn't kill us!!");
363                std::process::abort();
364            }
365        }
366    });
367
368    Ok(())
369}
370
/// Looks for a job which shutdown ought to wait for
///
/// Queries the `jobs` table, in a read-only transaction,
/// for jobs whose `processing` is not the empty string
/// and whose `status` is `JobStatus::Building`,
/// ordered by ascending `last_update`.
///
/// Returns `Ok(None)` if there is no such job.
// NOTE(review): `bsql_query_01` presumably yields at most one row;
// what it does if several rows match is not evident here - confirm.
pub fn find_job_deferring_shutdown() -> Result<Option<JobRow>, IE> {
    // `db_transaction` gives us a nested result: the `?` propagates the
    // outer error, and the closure's own result is our return value.
    db_transaction(TN::Readonly, |dbt| {
        dbt.bsql_query_01(bsql!("
                        SELECT * FROM jobs
                         WHERE processing != ''
                           AND status = " (JobStatus::Building) "
                      ORDER BY last_update ASC
                    "))
    })?
}
381
382fn write_port_report_file(gl: &Arc<Globals>, port: u16) {
383    if let Some(file) = gl.config.files.port_report_file.clone() {
384        trace!(?file, "writing port");
385        (|| {
386            let f = fs::File::create(&file).context("open")?;
387            let mut f = io::BufWriter::new(f);
388            writeln!(f, "{port}").context("write")?;
389            f.flush().context("write (flush)")?;
390            Ok::<_, AE>(())
391        })()
392            .with_context(|| format!("{file:?}"))
393            .unwrap_or_else(|ae| IE::new_without_backtrace(ae).note_only());
394    }
395}
396
397pub fn resolve_config(
398    cli_options: CliOptions,
399    base_config: Figment,
400) -> Result<Unchecked<WholeConfig>, StartupError> {
401    use StartupError as SE;
402
403    let rocket_base_config = {
404        use rocket::config::*;
405        rocket::Config {
406            shutdown: Shutdown {
407                ctrlc: false,
408                signals: HashSet::new(),
409                ..Default::default()
410            },
411            ..rocket::Config::release_default()
412        }
413    };
414
415    let config = {
416        let mut config = base_config;
417        for file in &cli_options.config {
418            let fig = figment::providers::Toml::file_exact(&file);
419            config = config.merge(fig);
420        }
421        config
422    }
423        // we want to merge the default logging schedule with the config
424        .join(figment::providers::Serialized::default(
425            "log.schedule",
426            logging::default_schedule(),
427        ))
428        // rocket::Config's Deserialize impl doesn't include any of
429        // the usual defaults, so we must inject them here.
430        .join(figment::providers::Serialized::default(
431            "rocket",
432            rocket_base_config,
433        ))
434        .join(figment::providers::Serialized::default(
435            "rocket",
436            // configuration for rocket addons that expect to find their
437            // information in the configuration passed to rocket::custom,
438            // but which *aren't* part of rocket::Config.
439            json! {{
440                // Currently there are none of these.
441            }},
442        ));
443
444    let config = {
445        let mut c = config;
446
447        for s in &cli_options.config_toml {
448            c = c.merge(figment::providers::Toml::string(s));
449        }
450
451        c
452    };
453
454    let rocket_config = config
455        .focus("rocket");
456
457    let config: Config = config
458        .extract()
459        .map_err(SE::ParseConfig)?;
460
461    let computed_config = ComputedConfig::try_from(&config)?;
462
463    let unchecked = 
464        WholeConfig { cli_options, config, computed_config, rocket_config };
465    Ok(Unchecked { unchecked })
466}
467
468impl Unchecked<WholeConfig> {
469    pub fn check(self) -> Result<WholeConfig, StartupError> {
470        self.unchecked.config.check()?;
471        Ok(self.unchecked)
472    }
473}
474
475pub async fn startup(
476    config: WholeConfig,
477    test_global_suppl: test::GlobalSupplement,
478    test_state_suppl: test::StateSupplement,
479    rocket_hook: impl FnOnce(RocketBuild) -> RocketBuild,
480) -> Result<Started, StartupError> {
481    use StartupError as SE;
482
483    let WholeConfig { cli_options, config, computed_config, rocket_config }
484        = config;
485
486    logging::setup(&config)?;
487
488    let scratch_dir;
489    let temp_dir_retain;
490    match &config.files.scratch_dir {
491        Some(s) => {
492            scratch_dir = s.clone();
493            temp_dir_retain = None;
494        }
495        None => {
496            let td = tempfile::TempDir::new()
497                .context("create temp dir")
498                .map_err(SE::TempDir)?;
499            scratch_dir = td.path().to_str()
500                .ok_or_else(|| anyhow!("not utf-8"))
501                .map_err(SE::TempDir)?
502                .to_owned();
503            temp_dir_retain = Some(td);
504        }
505    };
506
507    remove_dir_all::remove_dir_contents(&scratch_dir)
508        .context("clean out old contents of scratch directory")
509        .map_err(SE::TempDir)?;
510
511    let version_info = crate::version::calculate_describe(&config.files);
512
513    let http_client = reqwest::Client::builder()
514            .user_agent(USER_AGENT)
515        .timeout(*config.timeouts.http_request)
516        .build()?;
517
518    let dns_resolver = dns::Resolver::builder_tokio()?.build();
519
520    // We could use rocket_dyn_templates, but it insists on reloading
521    // the /dev/null we have to supply as template_dir, whenever anyone
522    // on the whole system touches it.
523    let tera = ui_render::tera_templates(&config)?;
524
525    let (running_tx, running) = watch::channel(None);
526
527    let globals = Arc::new(Globals {
528        cli_options,
529        config,
530        computed_config,
531        db_trigger: watch::Sender::new(DbAssocState::default()),
532        state: watch::Sender::new(State::new(test_state_suppl)),
533        worker_tracker: Default::default(),
534        http_client,
535        dns_resolver,
536        scratch_dir,
537        running,
538        tera,
539        version_info,
540        temp_dir_retain,
541        test_suppl: test_global_suppl,
542        next_seq: Mutex::new(1),
543        oldest_norestart_seq: watch::Sender::new(1),
544    });
545    set_globals(globals.clone());
546
547    db_support::initialise(&globals)?;
548
549    let listener = o2m_listener::Listener::new(&globals)?;
550
551    shutdown_start_tasks(&globals)?;
552
553    expire::start_task(&globals);
554
555    globals.spawn_task_running("unpause", {
556        let globals = globals.clone();
557        db_workflow::unpause_task(globals)
558    });
559
560    let rocket = rocket::custom(&rocket_config);
561    let rocket = rocket_hook(rocket);
562    let rocket = ui_routes::mount_all(rocket);
563
564    let rocket = rocket.attach({
565        let globals = globals.clone();
566
567        rocket::fairing::AdHoc::on_liftoff(
568            "spawn workers",
569            |rocket: &rocket::Rocket<_>| Box::pin(
570                async move {
571                    if globals.state.borrow().shutdown_reason.is_some() {
572                        // Typically, if this happens, an internal error
573                        trace!(
574                         "shutdown triggered during startup, not continuing"
575                        );
576                        return;
577                    }
578                    fetcher::start_tasks(&globals);
579                    listener.start_task();
580
581                    let port = rocket.config().port;
582                    // We do this here, directly, rather than in a task
583                    // hanging off the channel, because our tests check
584                    // that this file has been written and use the channel
585                    // as a signal to see that it has been done.
586                    write_port_report_file(&globals, port);
587
588                    let running = Running {
589                        port
590                    };
591                    running_tx.send(Some(running.clone()))
592                        .expect("no-one wanted our port");
593
594                    info!(?running, "running");
595                }
596            )
597        )
598    });
599
600    let rocket = rocket.ignite().await?;
601
602    Ok(Started {
603        rocket,
604    })
605}