app_frame/
service.rs

1use std::{
2    sync::{
3        atomic::{AtomicUsize, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use futures::lock::Mutex;
11use tokio::time::timeout;
12
13use crate::{
14    error::LogError,
15    short_name,
16    time::{Sleeper, TokioSleeper},
17    Never,
18};
19
20use super::service_manager::Heartbeat;
21
/// A long-running component that reports liveness through a [`Heartbeat`]
/// and exposes a self-assessed health status.
#[async_trait]
pub trait Service: Send + Sync {
    /// Runs the service. The service is never expected to stop on its own; it
    /// should keep doing its job until it is shut down externally.
    async fn run_forever(&self) -> Never;

    /// The name used to represent the service in the logs.
    ///
    /// Defaults to `short_name::<Self>()`.
    fn name(&self) -> String {
        short_name::<Self>()
    }

    /// The maximum number of seconds allowed between heartbeats before the
    /// service is deemed "dead".
    fn heartbeat_ttl(&self) -> i32;

    /// Hands the service the heartbeat it should use. The service should
    /// frequently call "beat" on it to indicate that it is still alive.
    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat + 'static>);

    /// The service should be able to self-diagnose any internal issues and
    /// report if there is a problem.
    fn is_healthy(&self) -> bool;
}
45
46#[async_trait]
47impl<J: Job> Job for Vec<J> {
48    async fn run_once(&self) -> anyhow::Result<()> {
49        for job in self.iter() {
50            job.run_once().await?;
51        }
52        Ok(())
53    }
54}
55
/// The sleeper given to every `LoopingJobService` built by the constructors
/// in this file.
const LOOP_SLEEPER: TokioSleeper = TokioSleeper::default();
57
/// A [`Service`] that runs one [`LoopableJob`] over and over, pausing between
/// iterations for the delay given by the job's loop configuration.
pub struct LoopingJobService {
    /// The job executed on every iteration of the loop.
    job: Arc<dyn LoopableJob>,
    /// Beaten at the start of every iteration; replaced by `set_heartbeat`.
    heartbeat: Arc<dyn Heartbeat>,
    /// Used to pause between iterations.
    sleeper: Arc<dyn Sleeper>,
    /// Number of consecutive failures that have happened until now without any
    /// successes.
    ///
    /// If the last attempt was a success, this will be 0.
    consecutive_failures: AtomicUsize,
}
67
68impl From<Arc<dyn LoopableJob>> for LoopingJobService {
69    fn from(job: Arc<dyn LoopableJob>) -> Self {
70        Self {
71            job,
72            heartbeat: Arc::new(()),
73            sleeper: Arc::new(LOOP_SLEEPER),
74            consecutive_failures: 0.into(),
75        }
76    }
77}
78
79impl<J: LoopableJob + 'static> From<J> for LoopingJobService {
80    fn from(job: J) -> Self {
81        Self {
82            job: Arc::new(job),
83            heartbeat: Arc::new(()),
84            sleeper: Arc::new(LOOP_SLEEPER),
85            consecutive_failures: 0.into(),
86        }
87    }
88}
89
90impl LoopingJobService {
91    pub fn new<J: Job + 'static>(job: J, config: LoopConfig) -> Self {
92        Self {
93            job: Arc::new((job, config)),
94            heartbeat: Arc::new(()),
95            sleeper: Arc::new(LOOP_SLEEPER),
96            consecutive_failures: 0.into(),
97        }
98    }
99}
100
/// Use this as the service type instead of `LoopingJobService` for mutable
/// jobs that need to be implemented as `MutJob` instead of `Job`.
///
/// This is a thin wrapper around a `LoopingJobService`.
///
/// Ideally, `LoopingJobService` could be used for both `Job` and `MutJob`, but
/// it is not possible to write multiple sufficiently generic implementations
/// of `From<>` for `LoopingJobService` due to the lack of specialization and
/// negative trait bounds.
pub struct LoopingMutJobService(LoopingJobService);
109
110impl<Mj: MutJob + SelfConfiguredLoop + Send + Sync + 'static> From<Mj> for LoopingMutJobService {
111    fn from(value: Mj) -> Self {
112        let config = value.loop_config();
113        Self(LoopingJobService::from((Mutex::new(value), config)))
114    }
115}
116
117#[async_trait]
118impl Service for LoopingMutJobService {
119    async fn run_forever(&self) -> Never {
120        self.0.run_forever().await
121    }
122
123    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
124        self.0.set_heartbeat(heartbeat)
125    }
126
127    fn name(&self) -> String {
128        Service::name(&self.0)
129    }
130
131    fn heartbeat_ttl(&self) -> i32 {
132        self.0.heartbeat_ttl()
133    }
134
135    fn is_healthy(&self) -> bool {
136        self.0.is_healthy()
137    }
138}
139
/// Timing parameters for a job that is run repeatedly in a loop.
#[derive(Clone)]
pub struct LoopConfig {
    /// Time, in seconds, to pause between iterations, doing nothing.
    pub delay_secs: i32,
    /// The longest you would ever expect it to take, in seconds, to execute a
    /// single iteration, not including the delay.
    pub max_iteration_secs: i32,
}
148
/// A job that also knows how it should be looped. Implemented automatically
/// for every type that is both a `Job` and a `SelfConfiguredLoop`.
pub trait LoopableJob: Job + SelfConfiguredLoop {}
impl<T: Job + SelfConfiguredLoop> LoopableJob for T {}
151
/// Implemented by types that can supply the `LoopConfig` they should be run
/// with.
pub trait SelfConfiguredLoop {
    /// Returns the loop configuration for this value.
    fn loop_config(&self) -> LoopConfig;
}
155
156#[async_trait]
157impl<J: Job, T: Send + Sync> Job for (J, T) {
158    async fn run_once(&self) -> anyhow::Result<()> {
159        self.0.run_once().await
160    }
161
162    fn name(&self) -> String {
163        short_name::<J>()
164    }
165}
166impl<T> SelfConfiguredLoop for (T, LoopConfig) {
167    fn loop_config(&self) -> LoopConfig {
168        self.1.clone()
169    }
170}
171
/// Conversion from a shared value into a shared `Job` trait object.
pub trait AsJob {
    /// Converts an `Arc<Self>` into an `Arc<dyn Job>` that lives as long as
    /// `Self` does.
    fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
    where
        Self: 'a;
}
177
178impl<T: Job + Sized> AsJob for T {
179    fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
180    where
181        Self: 'a,
182    {
183        self
184    }
185}
186
187#[async_trait]
188impl Job for LoopingJobService {
189    async fn run_once(&self) -> anyhow::Result<()> {
190        self.job.run_once().await
191    }
192}
193
194#[async_trait]
195impl Service for LoopingJobService {
196    async fn run_forever(&self) -> Never {
197        loop {
198            self.heartbeat.beat();
199            let success = self
200                .job
201                .run_once()
202                .await
203                .log_with_context(|| format!("Unhandled error in {}", self.job.name()));
204            match success {
205                Some(()) => self.consecutive_failures.store(0, Ordering::Relaxed),
206                None => {
207                    self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
208                }
209            }
210            self.sleeper
211                .sleep(Duration::from_secs(
212                    self.job.loop_config().delay_secs as u64,
213                ))
214                .await;
215        }
216    }
217
218    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
219        self.heartbeat = heartbeat;
220    }
221
222    fn name(&self) -> String {
223        self.job.name()
224    }
225
226    fn heartbeat_ttl(&self) -> i32 {
227        self.job.loop_config().delay_secs + self.job.loop_config().max_iteration_secs
228    }
229
230    fn is_healthy(&self) -> bool {
231        0 == self.consecutive_failures.load(Ordering::Relaxed)
232    }
233}
234
/// A unit of work that can be executed through a shared reference.
#[async_trait]
pub trait Job: AsJob + Send + Sync {
    /// Executes the job a single time, returning an error if it failed.
    async fn run_once(&self) -> anyhow::Result<()>;

    /// The name of the job. Defaults to `short_name::<Self>()`.
    fn name(&self) -> String {
        short_name::<Self>()
    }
}
243
244#[async_trait]
245impl Job for Vec<Arc<dyn Job>> {
246    async fn run_once(&self) -> anyhow::Result<()> {
247        for item in self.iter() {
248            item.run_once().await?;
249        }
250        Ok(())
251    }
252}
253
254#[async_trait]
255impl<F: Fn() + Send + Sync> Job for F {
256    async fn run_once(&self) -> anyhow::Result<()> {
257        self();
258        Ok(())
259    }
260}
261
/// A job that needs exclusive (`&mut`) access to itself while it runs.
///
/// Use this if you never plan on trying to use the same instance of this job
/// multiple times concurrently.
#[async_trait]
pub trait MutJob {
    /// Executes the job a single time, returning an error if it failed.
    async fn run_once_mut(&mut self) -> anyhow::Result<()>;
}
268
269#[async_trait]
270impl<Mj: MutJob + Send + Sync> Job for Mutex<Mj> {
271    async fn run_once(&self) -> anyhow::Result<()> {
272        timeout(Duration::from_secs(60), self.lock())
273            .await?
274            .run_once_mut()
275            .await
276    }
277
278    fn name(&self) -> String {
279        short_name::<Mj>()
280    }
281}