1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use tokio::signal;
8use tokio::signal::unix::SignalKind;
9use tokio::sync::mpsc::{Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::JoinHandle;
12use tokio::time::error::Elapsed;
13
14#[async_trait]
15pub trait Stop: Send {
16 async fn stop(self);
17
18 async fn concrete(self) -> ConcreteStop
19 where
20 Self: Sized + 'static,
21 {
22 ConcreteStop::new(self).await
23 }
24
25 async fn into_guard(self) -> StopGuard
26 where
27 Self: Sized + 'static,
28 {
29 StopGuard::new(self).await
30 }
31}
32
33#[async_trait]
34pub trait Lifecycle: Send + 'static {
35 type S: Stop;
36
37 async fn start(self) -> Self::S;
38
39 async fn concrete(self) -> ConcreteLifecycle
40 where
41 Self: Sized,
42 {
43 ConcreteLifecycle::new(self).await
44 }
45}
46
47pub struct StopGuard {
48 _tx: oneshot::Sender<()>,
49}
50
51impl StopGuard {
52 async fn new(stop: impl Stop + 'static) -> Self {
53 let (tx, rx) = oneshot::channel();
54 tokio::spawn(async move {
55 let _ = rx.await;
56 let _ = stop.stop().await;
57 });
58 StopGuard { _tx: tx }
59 }
60}
61
62pub struct IntrospectableStop {
63 sig: oneshot::Sender<()>,
64 jh: JoinHandle<()>,
65}
66
67impl IntrospectableStop {
68 fn new(jh: JoinHandle<()>, sig: oneshot::Sender<()>) -> Self {
69 IntrospectableStop { jh, sig }
70 }
71
72 pub fn is_finished(&self) -> bool {
73 self.jh.is_finished()
74 }
75}
76
77#[async_trait]
78impl Stop for IntrospectableStop {
79 async fn stop(self) {
80 let _ = self.sig.send(());
81 let _ = self.jh.await;
82 }
83}
84
85pub struct ConcreteLifecycle {
86 tx: oneshot::Sender<oneshot::Sender<ConcreteStop>>,
87}
88
89impl ConcreteLifecycle {
90 async fn new(lc: impl Lifecycle) -> Self {
91 let (tx, rx) = oneshot::channel::<oneshot::Sender<ConcreteStop>>();
92 tokio::spawn(async move {
93 if let Ok(stop_tx) = rx.await {
94 let stop = lc.start().await;
95 let _ = stop_tx.send(ConcreteStop::new(stop).await);
96 }
97 });
98 ConcreteLifecycle { tx }
99 }
100}
101
102pub async fn parallel_iter<I: IntoIterator<Item = ConcreteLifecycle>>(
103 iter: I,
104) -> ConcreteLifecycle {
105 let mut lc = None;
106 for next in iter.into_iter() {
107 if let Some(old_lc) = lc {
108 lc = Some(parallel(old_lc, next).concrete().await);
109 } else {
110 lc = Some(next);
111 }
112 }
113 if let Some(lc) = lc {
114 lc
115 } else {
116 NoLife.concrete().await
117 }
118}
119
120pub struct ConcreteStop {
121 tx: oneshot::Sender<oneshot::Sender<()>>,
122}
123
124impl ConcreteStop {
125 async fn new(stop: impl Stop + 'static) -> ConcreteStop {
126 let (tx, rx) = oneshot::channel::<oneshot::Sender<()>>();
127 tokio::spawn(async move {
128 if let Ok(done_tx) = rx.await {
129 stop.stop().await;
130 let _ = done_tx.send(());
131 }
132 });
133 ConcreteStop { tx }
134 }
135}
136
137#[async_trait]
138impl Lifecycle for ConcreteLifecycle {
139 type S = ConcreteStop;
140 async fn start(self) -> Self::S {
141 let (tx, rx) = oneshot::channel();
142 let _ = self.tx.send(tx);
143 rx.await.unwrap()
144 }
145}
146
147#[async_trait]
148impl Stop for ConcreteStop {
149 async fn stop(self) {
150 let (tx, rx) = oneshot::channel();
151 let _ = self.tx.send(tx);
152 rx.await.unwrap();
153 }
154}
155
156pub fn seq<A, B>(a: A, b: B) -> impl Lifecycle
157where
158 A: Lifecycle,
159 B: Lifecycle,
160{
161 lifecycle!(state, { (a.start().await, b.start().await) }, {
162 let (a_stop, b_stop) = state;
163 b_stop.stop().await;
164 a_stop.stop().await;
165 })
166}
167
168pub fn parallel<A, B>(a: A, b: B) -> impl Lifecycle
169where
170 A: Lifecycle,
171 B: Lifecycle,
172{
173 lifecycle!(state, { tokio::join!(a.start(), b.start()) }, {
174 let (a_stop, b_stop) = state;
175 let _ = tokio::join!(a_stop.stop(), b_stop.stop());
176 })
177}
178
/// Combines one or more expressions with the crate's `parallel` function,
/// nesting to the right.
///
/// `parallel!(a)` expands to `a`; `parallel!(a, b, c)` expands to
/// `parallel(a, parallel(b, c))`. A trailing comma is accepted.
///
/// Paths go through `$crate` so the expansion resolves regardless of the
/// name under which the crate is imported.
#[macro_export]
macro_rules! parallel {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        $crate::parallel($x, $crate::parallel!($($y),+))
    )
}
186
/// Combines one or more expressions with the crate's `seq` function, nesting
/// to the right.
///
/// `seq!(a)` expands to `a`; `seq!(a, b, c)` expands to `seq(a, seq(b, c))`.
/// A trailing comma is accepted.
///
/// Paths go through `$crate` so the expansion resolves regardless of the
/// name under which the crate is imported.
#[macro_export]
macro_rules! seq {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        $crate::seq($x, $crate::seq!($($y),+))
    )
}
194
/// Builds a start closure from a start block and a stop block.
///
/// The expansion is `move || async move { ... }`: awaiting the closure's
/// future evaluates the start block, binds its value to the given state
/// identifier, and yields a second `move || async move { ... }` closure that
/// runs the stop block with that state moved in.
///
/// Forms:
/// - `lifecycle!(mut state, { start }, { stop })` binds the state mutably.
/// - `lifecycle!(state, { start }, { stop })` binds it immutably.
/// - `lifecycle!({ start }, { stop })` binds the start value to an internal
///   name that the stop block cannot refer to.
///
/// The two-argument form recurses through `$crate` so it resolves regardless
/// of the name under which the crate is imported.
#[macro_export]
macro_rules! lifecycle {
    (mut $state:ident, $start:block, $stop:block) => {
        move || async move {
            let mut $state = $start;
            move || async move { $stop }
        }
    };
    ($state:ident, $start:block, $stop:block) => {
        move || async move {
            let $state = $start;
            move || async move { $stop }
        }
    };
    ($start:block, $stop:block) => {
        $crate::lifecycle!(_state, $start, $stop)
    };
}
213
/// Shorthand for `lifecycle!` with the given start block and an empty stop
/// block.
///
/// The call goes through `$crate` so it resolves regardless of the name under
/// which the crate is imported.
#[macro_export]
macro_rules! start {
    ($x:block) => {
        $crate::lifecycle!($x, {})
    };
}
220
/// Shorthand for `lifecycle!` with an empty start block and the given stop
/// block.
///
/// The call goes through `$crate` so it resolves regardless of the name under
/// which the crate is imported.
#[macro_export]
macro_rules! stop {
    ($x:block) => {
        $crate::lifecycle!({}, $x)
    };
}
227
228#[async_trait]
229impl<F, R, O> Lifecycle for F
230where
231 F: FnOnce() -> R + 'static + Send,
232 R: Future<Output = O> + Send,
233 O: Stop,
234{
235 type S = O;
236
237 async fn start(self) -> Self::S {
238 self().await
239 }
240}
241
242#[async_trait]
243impl<F, R> Stop for F
244where
245 F: FnOnce() -> R + Send,
246 R: Future<Output = ()> + Send,
247{
248 async fn stop(self) {
249 self().await
250 }
251}
252
253pub fn spawn_interval<S, F, R>(
254 s: S,
255 period: Duration,
256 fun: F,
257) -> impl Lifecycle<S = IntrospectableStop>
258where
259 S: Clone + 'static + Send + Sync,
260 F: Fn(S) -> R + Send + Sync + 'static,
261 R: Future<Output = ()> + Send,
262{
263 spawn_with_shutdown(move |mut sig| async move {
264 let sleep = tokio::time::sleep(period);
265 tokio::pin!(sleep);
266 loop {
267 tokio::select! {
268 _ = &mut sleep => {
269 fun(s.clone()).await;
270 sleep.as_mut().reset(tokio::time::Instant::now() + period);
271 },
272 _ = &mut sig => {
273 return;
274 }
275 }
276 }
277 })
278}
279
280pub fn spawn_with_delay<F, R>(delay: Duration, fun: F) -> impl Lifecycle<S = IntrospectableStop>
281where
282 F: Fn() -> R + Send + Sync + 'static,
283 R: Future<Output = ()> + Send,
284{
285 spawn_with_shutdown(move |mut sig| async move {
286 let sleep = tokio::time::sleep(delay);
287 tokio::pin!(sleep);
288 tokio::select! {
289 _ = &mut sleep => {
290 fun().await;
291 },
292 _ = &mut sig => {
293 return;
294 }
295 }
296 })
297}
298
299pub fn spawn_lifecycle_with_delay(
300 delay: Duration,
301 lc: impl Lifecycle,
302) -> impl Lifecycle<S = IntrospectableStop> {
303 spawn_with_shutdown(move |mut sig| async move {
304 let sleep = tokio::time::sleep(delay);
305 tokio::pin!(sleep);
306 tokio::select! {
307 _ = &mut sleep => {
308 let stopper = lc.start().await;
309 sig.await;
310 stopper.stop().await;
311 },
312 _ = &mut sig => {
313 return;
314 }
315 }
316 })
317}
318
319pub struct ShutdownSignal(tokio::sync::oneshot::Receiver<()>);
320
321impl Future for ShutdownSignal {
322 type Output = ();
323
324 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
325 Pin::new(&mut self.0).poll(cx).map(|_r| ())
326 }
327}
328
329pub fn spawn_with_shutdown<F, R>(fun: F) -> impl Lifecycle<S = IntrospectableStop>
330where
331 F: FnOnce(ShutdownSignal) -> R + Send + 'static,
332 R: Future<Output = ()> + Send,
333{
334 move || async move {
335 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
336 let jh = tokio::spawn(async move {
337 let _ = fun(ShutdownSignal(shutdown_rx)).await;
338 });
339 IntrospectableStop::new(jh, shutdown_tx)
340 }
341}
342
343pub async fn run_until_shutdown_sig(
344 life: impl Lifecycle,
345 timeout: Duration,
346) -> Result<(), Elapsed> {
347 let stopper = life.start().await;
348 std_unix_shutdown_sigs().await;
349 tokio::time::timeout(timeout, stopper.stop()).await
350}
351
352async fn std_unix_shutdown_sigs() {
353 let mut kill_sig = signal::unix::signal(SignalKind::terminate()).unwrap();
354 tokio::select! {
355 _ = signal::ctrl_c() => {},
356 _ = kill_sig.recv() => {},
357 }
358}
359
360#[derive(Clone)]
361pub struct LazyStarter {
362 tx: Sender<ConcreteLifecycle>,
363}
364
365impl LazyStarter {
366 fn new() -> (impl Lifecycle<S = IntrospectableStop>, LazyStarter) {
367 let (tx, rx) = tokio::sync::mpsc::channel(5);
368 (LazyStarter::lifecycle(rx), LazyStarter { tx })
369 }
370
371 fn lifecycle(mut rx: Receiver<ConcreteLifecycle>) -> impl Lifecycle<S = IntrospectableStop> {
372 spawn_with_shutdown(|sig| async move {
373 let mut stoppers = vec![];
374 tokio::pin!(sig);
375 loop {
376 tokio::select! {
377 _ = &mut sig => {
378 break;
379 },
380 lc = rx.recv() => {
381 if let Some(lc) = lc {
382 stoppers.push(lc.start().await);
383 } else {
384 let _ = sig.await;
385 break;
386 }
387 },
388 }
389 }
390 if let Some(fut) = stoppers.into_iter().map(Stop::stop).reduce(|a, b| {
391 Box::pin(async {
392 tokio::join!(a, b);
393 })
394 }) {
395 fut.await;
396 }
397 })
398 }
399
400 pub async fn start(&self, life: impl Lifecycle) {
401 let _ = self.tx.send(life.concrete().await).await;
402 }
403}
404
405pub fn lazy_start() -> (impl Lifecycle, LazyStarter) {
406 LazyStarter::new()
407}
408
/// Unit stop handle whose `Stop` implementation does nothing.
#[derive(Debug, PartialEq, Eq)]
pub struct NoStop;
411
412#[async_trait]
413impl Stop for NoStop {
414 async fn stop(self) {}
415}
416
417pub struct NoLife;
418
419#[async_trait]
420impl Lifecycle for NoLife {
421 type S = NoStop;
422
423 async fn start(self) -> Self::S {
424 NoStop
425 }
426}