app_frame/service_manager.rs

use std::{
    ops::Deref,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use anyhow::{bail, ensure};
use futures::lock::Mutex;
use futures::FutureExt;
use tokio::task::JoinHandle;

use super::service::Job;
use crate::health_endpoint;
use crate::service::Service;
use crate::{
    error::display_error,
    health_endpoint::HealthEndpointConfig,
    time::{Clock, Sleeper, SystemClock, TokioSleeper},
    Never, ToError,
};

#[async_trait::async_trait]
pub trait Application: Initialize + Serves {
    /// Starts the app and runs forever with default monitoring.
    async fn run(&self) -> anyhow::Result<Never> {
        self.run_custom(Default::default()).await
    }

    /// Starts the app and runs forever with configured monitoring.
    async fn run_custom(&self, config: RunConfig) -> anyhow::Result<Never> {
        config.validate()?;
        let mgr = Arc::new(self.start().await?);
        if let Some(config) = config.http_health_endpoint {
            let mgr2 = mgr.clone();
            tokio::spawn(async move { health_endpoint::run(mgr2, config).await });
        }
        // Check service health every 10 seconds; log and recover per `config`.
        Ok(mgr
            .monitor_with_recovery(10, config.log_interval, config.attempt_recovery_after)
            .await)
    }

    /// Starts the app and returns, so you may implement custom monitoring.
    async fn start(&self) -> anyhow::Result<ServiceManager> {
        tracing::info!("Initializing application.");
        self.init().run_once().await?;
        let mut mgr = ServiceManager::new(
            Arc::new(SystemClock::default()),
            Arc::new(TokioSleeper::default()),
        );
        for service in self.services() {
            mgr.register_service(service);
        }
        tracing::info!("Starting services.");
        mgr.spawn_services().await?;
        Ok(mgr)
    }
}
impl<T: Initialize + Serves> Application for T {}

pub struct RunConfig {
    /// Interval in seconds between application status log messages when there are no problems.
    pub log_interval: i32,
    /// Services will be restarted after they have been in a failing state for this many seconds.
    pub attempt_recovery_after: i32,
    /// Set to `None` to disable the HTTP health endpoint.
    pub http_health_endpoint: Option<HealthEndpointConfig>,
}

impl RunConfig {
    fn validate(&self) -> anyhow::Result<()> {
        ensure!(self.log_interval >= 0, "log_interval must be non-negative");
        ensure!(
            self.attempt_recovery_after >= 0,
            "attempt_recovery_after must be non-negative"
        );
        Ok(())
    }
}

impl Default for RunConfig {
    fn default() -> Self {
        Self {
            log_interval: 21600,         // 6 hours
            attempt_recovery_after: 120, // 2 minutes
            http_health_endpoint: Some(Default::default()),
        }
    }
}
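
// A minimal configuration sketch: the `log_interval` and
// `attempt_recovery_after` values below are arbitrary examples, not
// recommendations, and the HTTP health endpoint is disabled for brevity.
#[cfg(test)]
mod run_config_example {
    use super::*;

    #[test]
    fn custom_run_config_validates() {
        let config = RunConfig {
            log_interval: 3600,         // log a healthy status once an hour
            attempt_recovery_after: 60, // restart after one minute of failures
            http_health_endpoint: None, // no HTTP health endpoint
        };
        assert!(config.validate().is_ok());
    }
}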

pub trait Initialize {
    /// Returns the jobs that should run once before starting the long-running
    /// services.
    ///
    /// Calling this function does not run the jobs. You must run them some
    /// other way.
    #[must_use]
    fn init(&self) -> Vec<Arc<dyn Job>> {
        vec![]
    }
}

pub trait Serves {
    /// Returns the services that should run for the entire life of the app.
    ///
    /// Calling this function does not run the services. You must run them some
    /// other way.
    #[must_use]
    fn services(&self) -> Vec<Box<dyn Service>>;
}
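
// A minimal wiring sketch: a type that implements `Serves` (and, optionally,
// `Initialize`) picks up the blanket `Application` impl above, so calling
// `.run().await` or `.start().await` on it is all a binary needs to do.
// `NoServices` is a hypothetical stand-in; a real app would return boxed
// `Service` implementations from `services()`.
#[cfg(test)]
mod application_example {
    use super::*;

    struct NoServices;

    impl Initialize for NoServices {}

    impl Serves for NoServices {
        fn services(&self) -> Vec<Box<dyn Service>> {
            vec![]
        }
    }

    #[test]
    fn blanket_application_impl_applies() {
        fn assert_application<A: Application>(_: &A) {}
        assert_application(&NoServices);
    }
}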

/// Manages services: start/stop/monitor
pub struct ServiceManager {
    clock: Arc<dyn Clock>,
    sleeper: Arc<dyn Sleeper>,
    managed_services: Vec<ManagedService>,
}

impl ServiceManager {
    pub fn new(clock: Arc<dyn Clock>, sleeper: Arc<dyn Sleeper>) -> Self {
        Self {
            clock,
            sleeper,
            managed_services: vec![],
        }
    }
}

impl ServiceManager {
    pub fn register_service(&mut self, mut service: Box<dyn Service>) {
        // The service reports liveness by writing the clock's timestamp into
        // `heart` on every beat; `check` later compares that value against the
        // service's heartbeat TTL.
        let heart = Arc::new(AtomicUsize::new(0));
        service.set_heartbeat(Arc::new((heart.clone(), self.clock.clone())));
        self.managed_services.push(ManagedService {
            clock: self.clock.clone(),
            service: service.into(),
            heart,
            failing_health_checks_since: 0.into(),
            handle: Mutex::new(None),
        });
    }

    pub async fn spawn_services(&mut self) -> anyhow::Result<()> {
        for managed_service in self.managed_services.iter_mut() {
            managed_service.spawn().await?;
        }
        Ok(())
    }

    pub async fn monitor(&self, check_interval: u64, log_interval: i32) -> Never {
        let mut publisher = ServiceReportPublisher::new(log_interval);
        loop {
            publisher.handle_report(self.check());
            self.sleeper
                .sleep(Duration::from_secs(check_interval))
                .await;
        }
    }

    /// Monitors the services and, if one has been unhealthy for longer than
    /// `attempt_recovery_after` seconds, tries to restart it.
    pub async fn monitor_with_recovery(
        &self,
        check_interval: u64,
        log_interval: i32,
        attempt_recovery_after: i32,
    ) -> Never {
        let mut publisher = ServiceReportPublisher::new(log_interval);
        loop {
            let report = self.check();
            publisher.handle_report(report.clone());
            for ServiceReport {
                id: index,
                status: Unhealthy { since, .. },
                ..
            } in report.dead
            {
                let svc = &self.managed_services[index];
                if since + attempt_recovery_after < self.clock.current_timestamp() as i32 {
                    // Give the old task up to 10 seconds to stop before
                    // considering the restart failed.
                    if let Err(e) = svc.restart(Duration::from_secs(10)).await {
                        tracing::error!(
                            "Failed to restart {}: {}",
                            svc.service.name(),
                            display_error(&e)
                        );
                    }
                }
            }
            self.sleeper
                .sleep(Duration::from_secs(check_interval))
                .await;
        }
    }

    pub fn check(&self) -> ServiceReportSummary {
        let (mut alive, mut dead) = (vec![], vec![]);
        let timestamp = self.clock.current_timestamp() as i32;
        for (index, ms) in self.managed_services.iter().enumerate() {
            let status = ms.check(timestamp);
            if let ServiceStatus::Unhealthy(unhealthy) = status {
                dead.push(ServiceReport {
                    id: index,
                    name: ms.service.name(),
                    status: unhealthy,
                });
            } else {
                alive.push(ServiceReport {
                    id: index,
                    name: ms.service.name(),
                    status: (),
                });
            }
        }
        ServiceReportSummary {
            alive,
            dead,
            timestamp,
        }
    }
}
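
// A minimal sketch of `check` on an empty manager: with no services
// registered, the summary reports nothing alive and nothing dead. This
// assumes `SystemClock::default()` and `TokioSleeper::default()` can be
// constructed outside a running Tokio runtime.
#[cfg(test)]
mod service_manager_example {
    use super::*;

    #[test]
    fn empty_manager_reports_no_services() {
        let mgr = ServiceManager::new(
            Arc::new(SystemClock::default()),
            Arc::new(TokioSleeper::default()),
        );
        let summary = mgr.check();
        assert!(summary.alive.is_empty());
        assert!(summary.dead.is_empty());
    }
}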

struct ManagedService {
    clock: Arc<dyn Clock>,
    service: Arc<dyn Service>,
    heart: Arc<AtomicUsize>,
    failing_health_checks_since: AtomicUsize,
    handle: Mutex<Option<JoinHandle<Never>>>, // TODO: shouldn't need a mutex here but it's hard to do another way
}

impl ManagedService {
    /// Returns `Healthy` if the heartbeat is fresh and the health check
    /// passes; otherwise returns `Unhealthy` with the reasons and the earliest
    /// time the problems started.
    pub fn check(&self, current_timestamp: i32) -> ServiceStatus {
        let mut unhealthy_since = 0;
        let last_beat = self.heart.load(Ordering::Relaxed) as i32;
        let time_since_last_beat = current_timestamp - last_beat;
        let ttl = self.service.heartbeat_ttl();
        let mut dead_reasons = vec![];
        if time_since_last_beat >= ttl {
            // The service went quiet once the TTL expired after its last beat.
            unhealthy_since = last_beat + ttl;
            dead_reasons.push(format!(
                "No heartbeat for {time_since_last_beat} seconds (ttl: {ttl})"
            ))
        }
        if self.service.is_healthy() {
            self.failing_health_checks_since.store(0, Ordering::Relaxed);
        } else {
            // Remember when the health checks started failing; the
            // compare-exchange only writes the timestamp on the first failure.
            let failing_since = match self.failing_health_checks_since.compare_exchange(
                0,
                current_timestamp as usize,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => current_timestamp,
                Err(existing) => existing as i32,
            };
            // Report the earliest problem. `unhealthy_since` is still 0 when
            // the heartbeat is fine, so don't let that placeholder win the min.
            unhealthy_since = if unhealthy_since == 0 {
                failing_since
            } else {
                std::cmp::min(failing_since, unhealthy_since)
            };
            dead_reasons.push(format!(
                "Failed health checks for {} seconds",
                current_timestamp - failing_since
            ))
        }

        if dead_reasons.is_empty() {
            ServiceStatus::Healthy
        } else {
            ServiceStatus::Unhealthy(Unhealthy {
                since: unhealthy_since,
                reasons: dead_reasons,
            })
        }
    }

    pub async fn restart(&self, timeout: Duration) -> anyhow::Result<()> {
        self.abort(timeout).await?;
        self.spawn().await
    }

    pub async fn spawn(&self) -> anyhow::Result<()> {
        let mut handle = self.handle.lock().await;
        if handle.is_some() {
            bail!(
                "Cannot start service, already running: {}",
                self.service.name()
            );
        }
        tracing::info!("Starting service: {}", self.service.name());
        // Record a fresh beat so the new task isn't immediately flagged as
        // having a stale heartbeat.
        (&*self.heart, self.clock.clone()).beat();
        let svc = self.service.clone();
        *handle = Some(tokio::spawn(async move { svc.run_forever().await }));
        self.failing_health_checks_since.store(0, Ordering::Relaxed);

        Ok(())
    }

    pub async fn abort(&self, timeout: Duration) -> anyhow::Result<()> {
        tracing::info!("Aborting service: {}", self.service.name());
        let mut handle_opt = self.handle.lock().await;
        if let Some(handle) = handle_opt.as_mut() {
            // If the task hasn't already finished, abort it and wait (up to
            // `timeout`) for it to stop.
            if handle.now_or_never().is_none() {
                handle.abort();
                tokio::time::timeout(timeout, async move {
                    let _desired_join_error = handle.await.to_error();
                })
                .await?;
            }
        }
        *handle_opt = None;
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub enum ServiceStatus {
    Healthy,
    Unhealthy(Unhealthy),
}

#[derive(Clone, Debug)]
pub struct Unhealthy {
    pub since: i32,
    pub reasons: Vec<String>,
}

#[derive(Clone, Debug)]
pub struct ServiceReport<Status> {
    id: usize,
    name: String,
    status: Status,
}

#[derive(Clone, Debug)]
pub struct ServiceReportSummary {
    pub alive: Vec<ServiceReport<()>>,
    pub dead: Vec<ServiceReport<Unhealthy>>,
    pub timestamp: i32,
}

pub trait Heartbeat: Send + Sync {
    fn beat(&self);
}

impl Heartbeat for () {
    fn beat(&self) {}
}

impl<U, C> Heartbeat for (U, C)
where
    U: Deref<Target = AtomicUsize> + Send + Sync,
    C: Deref<Target = dyn Clock> + Send + Sync,
{
    fn beat(&self) {
        self.0
            .store(self.1.current_timestamp() as usize, Ordering::Relaxed);
    }
}
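
// A minimal sketch of the tuple `Heartbeat` impl above, mirroring how
// `register_service` wires it up with `(Arc<AtomicUsize>, Arc<dyn Clock>)`.
// Assumes `SystemClock::current_timestamp` returns a positive Unix timestamp.
#[cfg(test)]
mod heartbeat_example {
    use super::*;

    #[test]
    fn tuple_heartbeat_records_the_clock_timestamp() {
        let heart = Arc::new(AtomicUsize::new(0));
        let clock: Arc<dyn Clock> = Arc::new(SystemClock::default());
        (heart.clone(), clock).beat();
        assert!(heart.load(Ordering::Relaxed) > 0);
    }
}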

struct ServiceReportPublisher {
    log_interval: i32,
    last_healthy_log: i32,
    last_healthy_count: usize,
    last_dead_count: usize,
}

impl ServiceReportPublisher {
    fn new(log_interval: i32) -> Self {
        Self {
            log_interval,
            last_healthy_log: 0,
            last_healthy_count: 0,
            last_dead_count: 0,
        }
    }

    /// Decide if anything needs to be published, and publish it.
    fn handle_report(
        &mut self,
        ServiceReportSummary {
            alive,
            dead,
            timestamp,
            ..
        }: ServiceReportSummary,
    ) {
        let some_are_dead = !dead.is_empty();
        let n_healthy = alive.len();

        if some_are_dead {
            tracing::error!(
                "{n_healthy} services are healthy. {} are unhealthy: {dead:#?}",
                dead.len()
            );
        } else if timestamp - self.last_healthy_log >= self.log_interval
            || n_healthy != self.last_healthy_count
            || dead.len() != self.last_dead_count
        {
            // Everything is healthy in this branch; log either because the
            // interval has elapsed or because the counts changed.
            let prefix = "all ";
            let alive_list = pretty(&alive.into_iter().map(|x| x.name).collect::<Vec<_>>());
            if n_healthy + dead.len() != self.last_healthy_count + self.last_dead_count {
                tracing::info!("{prefix}{n_healthy} services are healthy: {alive_list}");
            } else {
                tracing::info!("{prefix}{n_healthy} services are healthy");
                tracing::debug!("{prefix}{n_healthy} services are healthy: {alive_list}");
            }
            self.last_healthy_log = timestamp;
        }
        self.last_healthy_count = n_healthy;
        self.last_dead_count = dead.len();
    }
}
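
// Minimal sketches of the publisher and `pretty`, exercising only what is
// defined in this file. The tracing macros are no-ops without a subscriber,
// so the tests only assert on the publisher's bookkeeping fields.
#[cfg(test)]
mod publisher_example {
    use super::*;

    #[test]
    fn healthy_logs_are_rate_limited_by_log_interval() {
        let mut publisher = ServiceReportPublisher::new(600);
        let summary = ServiceReportSummary {
            alive: vec![ServiceReport {
                id: 0,
                name: "svc".to_string(),
                status: (),
            }],
            dead: vec![],
            timestamp: 1_000,
        };
        publisher.handle_report(summary.clone());
        assert_eq!(publisher.last_healthy_log, 1_000);
        assert_eq!(publisher.last_healthy_count, 1);

        // Same healthy report 10 seconds later: the counts are unchanged and
        // the interval has not elapsed, so no new healthy log is recorded.
        let mut later = summary;
        later.timestamp = 1_010;
        publisher.handle_report(later);
        assert_eq!(publisher.last_healthy_log, 1_000);
    }

    // `pretty` strips the `Vec` Debug syntax, leaving one indented name per
    // line; assert on properties rather than exact whitespace.
    #[test]
    fn pretty_strips_debug_syntax() {
        let out = pretty(&["alpha".to_string(), "beta".to_string()]);
        assert!(out.contains("alpha") && out.contains("beta"));
        assert!(!out.contains('"') && !out.contains('['));
    }
}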

/// Formats a list of names with one indented name per line, dropping the
/// surrounding `Vec` Debug syntax (brackets, quotes, and trailing commas).
fn pretty(v: &[String]) -> String {
    format!("{v:#?}")
        .replace('\"', "")
        .replace("[\n", "\n")
        .replace(",\n", "\n")
        .replace("\n]", "")
}