Skip to main content

wesichain_core/
time_limited.rs

1use std::time::Duration;
2
3use futures::stream::BoxStream;
4use futures::StreamExt as _;
5
6use crate::{Runnable, StreamEvent, WesichainError};
7
8pub struct TimeLimited<R> {
9    inner: R,
10    timeout: Duration,
11}
12
13impl<R> TimeLimited<R> {
14    pub fn new(inner: R, timeout: Duration) -> Self {
15        Self { inner, timeout }
16    }
17}
18
19#[async_trait::async_trait]
20impl<Input, Output, R> Runnable<Input, Output> for TimeLimited<R>
21where
22    Input: Send + Clone + 'static,
23    Output: Send + 'static,
24    R: Runnable<Input, Output> + Send + Sync,
25{
26    async fn invoke(&self, input: Input) -> Result<Output, WesichainError> {
27        tokio::time::timeout(self.timeout, self.inner.invoke(input))
28            .await
29            .map_err(|_| WesichainError::Timeout(self.timeout))?
30    }
31
32    fn stream<'a>(&'a self, input: Input) -> BoxStream<'a, Result<StreamEvent, WesichainError>> {
33        let timeout = self.timeout;
34        let inner = &self.inner;
35        async_stream::stream! {
36            let mut s = inner.stream(input);
37            loop {
38                match tokio::time::timeout(timeout, s.next()).await {
39                    Err(_) => {
40                        yield Err(WesichainError::Timeout(timeout));
41                        break;
42                    }
43                    Ok(None) => break,
44                    Ok(Some(event)) => yield event,
45                }
46            }
47        }
48        .boxed()
49    }
50}