server_sent_events/
stream.rs

1use alloc::{format, string, string::String};
2use core::time::Duration;
3
4use async_interval::{intervalable_iter_stream, Intervalable};
5use futures_util::{stream::PollNext, Stream, StreamExt as _};
6
7//
8pub fn keep_alive_stream<EVENT, S, INTVL>(
9    inner: S,
10    interval: Duration,
11) -> impl Stream<Item = String>
12where
13    EVENT: string::ToString,
14    S: Stream<Item = EVENT>,
15    INTVL: Intervalable,
16{
17    let option = KeepAliveOption::new().interval(interval);
18
19    keep_alive_stream_with_option::<_, _, INTVL>(inner, option)
20}
21
22pub fn keep_alive_stream_with_option<EVENT, S, INTVL>(
23    inner: S,
24    option: KeepAliveOption,
25) -> impl Stream<Item = String>
26where
27    EVENT: string::ToString,
28    S: Stream<Item = EVENT>,
29    INTVL: Intervalable,
30{
31    let st1 = inner.map(|event| event.to_string());
32
33    let interval = option.get_interval();
34    let comment_prefix = option.get_comment_prefix();
35
36    let st2 = intervalable_iter_stream(0..usize::MAX, INTVL::interval(interval))
37        .map(move |i| format!(": {comment_prefix}{i}\n\n"));
38
39    futures_stream_select_ext::select_until_left_is_done_with_strategy(
40        st1,
41        st2,
42        |_: &mut PollNext| PollNext::Left,
43    )
44}
45
46//
47#[derive(Debug, Clone, Default)]
48pub struct KeepAliveOption {
49    interval: Option<Duration>,
50    comment_prefix: Option<String>,
51}
52
53impl KeepAliveOption {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn interval(mut self, dur: Duration) -> Self {
59        self.interval = Some(dur);
60        self
61    }
62
63    pub fn comment_prefix(mut self, s: String) -> Self {
64        self.comment_prefix = Some(s);
65        self
66    }
67
68    pub fn get_interval(&self) -> Duration {
69        self.interval.unwrap_or_else(|| Duration::from_secs(30))
70    }
71
72    pub fn get_comment_prefix(&self) -> String {
73        self.comment_prefix.clone().unwrap_or_else(|| "".into())
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    #[allow(unused_imports)]
80    use super::*;
81
82    #[allow(unused_imports)]
83    use alloc::{string::ToString as _, vec, vec::Vec};
84
85    #[cfg(feature = "std")]
86    #[tokio::test]
87    async fn test_keep_alive_stream_with_tokio_interval() {
88        use futures_util::stream;
89
90        //
91        let st = keep_alive_stream::<_, _, tokio::time::Interval>(
92            stream::iter(vec!["a", "b"])
93                .then(move |x| async move {
94                    tokio::time::sleep(tokio::time::Duration::from_micros(2)).await;
95                    x
96                })
97                .map(|x| format!(": {x}\n\n")),
98            Duration::from_micros(1),
99        );
100
101        assert_eq!(
102            st.collect::<Vec<_>>().await,
103            vec![
104                ": a\n\n".to_string(),
105                ": 0\n\n".to_string(),
106                ": b\n\n".to_string()
107            ]
108        );
109
110        //
111        let st = keep_alive_stream_with_option::<_, _, tokio::time::Interval>(
112            stream::iter(vec!["a", "b"])
113                .then(move |x| async move {
114                    tokio::time::sleep(tokio::time::Duration::from_micros(2)).await;
115                    x
116                })
117                .map(|x| format!(": {x}\n\n")),
118            KeepAliveOption::new()
119                .interval(Duration::from_micros(1))
120                .comment_prefix("Ping ".into()),
121        );
122
123        assert_eq!(
124            st.collect::<Vec<_>>().await,
125            vec![
126                ": a\n\n".to_string(),
127                ": Ping 0\n\n".to_string(),
128                ": b\n\n".to_string()
129            ]
130        );
131    }
132}