dynamo_runtime/utils/
stream.rs1use futures::stream::{Stream, StreamExt};
17use std::{
18    future::Future,
19    pin::Pin,
20    task::{Context, Poll},
21};
22
23use tokio::time::{self, sleep_until, Duration, Instant, Sleep};
24
25pub struct DeadlineStream<S> {
26    stream: S,
27    sleep: Pin<Box<Sleep>>,
28}
29
30impl<S: Stream + Unpin> Stream for DeadlineStream<S> {
31    type Item = S::Item;
32
33    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        if Pin::new(&mut self.sleep).poll(cx).is_ready() {
36            return Poll::Ready(None);
38        }
39
40        let val = self.as_mut().stream.poll_next_unpin(cx);
42        match &val {
44            Poll::Ready(Some(_)) => tracing::trace!("DeadlineStream: received item"),
45            Poll::Ready(None) => tracing::trace!("DeadlineStream: underlying stream ended"),
46            Poll::Pending => tracing::trace!("DeadlineStream: waiting for next item"),
47        }
48        val
49    }
50}
51
52pub fn until_deadline<S: Stream + Unpin>(stream: S, deadline: Instant) -> DeadlineStream<S> {
53    DeadlineStream {
54        stream,
55        sleep: Box::pin(sleep_until(deadline)),
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use futures::stream::{self, Stream, StreamExt};
63    use tokio::pin;
64
65    use super::*;
66
67    async fn run_deadline_test(sleep_times_ms: Vec<u64>, deadline_ms: u64) -> Vec<u64> {
69        let stream = stream::iter(sleep_times_ms);
70        let stream = stream.then(|x| {
71            let sleep = time::sleep(Duration::from_millis(x));
72            async move {
73                sleep.await;
74                x
75            }
76        });
77
78        let deadline = Instant::now() + Duration::from_millis(deadline_ms);
79        let mut result = Vec::new();
80
81        pin!(stream);
82        let mut stream = until_deadline(stream, deadline);
83
84        while let Some(x) = stream.next().await {
85            result.push(x);
86        }
87
88        result
89    }
90
91    #[tokio::test]
92    async fn test_deadline_exceeded() {
93        let sleep_times_ms = vec![100, 100, 200, 50];
95        let deadline_ms = 300;
96
97        let result = run_deadline_test(sleep_times_ms, deadline_ms).await;
98        assert_eq!(result, vec![100, 100]);
100    }
101
102    #[tokio::test]
103    async fn test_complete_before_deadline() {
104        let sleep_times_ms = vec![100, 50, 50];
106        let deadline_ms = 300;
107
108        let result = run_deadline_test(sleep_times_ms, deadline_ms).await;
109        assert_eq!(result, vec![100, 50, 50]);
111    }
112}