Skip to main content

entelix_runnable/
timed.rs

1//! `Timed<R>` — `Runnable<I, O>` adapter that races the inner against
2//! a wall-clock timeout.
3//!
4//! On expiry returns [`Error::DeadlineExceeded`]. Cancellation by the
5//! caller's [`ExecutionContext`] still wins — the timeout sleep is
6//! cancellation-aware.
7
8use std::marker::PhantomData;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13
14use entelix_core::ExecutionContext;
15use entelix_core::error::{Error, Result};
16
17use crate::runnable::Runnable;
18
/// `Runnable<I, O>` adapter that aborts the inner with
/// `Error::DeadlineExceeded` if it does not complete within
/// `timeout`.
///
/// The type definition itself is unbounded; the `Runnable` / `Send` /
/// `'static` requirements are stated on the `impl` blocks that need them.
pub struct Timed<R, I, O> {
    /// Wrapped runnable, held behind an `Arc`.
    inner: Arc<R>,
    /// Wall-clock budget applied to each `invoke` call.
    timeout: Duration,
    /// Ties the `I` / `O` type parameters to the struct without storing a
    /// value of either type.
    _io: PhantomData<fn(I) -> O>,
}
32
33impl<R, I, O> Timed<R, I, O>
34where
35    R: Runnable<I, O> + 'static,
36    I: Send + 'static,
37    O: Send + 'static,
38{
39    /// Build with the inner runnable and a wall-clock timeout.
40    pub fn new(inner: R, timeout: Duration) -> Self {
41        Self {
42            inner: Arc::new(inner),
43            timeout,
44            _io: PhantomData,
45        }
46    }
47}
48
49impl<R, I, O> std::fmt::Debug for Timed<R, I, O>
50where
51    R: Runnable<I, O> + 'static,
52    I: Send + 'static,
53    O: Send + 'static,
54{
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("Timed")
57            .field("timeout", &self.timeout)
58            .finish_non_exhaustive()
59    }
60}
61
62#[async_trait]
63impl<R, I, O> Runnable<I, O> for Timed<R, I, O>
64where
65    R: Runnable<I, O> + 'static,
66    I: Send + 'static,
67    O: Send + 'static,
68{
69    async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
70        let token = ctx.cancellation();
71        tokio::select! {
72            biased;
73            // 1. Caller cancellation wins — operator intent overrides
74            //    the timeout.
75            () = token.cancelled() => Err(Error::Cancelled),
76            // 2. Timeout fires.
77            () = tokio::time::sleep(self.timeout) => Err(Error::DeadlineExceeded),
78            // 3. Inner completes.
79            result = self.inner.invoke(input, ctx) => result,
80        }
81    }
82}