server_sent_events/
stream.rs1use alloc::{format, string, string::String};
2use core::time::Duration;
3
4use async_interval::{intervalable_iter_stream, Intervalable};
5use futures_util::{stream::PollNext, Stream, StreamExt as _};
6
7pub fn keep_alive_stream<EVENT, S, INTVL>(
9 inner: S,
10 interval: Duration,
11) -> impl Stream<Item = String>
12where
13 EVENT: string::ToString,
14 S: Stream<Item = EVENT>,
15 INTVL: Intervalable,
16{
17 let option = KeepAliveOption::new().interval(interval);
18
19 keep_alive_stream_with_option::<_, _, INTVL>(inner, option)
20}
21
22pub fn keep_alive_stream_with_option<EVENT, S, INTVL>(
23 inner: S,
24 option: KeepAliveOption,
25) -> impl Stream<Item = String>
26where
27 EVENT: string::ToString,
28 S: Stream<Item = EVENT>,
29 INTVL: Intervalable,
30{
31 let st1 = inner.map(|event| event.to_string());
32
33 let interval = option.get_interval();
34 let comment_prefix = option.get_comment_prefix();
35
36 let st2 = intervalable_iter_stream(0..usize::MAX, INTVL::interval(interval))
37 .map(move |i| format!(": {comment_prefix}{i}\n\n"));
38
39 futures_stream_select_ext::select_until_left_is_done_with_strategy(
40 st1,
41 st2,
42 |_: &mut PollNext| PollNext::Left,
43 )
44}
45
46#[derive(Debug, Clone, Default)]
48pub struct KeepAliveOption {
49 interval: Option<Duration>,
50 comment_prefix: Option<String>,
51}
52
53impl KeepAliveOption {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn interval(mut self, dur: Duration) -> Self {
59 self.interval = Some(dur);
60 self
61 }
62
63 pub fn comment_prefix(mut self, s: String) -> Self {
64 self.comment_prefix = Some(s);
65 self
66 }
67
68 pub fn get_interval(&self) -> Duration {
69 self.interval.unwrap_or_else(|| Duration::from_secs(30))
70 }
71
72 pub fn get_comment_prefix(&self) -> String {
73 self.comment_prefix.clone().unwrap_or_else(|| "".into())
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 #[allow(unused_imports)]
80 use super::*;
81
82 #[allow(unused_imports)]
83 use alloc::{string::ToString as _, vec, vec::Vec};
84
85 #[cfg(feature = "std")]
86 #[tokio::test]
87 async fn test_keep_alive_stream_with_tokio_interval() {
88 use futures_util::stream;
89
90 let st = keep_alive_stream::<_, _, tokio::time::Interval>(
92 stream::iter(vec!["a", "b"])
93 .then(move |x| async move {
94 tokio::time::sleep(tokio::time::Duration::from_micros(2)).await;
95 x
96 })
97 .map(|x| format!(": {x}\n\n")),
98 Duration::from_micros(1),
99 );
100
101 assert_eq!(
102 st.collect::<Vec<_>>().await,
103 vec![
104 ": a\n\n".to_string(),
105 ": 0\n\n".to_string(),
106 ": b\n\n".to_string()
107 ]
108 );
109
110 let st = keep_alive_stream_with_option::<_, _, tokio::time::Interval>(
112 stream::iter(vec!["a", "b"])
113 .then(move |x| async move {
114 tokio::time::sleep(tokio::time::Duration::from_micros(2)).await;
115 x
116 })
117 .map(|x| format!(": {x}\n\n")),
118 KeepAliveOption::new()
119 .interval(Duration::from_micros(1))
120 .comment_prefix("Ping ".into()),
121 );
122
123 assert_eq!(
124 st.collect::<Vec<_>>().await,
125 vec![
126 ": a\n\n".to_string(),
127 ": Ping 0\n\n".to_string(),
128 ": b\n\n".to_string()
129 ]
130 );
131 }
132}