1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use tokio::signal;
10use tokio::signal::unix::SignalKind;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::oneshot;
13use tokio::task::JoinHandle;
14use tokio::time::error::Elapsed;
15
16#[async_trait]
17pub trait Stop: Send {
18 async fn stop(self);
19
20 async fn concrete(self) -> ConcreteStop
21 where
22 Self: Sized + 'static,
23 {
24 ConcreteStop::new(self).await
25 }
26
27 async fn into_guard(self) -> StopGuard
28 where
29 Self: Sized + 'static,
30 {
31 StopGuard::new(self).await
32 }
33}
34
35#[async_trait]
36pub trait Lifecycle: Send + 'static {
37 type S: Stop;
38
39 async fn start(self) -> Self::S;
40
41 async fn concrete(self) -> ConcreteLifecycle
42 where
43 Self: Sized,
44 {
45 ConcreteLifecycle::new(self).await
46 }
47}
48
49pub struct StopGuard {
50 _tx: oneshot::Sender<()>,
51}
52
53impl StopGuard {
54 async fn new(stop: impl Stop + 'static) -> Self {
55 let (tx, rx) = oneshot::channel();
56 tokio::spawn(async move {
57 let _ = rx.await;
58 let _ = stop.stop().await;
59 });
60 StopGuard { _tx: tx }
61 }
62}
63
64pub struct IntrospectableStop {
65 sig: ShutdownSender,
66 jh: JoinHandle<()>,
67}
68
69impl IntrospectableStop {
70 fn new(jh: JoinHandle<()>, sig: ShutdownSender) -> Self {
71 IntrospectableStop { jh, sig }
72 }
73
74 pub fn is_finished(&self) -> bool {
75 self.jh.is_finished()
76 }
77}
78
79#[async_trait]
80impl Stop for IntrospectableStop {
81 async fn stop(self) {
82 self.sig.shutdown();
83 let _ = self.jh.await;
84 }
85}
86
87pub struct ConcreteLifecycle {
88 tx: oneshot::Sender<oneshot::Sender<ConcreteStop>>,
89}
90
91impl ConcreteLifecycle {
92 async fn new(lc: impl Lifecycle) -> Self {
93 let (tx, rx) = oneshot::channel::<oneshot::Sender<ConcreteStop>>();
94 tokio::spawn(async move {
95 if let Ok(stop_tx) = rx.await {
96 let stop = lc.start().await;
97 let _ = stop_tx.send(ConcreteStop::new(stop).await);
98 }
99 });
100 ConcreteLifecycle { tx }
101 }
102}
103
104pub async fn parallel_iter<I: IntoIterator<Item = ConcreteLifecycle>>(
105 iter: I,
106) -> ConcreteLifecycle {
107 let mut lc = None;
108 for next in iter.into_iter() {
109 if let Some(old_lc) = lc {
110 lc = Some(parallel(old_lc, next).concrete().await);
111 } else {
112 lc = Some(next);
113 }
114 }
115 if let Some(lc) = lc {
116 lc
117 } else {
118 NoLife.concrete().await
119 }
120}
121
122pub struct ConcreteStop {
123 tx: oneshot::Sender<oneshot::Sender<()>>,
124}
125
126impl ConcreteStop {
127 async fn new(stop: impl Stop + 'static) -> ConcreteStop {
128 let (tx, rx) = oneshot::channel::<oneshot::Sender<()>>();
129 tokio::spawn(async move {
130 if let Ok(done_tx) = rx.await {
131 stop.stop().await;
132 let _ = done_tx.send(());
133 }
134 });
135 ConcreteStop { tx }
136 }
137}
138
139#[async_trait]
140impl Lifecycle for ConcreteLifecycle {
141 type S = ConcreteStop;
142 async fn start(self) -> Self::S {
143 let (tx, rx) = oneshot::channel();
144 let _ = self.tx.send(tx);
145 rx.await.unwrap()
146 }
147}
148
149#[async_trait]
150impl Stop for ConcreteStop {
151 async fn stop(self) {
152 let (tx, rx) = oneshot::channel();
153 let _ = self.tx.send(tx);
154 rx.await.unwrap();
155 }
156}
157
158pub fn seq<A, B>(a: A, b: B) -> impl Lifecycle
159where
160 A: Lifecycle,
161 B: Lifecycle,
162{
163 lifecycle!(state, { (a.start().await, b.start().await) }, {
164 let (a_stop, b_stop) = state;
165 b_stop.stop().await;
166 a_stop.stop().await;
167 })
168}
169
170pub fn parallel<A, B>(a: A, b: B) -> impl Lifecycle
171where
172 A: Lifecycle,
173 B: Lifecycle,
174{
175 lifecycle!(state, { tokio::join!(a.start(), b.start()) }, {
176 let (a_stop, b_stop) = state;
177 let _ = tokio::join!(a_stop.stop(), b_stop.stop());
178 })
179}
180
181#[macro_export]
182macro_rules! parallel {
183 ($x:expr $(,)?) => ($x);
184 ($x:expr, $($y:expr),+ $(,)?) => (
185 simple_life::parallel($x, simple_life::parallel!($($y),+))
186 )
187}
188
189#[macro_export]
190macro_rules! seq {
191 ($x:expr $(,)?) => ($x);
192 ($x:expr, $($y:expr),+ $(,)?) => (
193 simple_life::seq($x, simple_life::seq!($($y),+))
194 )
195}
196
197#[macro_export]
198macro_rules! lifecycle {
199 (mut $state:ident, $start:block, $stop:block) => {
200 move || async move {
201 let mut $state = $start;
202 move || async move { $stop }
203 }
204 };
205 ($state:ident, $start:block, $stop:block) => {
206 move || async move {
207 let $state = $start;
208 move || async move { $stop }
209 }
210 };
211 ($start:block, $stop:block) => {
212 simple_life::lifecycle!(_state, $start, $stop)
213 };
214}
215
216#[macro_export]
217macro_rules! start {
218 ($x:block) => {
219 simple_life::lifecycle!($x, {})
220 };
221}
222
223#[macro_export]
224macro_rules! stop {
225 ($x:block) => {
226 simple_life::lifecycle!({}, $x)
227 };
228}
229
230#[async_trait]
231impl<F, R, O> Lifecycle for F
232where
233 F: FnOnce() -> R + 'static + Send,
234 R: Future<Output = O> + Send,
235 O: Stop,
236{
237 type S = O;
238
239 async fn start(self) -> Self::S {
240 self().await
241 }
242}
243
244#[async_trait]
245impl<F, R> Stop for F
246where
247 F: FnOnce() -> R + Send,
248 R: Future<Output = ()> + Send,
249{
250 async fn stop(self) {
251 self().await
252 }
253}
254
255pub fn spawn_interval<S, F, R>(
256 s: S,
257 period: Duration,
258 fun: F,
259) -> impl Lifecycle<S = IntrospectableStop>
260where
261 S: Clone + 'static + Send + Sync,
262 F: Fn(S) -> R + Send + Sync + 'static,
263 R: Future<Output = ()> + Send,
264{
265 spawn_with_shutdown(move |mut sig| async move {
266 let sleep = tokio::time::sleep(period);
267 tokio::pin!(sleep);
268 loop {
269 tokio::select! {
270 _ = &mut sleep => {
271 fun(s.clone()).await;
272 sleep.as_mut().reset(tokio::time::Instant::now() + period);
273 },
274 _ = &mut sig => {
275 return;
276 }
277 }
278 }
279 })
280}
281
282pub fn spawn_with_delay<F, R>(delay: Duration, fun: F) -> impl Lifecycle<S = IntrospectableStop>
283where
284 F: Fn() -> R + Send + Sync + 'static,
285 R: Future<Output = ()> + Send,
286{
287 spawn_with_shutdown(move |mut sig| async move {
288 let sleep = tokio::time::sleep(delay);
289 tokio::pin!(sleep);
290 tokio::select! {
291 _ = &mut sleep => {
292 fun().await;
293 },
294 _ = &mut sig => {
295 }
296 }
297 })
298}
299
300pub fn spawn_lifecycle_with_delay(
301 delay: Duration,
302 lc: impl Lifecycle,
303) -> impl Lifecycle<S = IntrospectableStop> {
304 spawn_with_shutdown(move |mut sig| async move {
305 let sleep = tokio::time::sleep(delay);
306 tokio::pin!(sleep);
307 tokio::select! {
308 _ = &mut sleep => {
309 let stopper = lc.start().await;
310 sig.await;
311 stopper.stop().await;
312 },
313 _ = &mut sig => {
314 }
315 }
316 })
317}
318
319struct ShutdownInner {
320 signaled: AtomicBool,
321 wakers: Mutex<Vec<Waker>>,
322}
323
324pub struct ShutdownSender(Arc<ShutdownInner>);
325
326impl ShutdownSender {
327 fn shutdown(&self) {
328 self.0.signaled.store(true, Ordering::Release);
329 for waker in self.0.wakers.lock().unwrap().drain(..) {
330 waker.wake();
331 }
332 }
333}
334
335#[derive(Clone)]
336pub struct ShutdownSignal(Arc<ShutdownInner>);
337
338impl Future for ShutdownSignal {
339 type Output = ();
340
341 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342 if self.0.signaled.load(Ordering::Acquire) {
343 return Poll::Ready(());
344 }
345 self.0.wakers.lock().unwrap().push(cx.waker().clone());
346 if self.0.signaled.load(Ordering::Acquire) {
347 Poll::Ready(())
348 } else {
349 Poll::Pending
350 }
351 }
352}
353
354fn shutdown_channel() -> (ShutdownSender, ShutdownSignal) {
355 let inner = Arc::new(ShutdownInner {
356 signaled: AtomicBool::new(false),
357 wakers: Mutex::new(Vec::new()),
358 });
359 (ShutdownSender(inner.clone()), ShutdownSignal(inner))
360}
361
362pub fn spawn_with_shutdown<F, R>(fun: F) -> impl Lifecycle<S = IntrospectableStop>
363where
364 F: FnOnce(ShutdownSignal) -> R + Send + 'static,
365 R: Future<Output = ()> + Send,
366{
367 move || async move {
368 let (shutdown_tx, shutdown_rx) = shutdown_channel();
369 let jh = tokio::spawn(async move {
370 let _ = fun(shutdown_rx).await;
371 });
372 IntrospectableStop::new(jh, shutdown_tx)
373 }
374}
375
376pub async fn run_until_shutdown_sig(
377 life: impl Lifecycle,
378 timeout: Duration,
379) -> Result<(), Elapsed> {
380 let stopper = life.start().await;
381 std_unix_shutdown_sigs().await;
382 tokio::time::timeout(timeout, stopper.stop()).await
383}
384
385async fn std_unix_shutdown_sigs() {
386 let mut kill_sig = signal::unix::signal(SignalKind::terminate()).unwrap();
387 tokio::select! {
388 _ = signal::ctrl_c() => {},
389 _ = kill_sig.recv() => {},
390 }
391}
392
393#[derive(Clone)]
394pub struct LazyStarter {
395 tx: Sender<ConcreteLifecycle>,
396}
397
398impl LazyStarter {
399 fn new() -> (impl Lifecycle<S = IntrospectableStop>, LazyStarter) {
400 let (tx, rx) = tokio::sync::mpsc::channel(5);
401 (LazyStarter::lifecycle(rx), LazyStarter { tx })
402 }
403
404 fn lifecycle(mut rx: Receiver<ConcreteLifecycle>) -> impl Lifecycle<S = IntrospectableStop> {
405 spawn_with_shutdown(|sig| async move {
406 let mut stoppers = vec![];
407 tokio::pin!(sig);
408 loop {
409 tokio::select! {
410 _ = &mut sig => {
411 break;
412 },
413 lc = rx.recv() => {
414 if let Some(lc) = lc {
415 stoppers.push(lc.start().await);
416 } else {
417 let _ = sig.await;
418 break;
419 }
420 },
421 }
422 }
423 if let Some(fut) = stoppers.into_iter().map(Stop::stop).reduce(|a, b| {
424 Box::pin(async {
425 tokio::join!(a, b);
426 })
427 }) {
428 fut.await;
429 }
430 })
431 }
432
433 pub async fn start(&self, life: impl Lifecycle) {
434 let _ = self.tx.send(life.concrete().await).await;
435 }
436}
437
438pub fn lazy_start() -> (impl Lifecycle, LazyStarter) {
439 LazyStarter::new()
440}
441
442#[derive(Eq, PartialEq, Debug)]
443pub struct NoStop;
444
445#[async_trait]
446impl Stop for NoStop {
447 async fn stop(self) {}
448}
449
450pub struct NoLife;
451
452#[async_trait]
453impl Lifecycle for NoLife {
454 type S = NoStop;
455
456 async fn start(self) -> Self::S {
457 NoStop
458 }
459}