simple_life/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use tokio::signal;
8use tokio::signal::unix::SignalKind;
9use tokio::sync::mpsc::{Receiver, Sender};
10use tokio::sync::oneshot;
11use tokio::task::JoinHandle;
12use tokio::time::error::Elapsed;
13
14#[async_trait]
15pub trait Stop: Send {
16    async fn stop(self);
17
18    async fn concrete(self) -> ConcreteStop
19    where
20        Self: Sized + 'static,
21    {
22        ConcreteStop::new(self).await
23    }
24
25    async fn into_guard(self) -> StopGuard
26    where
27        Self: Sized + 'static,
28    {
29        StopGuard::new(self).await
30    }
31}
32
33#[async_trait]
34pub trait Lifecycle: Send + 'static {
35    type S: Stop;
36
37    async fn start(self) -> Self::S;
38
39    async fn concrete(self) -> ConcreteLifecycle
40    where
41        Self: Sized,
42    {
43        ConcreteLifecycle::new(self).await
44    }
45}
46
47pub struct StopGuard {
48    _tx: oneshot::Sender<()>,
49}
50
51impl StopGuard {
52    async fn new(stop: impl Stop + 'static) -> Self {
53        let (tx, rx) = oneshot::channel();
54        tokio::spawn(async move {
55            let _ = rx.await;
56            let _ = stop.stop().await;
57        });
58        StopGuard { _tx: tx }
59    }
60}
61
62pub struct IntrospectableStop {
63    sig: oneshot::Sender<()>,
64    jh: JoinHandle<()>,
65}
66
67impl IntrospectableStop {
68    fn new(jh: JoinHandle<()>, sig: oneshot::Sender<()>) -> Self {
69        IntrospectableStop { jh, sig }
70    }
71
72    pub fn is_finished(&self) -> bool {
73        self.jh.is_finished()
74    }
75}
76
77#[async_trait]
78impl Stop for IntrospectableStop {
79    async fn stop(self) {
80        let _ = self.sig.send(());
81        let _ = self.jh.await;
82    }
83}
84
85pub struct ConcreteLifecycle {
86    tx: oneshot::Sender<oneshot::Sender<ConcreteStop>>,
87}
88
89impl ConcreteLifecycle {
90    async fn new(lc: impl Lifecycle) -> Self {
91        let (tx, rx) = oneshot::channel::<oneshot::Sender<ConcreteStop>>();
92        tokio::spawn(async move {
93            if let Ok(stop_tx) = rx.await {
94                let stop = lc.start().await;
95                let _ = stop_tx.send(ConcreteStop::new(stop).await);
96            }
97        });
98        ConcreteLifecycle { tx }
99    }
100}
101
102pub async fn parallel_iter<I: IntoIterator<Item = ConcreteLifecycle>>(
103    iter: I,
104) -> ConcreteLifecycle {
105    let mut lc = None;
106    for next in iter.into_iter() {
107        if let Some(old_lc) = lc {
108            lc = Some(parallel(old_lc, next).concrete().await);
109        } else {
110            lc = Some(next);
111        }
112    }
113    if let Some(lc) = lc {
114        lc
115    } else {
116        NoLife.concrete().await
117    }
118}
119
120pub struct ConcreteStop {
121    tx: oneshot::Sender<oneshot::Sender<()>>,
122}
123
124impl ConcreteStop {
125    async fn new(stop: impl Stop + 'static) -> ConcreteStop {
126        let (tx, rx) = oneshot::channel::<oneshot::Sender<()>>();
127        tokio::spawn(async move {
128            if let Ok(done_tx) = rx.await {
129                stop.stop().await;
130                let _ = done_tx.send(());
131            }
132        });
133        ConcreteStop { tx }
134    }
135}
136
137#[async_trait]
138impl Lifecycle for ConcreteLifecycle {
139    type S = ConcreteStop;
140    async fn start(self) -> Self::S {
141        let (tx, rx) = oneshot::channel();
142        let _ = self.tx.send(tx);
143        rx.await.unwrap()
144    }
145}
146
147#[async_trait]
148impl Stop for ConcreteStop {
149    async fn stop(self) {
150        let (tx, rx) = oneshot::channel();
151        let _ = self.tx.send(tx);
152        rx.await.unwrap();
153    }
154}
155
156pub fn seq<A, B>(a: A, b: B) -> impl Lifecycle
157where
158    A: Lifecycle,
159    B: Lifecycle,
160{
161    lifecycle!(state, { (a.start().await, b.start().await) }, {
162        let (a_stop, b_stop) = state;
163        b_stop.stop().await;
164        a_stop.stop().await;
165    })
166}
167
168pub fn parallel<A, B>(a: A, b: B) -> impl Lifecycle
169where
170    A: Lifecycle,
171    B: Lifecycle,
172{
173    lifecycle!(state, { tokio::join!(a.start(), b.start()) }, {
174        let (a_stop, b_stop) = state;
175        let _ = tokio::join!(a_stop.stop(), b_stop.stop());
176    })
177}
178
/// Variadic form of the `parallel` function.
///
/// A single argument is returned unchanged. Several arguments are folded to
/// the right: `parallel!(a, b, c)` becomes `parallel(a, parallel(b, c))`.
/// A trailing comma is accepted.
#[macro_export]
macro_rules! parallel {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        // `$crate` keeps the expansion valid when the dependency is renamed
        // and when the macro is invoked from inside this crate.
        $crate::parallel($x, $crate::parallel!($($y),+))
    )
}
186
/// Variadic form of the `seq` function.
///
/// A single argument is returned unchanged. Several arguments are folded to
/// the right: `seq!(a, b, c)` becomes `seq(a, seq(b, c))`. A trailing comma
/// is accepted.
#[macro_export]
macro_rules! seq {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        // `$crate` keeps the expansion valid when the dependency is renamed
        // and when the macro is invoked from inside this crate.
        $crate::seq($x, $crate::seq!($($y),+))
    )
}
194
/// Builds a lifecycle out of a start block and a stop block.
///
/// The expansion is a `move` closure returning an `async move` block. That
/// block evaluates the start block and then yields a second `move` closure
/// whose `async move` body is the stop block. `.await` may be used in both
/// blocks.
///
/// Forms:
/// - `lifecycle!(state, { start }, { stop })` binds the value of the start
///   block to `state`, which the stop block may use.
/// - `lifecycle!(mut state, { start }, { stop })` does the same with a
///   mutable binding.
/// - `lifecycle!({ start }, { stop })` is for when the stop block needs
///   nothing from the start block.
#[macro_export]
macro_rules! lifecycle {
    (mut $state:ident, $start:block, $stop:block) => {
        move || async move {
            let mut $state = $start;
            move || async move { $stop }
        }
    };
    ($state:ident, $start:block, $stop:block) => {
        move || async move {
            let $state = $start;
            move || async move { $stop }
        }
    };
    ($start:block, $stop:block) => {
        // `$crate` keeps the expansion valid when the dependency is renamed
        // and when the macro is invoked from inside this crate.
        $crate::lifecycle!(_state, $start, $stop)
    };
}
213
/// Builds a lifecycle that only has a start action.
///
/// Expands to `lifecycle!` with the given block as the start block and an
/// empty stop block.
#[macro_export]
macro_rules! start {
    ($x:block) => {
        // `$crate` keeps the expansion valid when the dependency is renamed
        // and when the macro is invoked from inside this crate.
        $crate::lifecycle!($x, {})
    };
}
220
/// Builds a lifecycle that only has a stop action.
///
/// Expands to `lifecycle!` with an empty start block and the given block as
/// the stop block.
#[macro_export]
macro_rules! stop {
    ($x:block) => {
        // `$crate` keeps the expansion valid when the dependency is renamed
        // and when the macro is invoked from inside this crate.
        $crate::lifecycle!({}, $x)
    };
}
227
228#[async_trait]
229impl<F, R, O> Lifecycle for F
230where
231    F: FnOnce() -> R + 'static + Send,
232    R: Future<Output = O> + Send,
233    O: Stop,
234{
235    type S = O;
236
237    async fn start(self) -> Self::S {
238        self().await
239    }
240}
241
242#[async_trait]
243impl<F, R> Stop for F
244where
245    F: FnOnce() -> R + Send,
246    R: Future<Output = ()> + Send,
247{
248    async fn stop(self) {
249        self().await
250    }
251}
252
253pub fn spawn_interval<S, F, R>(
254    s: S,
255    period: Duration,
256    fun: F,
257) -> impl Lifecycle<S = IntrospectableStop>
258where
259    S: Clone + 'static + Send + Sync,
260    F: Fn(S) -> R + Send + Sync + 'static,
261    R: Future<Output = ()> + Send,
262{
263    spawn_with_shutdown(move |mut sig| async move {
264        let sleep = tokio::time::sleep(period);
265        tokio::pin!(sleep);
266        loop {
267            tokio::select! {
268                _ = &mut sleep => {
269                    fun(s.clone()).await;
270                    sleep.as_mut().reset(tokio::time::Instant::now() + period);
271                },
272                _ = &mut sig => {
273                    return;
274                }
275            }
276        }
277    })
278}
279
280pub fn spawn_with_delay<F, R>(delay: Duration, fun: F) -> impl Lifecycle<S = IntrospectableStop>
281where
282    F: Fn() -> R + Send + Sync + 'static,
283    R: Future<Output = ()> + Send,
284{
285    spawn_with_shutdown(move |mut sig| async move {
286        let sleep = tokio::time::sleep(delay);
287        tokio::pin!(sleep);
288        tokio::select! {
289            _ = &mut sleep => {
290                fun().await;
291            },
292            _ = &mut sig => {
293                return;
294            }
295        }
296    })
297}
298
299pub fn spawn_lifecycle_with_delay(
300    delay: Duration,
301    lc: impl Lifecycle,
302) -> impl Lifecycle<S = IntrospectableStop> {
303    spawn_with_shutdown(move |mut sig| async move {
304        let sleep = tokio::time::sleep(delay);
305        tokio::pin!(sleep);
306        tokio::select! {
307            _ = &mut sleep => {
308                let stopper = lc.start().await;
309                sig.await;
310                stopper.stop().await;
311            },
312            _ = &mut sig => {
313                return;
314            }
315        }
316    })
317}
318
319pub struct ShutdownSignal(tokio::sync::oneshot::Receiver<()>);
320
321impl Future for ShutdownSignal {
322    type Output = ();
323
324    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
325        Pin::new(&mut self.0).poll(cx).map(|_r| ())
326    }
327}
328
329pub fn spawn_with_shutdown<F, R>(fun: F) -> impl Lifecycle<S = IntrospectableStop>
330where
331    F: FnOnce(ShutdownSignal) -> R + Send + 'static,
332    R: Future<Output = ()> + Send,
333{
334    move || async move {
335        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
336        let jh = tokio::spawn(async move {
337            let _ = fun(ShutdownSignal(shutdown_rx)).await;
338        });
339        IntrospectableStop::new(jh, shutdown_tx)
340    }
341}
342
343pub async fn run_until_shutdown_sig(
344    life: impl Lifecycle,
345    timeout: Duration,
346) -> Result<(), Elapsed> {
347    let stopper = life.start().await;
348    std_unix_shutdown_sigs().await;
349    tokio::time::timeout(timeout, stopper.stop()).await
350}
351
352async fn std_unix_shutdown_sigs() {
353    let mut kill_sig = signal::unix::signal(SignalKind::terminate()).unwrap();
354    tokio::select! {
355        _ = signal::ctrl_c() => {},
356        _ = kill_sig.recv() => {},
357    }
358}
359
360#[derive(Clone)]
361pub struct LazyStarter {
362    tx: Sender<ConcreteLifecycle>,
363}
364
365impl LazyStarter {
366    fn new() -> (impl Lifecycle<S = IntrospectableStop>, LazyStarter) {
367        let (tx, rx) = tokio::sync::mpsc::channel(5);
368        (LazyStarter::lifecycle(rx), LazyStarter { tx })
369    }
370
371    fn lifecycle(mut rx: Receiver<ConcreteLifecycle>) -> impl Lifecycle<S = IntrospectableStop> {
372        spawn_with_shutdown(|sig| async move {
373            let mut stoppers = vec![];
374            tokio::pin!(sig);
375            loop {
376                tokio::select! {
377                    _ = &mut sig => {
378                        break;
379                    },
380                    lc = rx.recv() => {
381                        if let Some(lc) = lc {
382                            stoppers.push(lc.start().await);
383                        } else {
384                            let _ = sig.await;
385                            break;
386                        }
387                    },
388                }
389            }
390            if let Some(fut) = stoppers.into_iter().map(Stop::stop).reduce(|a, b| {
391                Box::pin(async {
392                    tokio::join!(a, b);
393                })
394            }) {
395                fut.await;
396            }
397        })
398    }
399
400    pub async fn start(&self, life: impl Lifecycle) {
401        let _ = self.tx.send(life.concrete().await).await;
402    }
403}
404
405pub fn lazy_start() -> (impl Lifecycle, LazyStarter) {
406    LazyStarter::new()
407}
408
409#[derive(Eq, PartialEq, Debug)]
410pub struct NoStop;
411
412#[async_trait]
413impl Stop for NoStop {
414    async fn stop(self) {}
415}
416
417pub struct NoLife;
418
419#[async_trait]
420impl Lifecycle for NoLife {
421    type S = NoStop;
422
423    async fn start(self) -> Self::S {
424        NoStop
425    }
426}