wesichain_core/
time_limited.rs1use std::time::Duration;
2
3use futures::stream::BoxStream;
4use futures::StreamExt as _;
5
6use crate::{Runnable, StreamEvent, WesichainError};
7
/// Wrapper that pairs an inner value with a time limit.
///
/// When `R` implements `Runnable`, this wrapper's own `Runnable` impl applies
/// `timeout` to the whole `invoke` call and to the wait for each individual
/// item produced by `stream`, reporting `WesichainError::Timeout` when the
/// limit elapses.
///
/// `Debug` and `Clone` are derived so the wrapper can be logged and duplicated
/// whenever the wrapped type supports it.
#[derive(Debug, Clone)]
pub struct TimeLimited<R> {
    /// The wrapped value that calls are delegated to.
    inner: R,
    /// Maximum wait applied to `invoke` and to each `stream` item.
    timeout: Duration,
}
12
13impl<R> TimeLimited<R> {
14 pub fn new(inner: R, timeout: Duration) -> Self {
15 Self { inner, timeout }
16 }
17}
18
19#[async_trait::async_trait]
20impl<Input, Output, R> Runnable<Input, Output> for TimeLimited<R>
21where
22 Input: Send + Clone + 'static,
23 Output: Send + 'static,
24 R: Runnable<Input, Output> + Send + Sync,
25{
26 async fn invoke(&self, input: Input) -> Result<Output, WesichainError> {
27 tokio::time::timeout(self.timeout, self.inner.invoke(input))
28 .await
29 .map_err(|_| WesichainError::Timeout(self.timeout))?
30 }
31
32 fn stream<'a>(&'a self, input: Input) -> BoxStream<'a, Result<StreamEvent, WesichainError>> {
33 let timeout = self.timeout;
34 let inner = &self.inner;
35 async_stream::stream! {
36 let mut s = inner.stream(input);
37 loop {
38 match tokio::time::timeout(timeout, s.next()).await {
39 Err(_) => {
40 yield Err(WesichainError::Timeout(timeout));
41 break;
42 }
43 Ok(None) => break,
44 Ok(Some(event)) => yield event,
45 }
46 }
47 }
48 .boxed()
49 }
50}