entelix_runnable/
timed.rs1use std::marker::PhantomData;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13
14use entelix_core::ExecutionContext;
15use entelix_core::error::{Error, Result};
16
17use crate::runnable::Runnable;
18
19pub struct Timed<R, I, O>
23where
24 R: Runnable<I, O> + 'static,
25 I: Send + 'static,
26 O: Send + 'static,
27{
28 inner: Arc<R>,
29 timeout: Duration,
30 _io: PhantomData<fn(I) -> O>,
31}
32
33impl<R, I, O> Timed<R, I, O>
34where
35 R: Runnable<I, O> + 'static,
36 I: Send + 'static,
37 O: Send + 'static,
38{
39 pub fn new(inner: R, timeout: Duration) -> Self {
41 Self {
42 inner: Arc::new(inner),
43 timeout,
44 _io: PhantomData,
45 }
46 }
47}
48
49impl<R, I, O> std::fmt::Debug for Timed<R, I, O>
50where
51 R: Runnable<I, O> + 'static,
52 I: Send + 'static,
53 O: Send + 'static,
54{
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("Timed")
57 .field("timeout", &self.timeout)
58 .finish_non_exhaustive()
59 }
60}
61
62#[async_trait]
63impl<R, I, O> Runnable<I, O> for Timed<R, I, O>
64where
65 R: Runnable<I, O> + 'static,
66 I: Send + 'static,
67 O: Send + 'static,
68{
69 async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
70 let token = ctx.cancellation();
71 tokio::select! {
72 biased;
73 () = token.cancelled() => Err(Error::Cancelled),
76 () = tokio::time::sleep(self.timeout) => Err(Error::DeadlineExceeded),
78 result = self.inner.invoke(input, ctx) => result,
80 }
81 }
82}