1use std::{
2 ops::Deref,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9
10use anyhow::{bail, ensure};
11use futures::lock::Mutex;
12use futures::FutureExt;
13use tokio::task::JoinHandle;
14
15use super::service::Job;
16use crate::health_endpoint;
17use crate::service::Service;
18use crate::{
19 error::display_error,
20 health_endpoint::HealthEndpointConfig,
21 time::{Clock, Sleeper, SystemClock, TokioSleeper},
22 Never, ToError,
23};
24
/// Top-level application lifecycle: one-shot initialization, service
/// startup, and supervised monitoring with recovery.
///
/// Blanket-implemented for every type that is `Initialize + Serves`.
#[async_trait::async_trait]
pub trait Application: Initialize + Serves {
    /// Runs the application with the default [`RunConfig`].
    ///
    /// Never returns on success ([`Never`]); resolves only with an error
    /// from startup or configuration validation.
    async fn run(&self) -> anyhow::Result<Never> {
        self.run_custom(Default::default()).await
    }

    /// Runs the application with a custom [`RunConfig`]: validates the
    /// config, starts all services, optionally serves the HTTP health
    /// endpoint, then monitors the services forever with recovery.
    async fn run_custom(&self, config: RunConfig) -> anyhow::Result<Never> {
        config.validate()?;
        let mgr = Arc::new(self.start().await?);
        if let Some(config) = config.http_health_endpoint {
            let mgr2 = mgr.clone();
            // The health endpoint lives for the whole process; its
            // JoinHandle is intentionally dropped (detached task).
            tokio::spawn(async move { health_endpoint::run(mgr2, config).await });
        }
        Ok(mgr
            // Health is re-checked every 10 seconds.
            .monitor_with_recovery(10, config.log_interval, config.attempt_recovery_after)
            .await)
    }

    /// Runs the `init()` jobs once, then registers and spawns every
    /// service from `services()` on a fresh [`ServiceManager`] driven by
    /// the system clock and tokio sleeper.
    async fn start(&self) -> anyhow::Result<ServiceManager> {
        tracing::info!("Initializing application.");
        self.init().run_once().await?;
        let mut mgr = ServiceManager::new(
            Arc::new(SystemClock::default()),
            Arc::new(TokioSleeper::default()),
        );
        for service in self.services() {
            mgr.register_service(service);
        }
        tracing::info!("Starting Services.");
        mgr.spawn_services().await?;
        Ok(mgr)
    }
}
impl<T: Initialize + Serves> Application for T {}
62
/// Tuning knobs for [`Application::run_custom`].
pub struct RunConfig {
    /// Minimum number of seconds between periodic "all healthy" log lines.
    pub log_interval: i32,
    /// Seconds a service must have been unhealthy before a restart is
    /// attempted.
    pub attempt_recovery_after: i32,
    /// When `Some`, an HTTP health endpoint is served with this config.
    pub http_health_endpoint: Option<HealthEndpointConfig>,
}
71
72impl RunConfig {
73 fn validate(&self) -> anyhow::Result<()> {
74 ensure!(self.log_interval >= 0);
75 ensure!(self.attempt_recovery_after >= 0);
76 Ok(())
77 }
78}
79
80impl Default for RunConfig {
81 fn default() -> Self {
82 Self {
83 log_interval: 21600,
84 attempt_recovery_after: 120,
85 http_health_endpoint: Some(Default::default()),
86 }
87 }
88}
89
90pub trait Initialize {
91 #[must_use]
97 fn init(&self) -> Vec<Arc<dyn Job>> {
98 vec![]
99 }
100}
101
/// Supplies the set of long-running services the application consists of.
pub trait Serves {
    /// The services to register and supervise; called once at startup.
    #[must_use]
    fn services(&self) -> Vec<Box<dyn Service>>;
}
110
/// Supervises a set of services: spawns them, tracks their heartbeats and
/// health checks, and can restart dead ones.
pub struct ServiceManager {
    // Source of timestamps for heartbeats and health bookkeeping.
    clock: Arc<dyn Clock>,
    // Abstracted sleep used to pace the monitoring loops.
    sleeper: Arc<dyn Sleeper>,
    managed_services: Vec<ManagedService>,
}
117
118impl ServiceManager {
119 pub fn new(clock: Arc<dyn Clock>, sleeper: Arc<dyn Sleeper>) -> Self {
120 Self {
121 clock,
122 sleeper,
123 managed_services: vec![],
124 }
125 }
126}
127
impl ServiceManager {
    /// Wires a heartbeat into `service` and adds it to the managed set.
    /// The service is not running yet; see [`Self::spawn_services`].
    pub fn register_service(&mut self, mut service: Box<dyn Service>) {
        // Shared slot holding the timestamp of the most recent heartbeat.
        let heart = Arc::new(AtomicUsize::new(0));
        service.set_heartbeat(Arc::new((heart.clone(), self.clock.clone())));
        self.managed_services.push(ManagedService {
            clock: self.clock.clone(),
            service: service.into(),
            heart,
            failing_health_checks_since: 0.into(),
            handle: Mutex::new(None),
        });
    }

    /// Spawns every registered service on its own tokio task.
    ///
    /// # Errors
    /// Fails if any service already has a live task.
    pub async fn spawn_services(&mut self) -> anyhow::Result<()> {
        for managed_service in self.managed_services.iter_mut() {
            managed_service.spawn().await?;
        }
        Ok(())
    }

    /// Monitors the services forever, checking health every
    /// `check_interval` seconds and logging summaries rate-limited to
    /// `log_interval` seconds. Performs no recovery.
    pub async fn monitor(&self, check_interval: u64, log_interval: i32) -> Never {
        let mut publisher = ServiceReportPublisher::new(log_interval);
        loop {
            publisher.handle_report(self.check());
            self.sleeper
                .sleep(Duration::from_secs(check_interval))
                .await;
        }
    }

    /// Like [`Self::monitor`], but additionally restarts any service that
    /// has been unhealthy for longer than `attempt_recovery_after` seconds.
    pub async fn monitor_with_recovery(
        &self,
        check_interval: u64,
        log_interval: i32,
        attempt_recovery_after: i32,
    ) -> Never {
        let mut publisher = ServiceReportPublisher::new(log_interval);
        loop {
            let report = self.check();
            publisher.handle_report(report.clone());
            for ServiceReport {
                id: index,
                status: Unhealthy { since, .. },
                ..
            } in report.dead
            {
                // `id` is the index into `managed_services`, assigned by
                // `check` below.
                let svc = &self.managed_services[index];
                if since + attempt_recovery_after < self.clock.current_timestamp() as i32 {
                    // Give the old task up to 10s to shut down before the
                    // respawn.
                    if let Err(e) = svc.restart(Duration::from_secs(10)).await {
                        tracing::error!(
                            "Failed to restart {}: {}",
                            svc.service.name(),
                            display_error(&e)
                        );
                    }
                }
            }
            self.sleeper
                .sleep(Duration::from_secs(check_interval))
                .await;
        }
    }

    /// Takes a point-in-time health snapshot of every managed service,
    /// partitioned into alive and dead reports. Each report's `id` is the
    /// service's index in `managed_services`.
    pub fn check(&self) -> ServiceReportSummary {
        let (mut alive, mut dead) = (vec![], vec![]);
        let timestamp = self.clock.current_timestamp() as i32;
        for (index, ms) in self.managed_services.iter().enumerate() {
            let status = ms.check(timestamp);
            if let ServiceStatus::Unhealthy(unhealthy) = status {
                dead.push(ServiceReport {
                    id: index,
                    name: ms.service.name(),
                    status: unhealthy,
                });
            } else {
                alive.push(ServiceReport {
                    id: index,
                    name: ms.service.name(),
                    status: (),
                });
            }
        }
        ServiceReportSummary {
            alive,
            dead,
            timestamp,
        }
    }
}
218
/// A service plus the bookkeeping needed to supervise it.
struct ManagedService {
    clock: Arc<dyn Clock>,
    service: Arc<dyn Service>,
    // Timestamp (seconds) of the last heartbeat, written by the service.
    heart: Arc<AtomicUsize>,
    // Timestamp of the first failing health check; 0 while healthy.
    failing_health_checks_since: AtomicUsize,
    // The tokio task running the service; `None` while not running.
    handle: Mutex<Option<JoinHandle<Never>>>,
}
226
227impl ManagedService {
228 pub fn check(&self, current_timestamp: i32) -> ServiceStatus {
231 let mut unhealthy_since = 0;
232 let last_beat = self.heart.load(Ordering::Relaxed) as i32;
233 let time_since_last_beat = current_timestamp - last_beat;
234 let ttl = self.service.heartbeat_ttl();
235 let mut dead_reasons = vec![];
236 if time_since_last_beat >= ttl {
237 unhealthy_since = last_beat + ttl;
238 dead_reasons.push(format!(
239 "No heartbeat for {time_since_last_beat} seconds (ttl: {ttl})"
240 ))
241 }
242 if self.service.is_healthy() {
243 self.failing_health_checks_since.store(0, Ordering::Relaxed);
244 } else {
245 let failing_since = match self.failing_health_checks_since.compare_exchange(
246 0,
247 current_timestamp as usize,
248 Ordering::Relaxed,
249 Ordering::Relaxed,
250 ) {
251 Ok(_) => current_timestamp,
252 Err(existing) => existing as i32,
253 };
254 unhealthy_since = std::cmp::min(failing_since, unhealthy_since);
255 dead_reasons.push(format!(
256 "Failed health checks for {} seconds",
257 current_timestamp - failing_since
258 ))
259 }
260
261 if dead_reasons.is_empty() {
262 ServiceStatus::Healthy
263 } else {
264 ServiceStatus::Unhealthy(Unhealthy {
265 since: unhealthy_since,
266 reasons: dead_reasons,
267 })
268 }
269 }
270
271 pub async fn restart(&self, timeout: Duration) -> anyhow::Result<()> {
272 self.abort(timeout).await?;
273 self.spawn().await
274 }
275
276 pub async fn spawn(&self) -> anyhow::Result<()> {
277 let mut handle = self.handle.lock().await;
278 if handle.as_ref().is_some() {
279 bail!(
280 "Cannot start service, already running: {}",
281 self.service.name()
282 );
283 }
284 tracing::info!("Starting service: {}", self.service.name());
285 (&*self.heart, self.clock.clone()).beat();
286 let svc = self.service.clone();
287 *handle = Some(tokio::spawn(async move { svc.run_forever().await }));
288 self.failing_health_checks_since.store(0, Ordering::Relaxed);
289
290 Ok(())
291 }
292
293 pub async fn abort(&self, timeout: Duration) -> anyhow::Result<()> {
294 tracing::info!("Aborting service: {}", self.service.name());
295 let mut handle_opt = self.handle.lock().await;
296 if let Some(handle) = handle_opt.as_mut() {
297 if handle.now_or_never().is_none() {
298 handle.abort();
299 tokio::time::timeout(timeout, async move {
300 let _desired_join_error = handle.await.to_error();
301 })
302 .await?;
303 }
304 }
305 *handle_opt = None;
306 Ok(())
307 }
308}
309
/// Outcome of a single health evaluation of a managed service.
#[derive(Clone, Debug)]
pub enum ServiceStatus {
    Healthy,
    Unhealthy(Unhealthy),
}
315
/// Details about why a service is considered unhealthy.
#[derive(Clone, Debug)]
pub struct Unhealthy {
    /// Timestamp (seconds) at which the service first became unhealthy.
    pub since: i32,
    /// Human-readable failure descriptions (heartbeat and/or health check).
    pub reasons: Vec<String>,
}
321
/// Per-service entry in a [`ServiceReportSummary`]; `Status` is `()` for
/// alive services and [`Unhealthy`] for dead ones.
#[derive(Clone, Debug)]
pub struct ServiceReport<Status> {
    // Index of the service in the manager's `managed_services` list.
    id: usize,
    name: String,
    status: Status,
}
328
/// Snapshot of every managed service's health at a single instant.
#[derive(Clone, Debug)]
pub struct ServiceReportSummary {
    pub alive: Vec<ServiceReport<()>>,
    pub dead: Vec<ServiceReport<Unhealthy>>,
    /// Timestamp (seconds) at which the snapshot was taken.
    pub timestamp: i32,
}
335
/// Something a service can "beat" to prove it is still alive.
pub trait Heartbeat: Send + Sync {
    /// Records a liveness signal.
    fn beat(&self);
}
339
/// No-op heartbeat for contexts that do not report liveness.
impl Heartbeat for () {
    fn beat(&self) {}
}
343
344impl<U, C> Heartbeat for (U, C)
345where
346 U: Deref<Target = AtomicUsize> + Send + Sync,
347 C: Deref<Target = dyn Clock> + Send + Sync,
348{
349 fn beat(&self) {
350 self.0
351 .store(self.1.current_timestamp() as usize, Ordering::Relaxed);
352 }
353}
354
/// Rate-limits and de-duplicates the periodic health log lines.
struct ServiceReportPublisher {
    // Minimum seconds between "all healthy" log lines.
    log_interval: i32,
    // Timestamp of the last healthy summary that was logged.
    last_healthy_log: i32,
    // Counts from the previous report, used to detect changes.
    last_healthy_count: usize,
    last_dead_count: usize,
}
361
362impl ServiceReportPublisher {
363 fn new(log_interval: i32) -> Self {
364 Self {
365 log_interval,
366 last_healthy_log: 0,
367 last_healthy_count: 0,
368 last_dead_count: 0,
369 }
370 }
371
372 fn handle_report(
374 &mut self,
375 ServiceReportSummary {
376 alive,
377 dead,
378 timestamp,
379 ..
380 }: ServiceReportSummary,
381 ) {
382 let some_are_dead = !dead.is_empty();
383 let n_healthy = alive.len();
384
385 if some_are_dead {
386 tracing::error!(
387 "{n_healthy} services are healthy. {} are unhealthy: {dead:#?}",
388 dead.len()
389 );
390 } else if timestamp - self.last_healthy_log >= self.log_interval
391 || n_healthy != self.last_healthy_count
392 || dead.len() != self.last_dead_count
393 {
394 let prefix = if some_are_dead { "" } else { "all " };
395 let alive_list = pretty(&alive.into_iter().map(|x| x.name).collect());
396 if n_healthy + dead.len() != self.last_healthy_count + self.last_dead_count {
397 tracing::info!("{prefix}{n_healthy} services are healthy: {alive_list}");
398 } else {
399 tracing::info!("{prefix}{n_healthy} services are healthy");
400 tracing::debug!("{prefix}{n_healthy} services are healthy: {alive_list}");
401 }
402 self.last_healthy_log = timestamp;
403 }
404 self.last_healthy_count = n_healthy;
405 self.last_dead_count = dead.len();
406 }
407}
408
/// Renders a list of names as a newline-separated, indented block by
/// stripping the quotes and brackets from the `Debug` pretty-print.
/// An empty list renders as `[]`.
fn pretty(v: &Vec<String>) -> String {
    let debug = format!("{v:#?}");
    let unquoted = debug.replace('\"', "");
    unquoted
        .replace("[\n", "\n")
        .replace(",\n", "\n")
        .replace("\n]", "")
}