use std::pin::pin;
use futures_core::Stream;
use futures_util::stream::StreamExt;
pub(crate) const STREAM_IDLE_TIMEOUT_MS: u32 = 120_000;
pub(crate) fn idle_timeout_ms() -> u32 {
#[cfg(not(target_arch = "wasm32"))]
{
if let Ok(v) = std::env::var("LH_STREAM_IDLE_TIMEOUT_MS") {
if let Ok(n) = v.trim().parse::<u32>() {
if n > 0 {
return n;
}
}
}
}
STREAM_IDLE_TIMEOUT_MS
}
pub(crate) enum NextChunk<T> {
Item(T),
End,
IdleTimeout,
}
pub(crate) async fn next_with_idle_timeout<S, T>(stream: &mut S, idle_ms: u32) -> NextChunk<T>
where
S: Stream<Item = T> + Unpin,
{
let next = stream.next();
let sleep = crate::runtime::sleep_ms(idle_ms);
let next = pin!(next);
let sleep = pin!(sleep);
match futures_util::future::select(next, sleep).await {
futures_util::future::Either::Left((Some(item), _sleep)) => NextChunk::Item(item),
futures_util::future::Either::Left((None, _sleep)) => NextChunk::End,
futures_util::future::Either::Right((_elapsed, _next)) => NextChunk::IdleTimeout,
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::stream;
#[tokio::test]
async fn ready_item_beats_the_timer_then_end() {
let mut s = stream::iter(vec![42i32]);
match next_with_idle_timeout(&mut s, 60_000).await {
NextChunk::Item(v) => assert_eq!(v, 42),
_ => panic!("a ready chunk must win over a 60s timer"),
}
assert!(matches!(next_with_idle_timeout(&mut s, 60_000).await, NextChunk::End));
}
#[tokio::test]
async fn empty_stream_is_end_not_timeout() {
let mut s = stream::iter(Vec::<i32>::new());
assert!(matches!(next_with_idle_timeout(&mut s, 60_000).await, NextChunk::End));
}
#[tokio::test]
async fn a_silent_stream_trips_the_idle_timeout() {
let mut s = stream::pending::<i32>();
assert!(matches!(next_with_idle_timeout(&mut s, 5).await, NextChunk::IdleTimeout));
}
#[test]
fn idle_window_is_two_minutes() {
assert_eq!(STREAM_IDLE_TIMEOUT_MS, 120_000);
}
}