1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use async_trait::async_trait;
use futures::lock::Mutex;
use tokio::time::timeout;

use crate::{
    error::LogError,
    short_name,
    time::{Sleeper, TokioSleeper},
    Never,
};

use super::service_manager::Heartbeat;

/// A long-running unit of work that is monitored through a heartbeat.
///
/// Implementors keep working until they are shut down from the outside,
/// periodically prove they are alive via the installed [`Heartbeat`], and can
/// report their own health on demand.
#[async_trait]
pub trait Service: Send + Sync {
    /// Identifier used for this service in log output.
    ///
    /// Defaults to the short type name of the implementor.
    fn name(&self) -> String {
        short_name::<Self>()
    }

    /// Performs the service's work indefinitely. The service is not expected
    /// to stop on its own; it keeps going until shut down externally.
    async fn run_forever(&self) -> Never;

    /// Installs the heartbeat the service must `beat` frequently to signal
    /// that it is still alive.
    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat + 'static>);

    /// Longest gap, in seconds, permitted between two heartbeats before the
    /// service is considered "dead".
    fn heartbeat_ttl(&self) -> i32;

    /// Self-diagnosis hook: returns `false` when the service has detected an
    /// internal problem.
    fn is_healthy(&self) -> bool;
}

/// A vector of jobs is itself a job: each element is run in order, and the
/// first error aborts the pass and is returned to the caller.
#[async_trait]
impl<J: Job> Job for Vec<J> {
    async fn run_once(&self) -> anyhow::Result<()> {
        let jobs: &[J] = self;
        for job in jobs {
            job.run_once().await?;
        }
        Ok(())
    }
}

/// Sleeper installed by the `LoopingJobService` constructors in this file.
/// Being a `const`, every use produces its own `TokioSleeper` value.
const LOOP_SLEEPER: TokioSleeper = TokioSleeper::default();

/// A [`Service`] that runs a [`LoopableJob`] over and over, pausing between
/// iterations for the delay described by the job's own [`LoopConfig`].
pub struct LoopingJobService {
    /// The work executed on each iteration; it also supplies the loop timing.
    job: Arc<dyn LoopableJob>,
    /// Liveness signal that is beaten at the start of every iteration.
    heartbeat: Arc<dyn Heartbeat>,
    /// Performs the pause between iterations.
    sleeper: Arc<dyn Sleeper>,
    /// How many iterations in a row have failed since the last success.
    /// Reset to 0 whenever an iteration succeeds.
    consecutive_failures: AtomicUsize,
}

impl From<Arc<dyn LoopableJob>> for LoopingJobService {
    fn from(job: Arc<dyn LoopableJob>) -> Self {
        Self {
            job,
            heartbeat: Arc::new(()),
            sleeper: Arc::new(LOOP_SLEEPER),
            consecutive_failures: 0.into(),
        }
    }
}

/// Wraps a concrete job given by value. This is the same as sharing it in an
/// `Arc<dyn LoopableJob>` and converting that.
impl<J: LoopableJob + 'static> From<J> for LoopingJobService {
    fn from(job: J) -> Self {
        let shared: Arc<dyn LoopableJob> = Arc::new(job);
        <Self as From<Arc<dyn LoopableJob>>>::from(shared)
    }
}

impl LoopingJobService {
    /// Builds a looping service from a plain [`Job`] and an explicit
    /// [`LoopConfig`].
    ///
    /// The two are paired into a tuple, which is self-configured and therefore
    /// loopable.
    pub fn new<J: Job + 'static>(job: J, config: LoopConfig) -> Self {
        Self::from((job, config))
    }
}

/// Service type to use instead of `LoopingJobService` for mutable jobs, i.e.
/// jobs implemented as [`MutJob`] rather than [`Job`].
///
/// The job is placed behind a `Mutex`. The mutex-wrapped job is a `Job`, and it
/// is looped by an inner `LoopingJobService`.
///
/// Ideally a single `LoopingJobService` would accept both `Job` and `MutJob`.
/// Without specialization or negative trait bounds, however, it is impossible to
/// write several sufficiently generic `From<_>` impls for it.
pub struct LoopingMutJobService(LoopingJobService);

impl<Mj> From<Mj> for LoopingMutJobService
where
    Mj: MutJob + SelfConfiguredLoop + Send + Sync + 'static,
{
    fn from(value: Mj) -> Self {
        // Read the config before the job is moved into the mutex.
        let config = value.loop_config();
        let locked = Mutex::new(value);
        Self((locked, config).into())
    }
}

/// Every `Service` method forwards unchanged to the inner `LoopingJobService`.
#[async_trait]
impl Service for LoopingMutJobService {
    fn name(&self) -> String {
        Service::name(&self.0)
    }

    async fn run_forever(&self) -> Never {
        Service::run_forever(&self.0).await
    }

    fn heartbeat_ttl(&self) -> i32 {
        Service::heartbeat_ttl(&self.0)
    }

    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
        Service::set_heartbeat(&mut self.0, heartbeat)
    }

    fn is_healthy(&self) -> bool {
        Service::is_healthy(&self.0)
    }
}

/// Timing parameters for a job that is run repeatedly in a loop.
///
/// Both values are whole seconds.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LoopConfig {
    /// Time to pause between iterations, doing nothing, in seconds.
    pub delay_secs: i32,
    /// The longest you would ever expect it to take to execute a single
    /// iteration, not including the delay, in seconds.
    pub max_iteration_secs: i32,
}

/// Implemented by values that carry their own [`LoopConfig`].
pub trait SelfConfiguredLoop {
    /// Returns the loop timing (delay and expected iteration length) to use
    /// when running this value in a loop.
    fn loop_config(&self) -> LoopConfig;
}

/// A [`Job`] that also knows its own loop timing.
///
/// Any type that is both a `Job` and a `SelfConfiguredLoop` gets this trait
/// automatically.
pub trait LoopableJob: Job + SelfConfiguredLoop {}

impl<T> LoopableJob for T where T: Job + SelfConfiguredLoop {}

/// A job paired with extra data runs exactly like the job alone. The second
/// element is never touched while running.
#[async_trait]
impl<J, T> Job for (J, T)
where
    J: Job,
    T: Send + Sync,
{
    async fn run_once(&self) -> anyhow::Result<()> {
        let (job, _extra) = self;
        job.run_once().await
    }

    /// Reports the short name of the paired job type, not of the tuple.
    fn name(&self) -> String {
        short_name::<J>()
    }
}
impl<T> SelfConfiguredLoop for (T, LoopConfig) {
    fn loop_config(&self) -> LoopConfig {
        self.1.clone()
    }
}

/// Upcasts a shared (`Arc`) job into a shared `dyn Job` trait object.
pub trait AsJob {
    /// Converts this `Arc` into an `Arc<dyn Job>` that is valid for `'a`.
    fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
    where
        Self: 'a;
}

/// Every sized job is upcast through ordinary unsized coercion.
impl<T> AsJob for T
where
    T: Job + Sized,
{
    fn as_job<'a>(self: Arc<Self>) -> Arc<dyn Job + 'a>
    where
        Self: 'a,
    {
        self as Arc<dyn Job + 'a>
    }
}

/// Lets a `LoopingJobService` itself be used as a [`Job`].
///
/// A single run executes the wrapped job once. There is no heartbeat, no
/// failure tracking and no sleeping.
#[async_trait]
impl Job for LoopingJobService {
    async fn run_once(&self) -> anyhow::Result<()> {
        self.job.run_once().await
    }

    /// Reports the wrapped job's name rather than the default type-name-based
    /// one. This matches `Service::name` for this type and the tuple and
    /// `Mutex` job wrappers, which also report their inner job's name.
    fn name(&self) -> String {
        self.job.name()
    }
}

#[async_trait]
impl Service for LoopingJobService {
    /// Runs the wrapped job forever.
    ///
    /// Each iteration does the following, in order:
    /// - beats the heartbeat;
    /// - runs the job once;
    /// - logs any error, tagged with the job's name;
    /// - updates the consecutive-failure counter;
    /// - sleeps for the job's configured `delay_secs`, with negative values
    ///   treated as 0.
    async fn run_forever(&self) -> Never {
        loop {
            self.heartbeat.beat();
            let success = self
                .job
                .run_once()
                .await
                .log_with_context(|| format!("Unhandled error in {}", self.job.name()));
            match success {
                Some(()) => self.consecutive_failures.store(0, Ordering::Relaxed),
                None => {
                    self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
                }
            }
            // Clamp before widening: `as u64` on a negative i32 sign-extends to
            // an enormous value, which would effectively stall the loop forever.
            let delay_secs = self.job.loop_config().delay_secs.max(0) as u64;
            self.sleeper.sleep(Duration::from_secs(delay_secs)).await;
        }
    }

    fn set_heartbeat(&mut self, heartbeat: Arc<dyn Heartbeat>) {
        self.heartbeat = heartbeat;
    }

    /// Delegates to the wrapped job's name.
    fn name(&self) -> String {
        self.job.name()
    }

    /// Heartbeats happen once per iteration, so the allowed gap is one delay
    /// plus one maximum iteration.
    ///
    /// It uses the same non-negative delay that `run_forever` actually sleeps
    /// for. The addition saturates instead of overflowing on extreme configs.
    fn heartbeat_ttl(&self) -> i32 {
        let config = self.job.loop_config();
        config
            .delay_secs
            .max(0)
            .saturating_add(config.max_iteration_secs)
    }

    /// Healthy when the most recent iteration succeeded, or none has run yet.
    fn is_healthy(&self) -> bool {
        self.consecutive_failures.load(Ordering::Relaxed) == 0
    }
}

/// A unit of asynchronous work that may be executed any number of times
/// through a shared reference.
#[async_trait]
pub trait Job: AsJob + Send + Sync {
    /// Name identifying the job, e.g. in log messages.
    ///
    /// Defaults to the short type name of the implementor.
    fn name(&self) -> String {
        short_name::<Self>()
    }

    /// Executes a single pass of the job's work.
    ///
    /// # Errors
    /// Returns whatever error occurred during this pass.
    async fn run_once(&self) -> anyhow::Result<()>;
}

/// A list of shared, type-erased jobs runs each one sequentially, in order.
/// The first error stops the pass and is returned.
#[async_trait]
impl Job for Vec<Arc<dyn Job>> {
    async fn run_once(&self) -> anyhow::Result<()> {
        for job in self.iter().map(Arc::as_ref) {
            job.run_once().await?;
        }
        Ok(())
    }
}

/// Any thread-safe, argument-less closure or function is a [`Job`].
///
/// Running it calls the closure synchronously, inside the poll of the returned
/// future, and always reports success.
#[async_trait]
impl<F> Job for F
where
    F: Fn() + Send + Sync,
{
    async fn run_once(&self) -> anyhow::Result<()> {
        let action = self;
        action();
        Ok(())
    }
}

/// A job whose iterations need exclusive (`&mut self`) access to its state.
///
/// Use this if you never plan on trying to use the same instance of this job
/// multiple times concurrently. A `futures::lock::Mutex` wrapping a `MutJob`
/// implements [`Job`] by locking the job for each run.
#[async_trait]
pub trait MutJob {
    /// Executes one iteration with mutable access to the job.
    ///
    /// # Errors
    /// Returns whatever error this iteration produced.
    async fn run_once_mut(&mut self) -> anyhow::Result<()>;
}

#[async_trait]
impl<Mj: MutJob + Send + Sync> Job for Mutex<Mj> {
    async fn run_once(&self) -> anyhow::Result<()> {
        timeout(Duration::from_secs(60), self.lock())
            .await?
            .run_once_mut()
            .await
    }

    fn name(&self) -> String {
        short_name::<Mj>()
    }
}