1use std::{
2 sync::{
3 atomic::{AtomicUsize, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use futures::lock::Mutex;
11use tokio::time::timeout;
12
13use crate::{
14 error::LogError,
15 short_name,
16 time::{Sleeper, TokioSleeper},
17 Never,
18};
19
20use super::service_manager::Heartbeat;
21
22#[async_trait]
23pub trait Service: Send + Sync {
24 async fn run_forever(&self) -> Never;
27
28 fn name(&self) -> String {
30 short_name::<Self>()
31 }
32
33 fn heartbeat_ttl(&self) -> i32;
36
37 fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat + 'static>);
40
41 fn is_healthy(&self) -> bool;
44}
45
46#[async_trait]
47impl<J: Job> Job for Vec<J> {
48 async fn run_once(&self) -> anyhow::Result<()> {
49 for job in self.iter() {
50 job.run_once().await?;
51 }
52 Ok(())
53 }
54}
55
56const LOOP_SLEEPER: TokioSleeper = TokioSleeper::default();
57
58pub struct LoopingJobService {
59 job: Arc<dyn LoopableJob>,
60 heartbeat: Arc<dyn Heartbeat>,
61 sleeper: Arc<dyn Sleeper>,
62 consecutive_failures: AtomicUsize,
66}
67
68impl From<Arc<dyn LoopableJob>> for LoopingJobService {
69 fn from(job: Arc<dyn LoopableJob>) -> Self {
70 Self {
71 job,
72 heartbeat: Arc::new(()),
73 sleeper: Arc::new(LOOP_SLEEPER),
74 consecutive_failures: 0.into(),
75 }
76 }
77}
78
79impl<J: LoopableJob + 'static> From<J> for LoopingJobService {
80 fn from(job: J) -> Self {
81 Self {
82 job: Arc::new(job),
83 heartbeat: Arc::new(()),
84 sleeper: Arc::new(LOOP_SLEEPER),
85 consecutive_failures: 0.into(),
86 }
87 }
88}
89
90impl LoopingJobService {
91 pub fn new<J: Job + 'static>(job: J, config: LoopConfig) -> Self {
92 Self {
93 job: Arc::new((job, config)),
94 heartbeat: Arc::new(()),
95 sleeper: Arc::new(LOOP_SLEEPER),
96 consecutive_failures: 0.into(),
97 }
98 }
99}
100
101pub struct LoopingMutJobService(LoopingJobService);
109
110impl<Mj: MutJob + SelfConfiguredLoop + Send + Sync + 'static> From<Mj> for LoopingMutJobService {
111 fn from(value: Mj) -> Self {
112 let config = value.loop_config();
113 Self(LoopingJobService::from((Mutex::new(value), config)))
114 }
115}
116
117#[async_trait]
118impl Service for LoopingMutJobService {
119 async fn run_forever(&self) -> Never {
120 self.0.run_forever().await
121 }
122
123 fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
124 self.0.set_heartbeat(heartbeat)
125 }
126
127 fn name(&self) -> String {
128 Service::name(&self.0)
129 }
130
131 fn heartbeat_ttl(&self) -> i32 {
132 self.0.heartbeat_ttl()
133 }
134
135 fn is_healthy(&self) -> bool {
136 self.0.is_healthy()
137 }
138}
139
/// Timing parameters for a looping job.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LoopConfig {
    /// Seconds `LoopingJobService::run_forever` sleeps after each iteration.
    pub delay_secs: i32,
    /// Added to `delay_secs` to form `LoopingJobService::heartbeat_ttl`.
    ///
    /// NOTE(review): presumably the longest a single iteration is expected to
    /// take, in seconds; nothing in this file enforces it — confirm.
    pub max_iteration_secs: i32,
}
148
149pub trait LoopableJob: Job + SelfConfiguredLoop {}
150impl<T: Job + SelfConfiguredLoop> LoopableJob for T {}
151
152pub trait SelfConfiguredLoop {
153 fn loop_config(&self) -> LoopConfig;
154}
155
156#[async_trait]
157impl<J: Job, T: Send + Sync> Job for (J, T) {
158 async fn run_once(&self) -> anyhow::Result<()> {
159 self.0.run_once().await
160 }
161
162 fn name(&self) -> String {
163 short_name::<J>()
164 }
165}
166impl<T> SelfConfiguredLoop for (T, LoopConfig) {
167 fn loop_config(&self) -> LoopConfig {
168 self.1.clone()
169 }
170}
171
172pub trait AsJob {
173 fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
174 where
175 Self: 'a;
176}
177
178impl<T: Job + Sized> AsJob for T {
179 fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
180 where
181 Self: 'a,
182 {
183 self
184 }
185}
186
187#[async_trait]
188impl Job for LoopingJobService {
189 async fn run_once(&self) -> anyhow::Result<()> {
190 self.job.run_once().await
191 }
192}
193
194#[async_trait]
195impl Service for LoopingJobService {
196 async fn run_forever(&self) -> Never {
197 loop {
198 self.heartbeat.beat();
199 let success = self
200 .job
201 .run_once()
202 .await
203 .log_with_context(|| format!("Unhandled error in {}", self.job.name()));
204 match success {
205 Some(()) => self.consecutive_failures.store(0, Ordering::Relaxed),
206 None => {
207 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
208 }
209 }
210 self.sleeper
211 .sleep(Duration::from_secs(
212 self.job.loop_config().delay_secs as u64,
213 ))
214 .await;
215 }
216 }
217
218 fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
219 self.heartbeat = heartbeat;
220 }
221
222 fn name(&self) -> String {
223 self.job.name()
224 }
225
226 fn heartbeat_ttl(&self) -> i32 {
227 self.job.loop_config().delay_secs + self.job.loop_config().max_iteration_secs
228 }
229
230 fn is_healthy(&self) -> bool {
231 0 == self.consecutive_failures.load(Ordering::Relaxed)
232 }
233}
234
235#[async_trait]
236pub trait Job: AsJob + Send + Sync {
237 async fn run_once(&self) -> anyhow::Result<()>;
238
239 fn name(&self) -> String {
240 short_name::<Self>()
241 }
242}
243
244#[async_trait]
245impl Job for Vec<Arc<dyn Job>> {
246 async fn run_once(&self) -> anyhow::Result<()> {
247 for item in self.iter() {
248 item.run_once().await?;
249 }
250 Ok(())
251 }
252}
253
254#[async_trait]
255impl<F: Fn() + Send + Sync> Job for F {
256 async fn run_once(&self) -> anyhow::Result<()> {
257 self();
258 Ok(())
259 }
260}
261
262#[async_trait]
265pub trait MutJob {
266 async fn run_once_mut(&mut self) -> anyhow::Result<()>;
267}
268
269#[async_trait]
270impl<Mj: MutJob + Send + Sync> Job for Mutex<Mj> {
271 async fn run_once(&self) -> anyhow::Result<()> {
272 timeout(Duration::from_secs(60), self.lock())
273 .await?
274 .run_once_mut()
275 .await
276 }
277
278 fn name(&self) -> String {
279 short_name::<Mj>()
280 }
281}