Skip to main content

simple_life/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use tokio::signal;
10use tokio::signal::unix::SignalKind;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::oneshot;
13use tokio::task::JoinHandle;
14use tokio::time::error::Elapsed;
15
16#[async_trait]
17pub trait Stop: Send {
18    async fn stop(self);
19
20    async fn concrete(self) -> ConcreteStop
21    where
22        Self: Sized + 'static,
23    {
24        ConcreteStop::new(self).await
25    }
26
27    async fn into_guard(self) -> StopGuard
28    where
29        Self: Sized + 'static,
30    {
31        StopGuard::new(self).await
32    }
33}
34
35#[async_trait]
36pub trait Lifecycle: Send + 'static {
37    type S: Stop;
38
39    async fn start(self) -> Self::S;
40
41    async fn concrete(self) -> ConcreteLifecycle
42    where
43        Self: Sized,
44    {
45        ConcreteLifecycle::new(self).await
46    }
47}
48
49pub struct StopGuard {
50    _tx: oneshot::Sender<()>,
51}
52
53impl StopGuard {
54    async fn new(stop: impl Stop + 'static) -> Self {
55        let (tx, rx) = oneshot::channel();
56        tokio::spawn(async move {
57            let _ = rx.await;
58            let _ = stop.stop().await;
59        });
60        StopGuard { _tx: tx }
61    }
62}
63
64pub struct IntrospectableStop {
65    sig: ShutdownSender,
66    jh: JoinHandle<()>,
67}
68
69impl IntrospectableStop {
70    fn new(jh: JoinHandle<()>, sig: ShutdownSender) -> Self {
71        IntrospectableStop { jh, sig }
72    }
73
74    pub fn is_finished(&self) -> bool {
75        self.jh.is_finished()
76    }
77}
78
79#[async_trait]
80impl Stop for IntrospectableStop {
81    async fn stop(self) {
82        self.sig.shutdown();
83        let _ = self.jh.await;
84    }
85}
86
87pub struct ConcreteLifecycle {
88    tx: oneshot::Sender<oneshot::Sender<ConcreteStop>>,
89}
90
91impl ConcreteLifecycle {
92    async fn new(lc: impl Lifecycle) -> Self {
93        let (tx, rx) = oneshot::channel::<oneshot::Sender<ConcreteStop>>();
94        tokio::spawn(async move {
95            if let Ok(stop_tx) = rx.await {
96                let stop = lc.start().await;
97                let _ = stop_tx.send(ConcreteStop::new(stop).await);
98            }
99        });
100        ConcreteLifecycle { tx }
101    }
102}
103
104pub async fn parallel_iter<I: IntoIterator<Item = ConcreteLifecycle>>(
105    iter: I,
106) -> ConcreteLifecycle {
107    let mut lc = None;
108    for next in iter.into_iter() {
109        if let Some(old_lc) = lc {
110            lc = Some(parallel(old_lc, next).concrete().await);
111        } else {
112            lc = Some(next);
113        }
114    }
115    if let Some(lc) = lc {
116        lc
117    } else {
118        NoLife.concrete().await
119    }
120}
121
122pub struct ConcreteStop {
123    tx: oneshot::Sender<oneshot::Sender<()>>,
124}
125
126impl ConcreteStop {
127    async fn new(stop: impl Stop + 'static) -> ConcreteStop {
128        let (tx, rx) = oneshot::channel::<oneshot::Sender<()>>();
129        tokio::spawn(async move {
130            if let Ok(done_tx) = rx.await {
131                stop.stop().await;
132                let _ = done_tx.send(());
133            }
134        });
135        ConcreteStop { tx }
136    }
137}
138
139#[async_trait]
140impl Lifecycle for ConcreteLifecycle {
141    type S = ConcreteStop;
142    async fn start(self) -> Self::S {
143        let (tx, rx) = oneshot::channel();
144        let _ = self.tx.send(tx);
145        rx.await.unwrap()
146    }
147}
148
149#[async_trait]
150impl Stop for ConcreteStop {
151    async fn stop(self) {
152        let (tx, rx) = oneshot::channel();
153        let _ = self.tx.send(tx);
154        rx.await.unwrap();
155    }
156}
157
158pub fn seq<A, B>(a: A, b: B) -> impl Lifecycle
159where
160    A: Lifecycle,
161    B: Lifecycle,
162{
163    lifecycle!(state, { (a.start().await, b.start().await) }, {
164        let (a_stop, b_stop) = state;
165        b_stop.stop().await;
166        a_stop.stop().await;
167    })
168}
169
170pub fn parallel<A, B>(a: A, b: B) -> impl Lifecycle
171where
172    A: Lifecycle,
173    B: Lifecycle,
174{
175    lifecycle!(state, { tokio::join!(a.start(), b.start()) }, {
176        let (a_stop, b_stop) = state;
177        let _ = tokio::join!(a_stop.stop(), b_stop.stop());
178    })
179}
180
/// Variadic form of the `parallel` function: `parallel!(a, b, c)` expands to
/// `parallel(a, parallel(b, c))`, and a single argument expands to itself.
/// A trailing comma is accepted.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the crate is imported under, and also when used inside this crate.
#[macro_export]
macro_rules! parallel {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        $crate::parallel($x, $crate::parallel!($($y),+))
    )
}
188
/// Variadic form of the `seq` function: `seq!(a, b, c)` expands to
/// `seq(a, seq(b, c))`, and a single argument expands to itself.
/// A trailing comma is accepted.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the crate is imported under, and also when used inside this crate.
#[macro_export]
macro_rules! seq {
    ($x:expr $(,)?) => ($x);
    ($x:expr, $($y:expr),+ $(,)?) => (
        $crate::seq($x, $crate::seq!($($y),+))
    )
}
196
/// Builds a lifecycle from a start block and a stop block.
///
/// The expansion is a `move` closure returning a future that runs the start
/// block. That future's output is a second `move` closure returning a future
/// that runs the stop block.
///
/// Forms:
/// - `lifecycle!(state, { start }, { stop })` binds the start block's value
///   to `state`, which the stop block may use.
/// - `lifecycle!(mut state, { start }, { stop })` is the same with a mutable
///   binding.
/// - `lifecycle!({ start }, { stop })` discards the start block's value.
///
/// The two-block form recurses through `$crate` so it resolves no matter what
/// name the crate is imported under, and also when used inside this crate.
#[macro_export]
macro_rules! lifecycle {
    (mut $state:ident, $start:block, $stop:block) => {
        move || async move {
            let mut $state = $start;
            move || async move { $stop }
        }
    };
    ($state:ident, $start:block, $stop:block) => {
        move || async move {
            let $state = $start;
            move || async move { $stop }
        }
    };
    ($start:block, $stop:block) => {
        $crate::lifecycle!(_state, $start, $stop)
    };
}
215
/// Builds a lifecycle that runs the given block on start and has an empty
/// stop block.
///
/// Delegates to `lifecycle!` through `$crate` so the expansion resolves no
/// matter what name the crate is imported under, and also inside this crate.
#[macro_export]
macro_rules! start {
    ($x:block) => {
        $crate::lifecycle!($x, {})
    };
}
222
/// Builds a lifecycle with an empty start block that runs the given block on
/// stop.
///
/// Delegates to `lifecycle!` through `$crate` so the expansion resolves no
/// matter what name the crate is imported under, and also inside this crate.
#[macro_export]
macro_rules! stop {
    ($x:block) => {
        $crate::lifecycle!({}, $x)
    };
}
229
230#[async_trait]
231impl<F, R, O> Lifecycle for F
232where
233    F: FnOnce() -> R + 'static + Send,
234    R: Future<Output = O> + Send,
235    O: Stop,
236{
237    type S = O;
238
239    async fn start(self) -> Self::S {
240        self().await
241    }
242}
243
244#[async_trait]
245impl<F, R> Stop for F
246where
247    F: FnOnce() -> R + Send,
248    R: Future<Output = ()> + Send,
249{
250    async fn stop(self) {
251        self().await
252    }
253}
254
255pub fn spawn_interval<S, F, R>(
256    s: S,
257    period: Duration,
258    fun: F,
259) -> impl Lifecycle<S = IntrospectableStop>
260where
261    S: Clone + 'static + Send + Sync,
262    F: Fn(S) -> R + Send + Sync + 'static,
263    R: Future<Output = ()> + Send,
264{
265    spawn_with_shutdown(move |mut sig| async move {
266        let sleep = tokio::time::sleep(period);
267        tokio::pin!(sleep);
268        loop {
269            tokio::select! {
270                _ = &mut sleep => {
271                    fun(s.clone()).await;
272                    sleep.as_mut().reset(tokio::time::Instant::now() + period);
273                },
274                _ = &mut sig => {
275                    return;
276                }
277            }
278        }
279    })
280}
281
282pub fn spawn_with_delay<F, R>(delay: Duration, fun: F) -> impl Lifecycle<S = IntrospectableStop>
283where
284    F: Fn() -> R + Send + Sync + 'static,
285    R: Future<Output = ()> + Send,
286{
287    spawn_with_shutdown(move |mut sig| async move {
288        let sleep = tokio::time::sleep(delay);
289        tokio::pin!(sleep);
290        tokio::select! {
291            _ = &mut sleep => {
292                fun().await;
293            },
294            _ = &mut sig => {
295            }
296        }
297    })
298}
299
300pub fn spawn_lifecycle_with_delay(
301    delay: Duration,
302    lc: impl Lifecycle,
303) -> impl Lifecycle<S = IntrospectableStop> {
304    spawn_with_shutdown(move |mut sig| async move {
305        let sleep = tokio::time::sleep(delay);
306        tokio::pin!(sleep);
307        tokio::select! {
308            _ = &mut sleep => {
309                let stopper = lc.start().await;
310                sig.await;
311                stopper.stop().await;
312            },
313            _ = &mut sig => {
314            }
315        }
316    })
317}
318
/// State shared between a [`ShutdownSender`] and every [`ShutdownSignal`]
/// clone created from the same channel.
struct ShutdownInner {
    /// Set once shutdown has been requested; never cleared.
    signaled: AtomicBool,
    /// Wakers of tasks currently waiting for the signal.
    wakers: Mutex<Vec<Waker>>,
}

/// Sending half of the shutdown channel.
pub struct ShutdownSender(Arc<ShutdownInner>);

impl ShutdownSender {
    /// Marks the channel as signalled and wakes every registered waiter.
    fn shutdown(&self) {
        self.0.signaled.store(true, Ordering::Release);
        // Take the list out first so the mutex is released before any waker
        // runs; a waker is arbitrary code and must not execute under the lock.
        let parked = std::mem::take(&mut *self.0.wakers.lock().unwrap());
        for waker in parked {
            waker.wake();
        }
    }
}

/// Receiving half of the shutdown channel: a future that resolves once the
/// paired [`ShutdownSender`] has signalled. Clones observe the same signal.
#[derive(Clone)]
pub struct ShutdownSignal(Arc<ShutdownInner>);

impl Future for ShutdownSignal {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0.signaled.load(Ordering::Acquire) {
            return Poll::Ready(());
        }
        {
            let mut wakers = self.0.wakers.lock().unwrap();
            // A signal polled repeatedly from the same task (for example
            // inside a `select!` loop) must not add a fresh entry each time,
            // or the list grows without bound. `will_wake` is best-effort: a
            // false negative only costs one redundant entry.
            if !wakers.iter().any(|parked| parked.will_wake(cx.waker())) {
                wakers.push(cx.waker().clone());
            }
        }
        // Re-check after registering: the sender may have signalled between
        // the first check and the registration, in which case the waker list
        // could already have been drained.
        if self.0.signaled.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Creates a connected sender/signal pair in the not-yet-signalled state.
fn shutdown_channel() -> (ShutdownSender, ShutdownSignal) {
    let inner = Arc::new(ShutdownInner {
        signaled: AtomicBool::new(false),
        wakers: Mutex::new(Vec::new()),
    });
    (ShutdownSender(Arc::clone(&inner)), ShutdownSignal(inner))
}
361
362pub fn spawn_with_shutdown<F, R>(fun: F) -> impl Lifecycle<S = IntrospectableStop>
363where
364    F: FnOnce(ShutdownSignal) -> R + Send + 'static,
365    R: Future<Output = ()> + Send,
366{
367    move || async move {
368        let (shutdown_tx, shutdown_rx) = shutdown_channel();
369        let jh = tokio::spawn(async move {
370            let _ = fun(shutdown_rx).await;
371        });
372        IntrospectableStop::new(jh, shutdown_tx)
373    }
374}
375
376pub async fn run_until_shutdown_sig(
377    life: impl Lifecycle,
378    timeout: Duration,
379) -> Result<(), Elapsed> {
380    let stopper = life.start().await;
381    std_unix_shutdown_sigs().await;
382    tokio::time::timeout(timeout, stopper.stop()).await
383}
384
385async fn std_unix_shutdown_sigs() {
386    let mut kill_sig = signal::unix::signal(SignalKind::terminate()).unwrap();
387    tokio::select! {
388        _ = signal::ctrl_c() => {},
389        _ = kill_sig.recv() => {},
390    }
391}
392
393#[derive(Clone)]
394pub struct LazyStarter {
395    tx: Sender<ConcreteLifecycle>,
396}
397
398impl LazyStarter {
399    fn new() -> (impl Lifecycle<S = IntrospectableStop>, LazyStarter) {
400        let (tx, rx) = tokio::sync::mpsc::channel(5);
401        (LazyStarter::lifecycle(rx), LazyStarter { tx })
402    }
403
404    fn lifecycle(mut rx: Receiver<ConcreteLifecycle>) -> impl Lifecycle<S = IntrospectableStop> {
405        spawn_with_shutdown(|sig| async move {
406            let mut stoppers = vec![];
407            tokio::pin!(sig);
408            loop {
409                tokio::select! {
410                    _ = &mut sig => {
411                        break;
412                    },
413                    lc = rx.recv() => {
414                        if let Some(lc) = lc {
415                            stoppers.push(lc.start().await);
416                        } else {
417                            let _ = sig.await;
418                            break;
419                        }
420                    },
421                }
422            }
423            if let Some(fut) = stoppers.into_iter().map(Stop::stop).reduce(|a, b| {
424                Box::pin(async {
425                    tokio::join!(a, b);
426                })
427            }) {
428                fut.await;
429            }
430        })
431    }
432
433    pub async fn start(&self, life: impl Lifecycle) {
434        let _ = self.tx.send(life.concrete().await).await;
435    }
436}
437
438pub fn lazy_start() -> (impl Lifecycle, LazyStarter) {
439    LazyStarter::new()
440}
441
442#[derive(Eq, PartialEq, Debug)]
443pub struct NoStop;
444
445#[async_trait]
446impl Stop for NoStop {
447    async fn stop(self) {}
448}
449
450pub struct NoLife;
451
452#[async_trait]
453impl Lifecycle for NoLife {
454    type S = NoStop;
455
456    async fn start(self) -> Self::S {
457        NoStop
458    }
459}