use crate::google::pubsub::v1::StreamingPullRequest;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant, interval_at};
use tokio_util::sync::CancellationToken;
pub(super) const KEEPALIVE_PERIOD: Duration = Duration::from_secs(30);
pub(super) fn spawn(
request_tx: Sender<StreamingPullRequest>,
shutdown: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut keepalive = interval_at(Instant::now() + KEEPALIVE_PERIOD, KEEPALIVE_PERIOD);
loop {
tokio::select! {
_ = shutdown.cancelled() => break,
_ = keepalive.tick() => {
let _ = request_tx.send(StreamingPullRequest::default()).await;
}
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use google_cloud_test_macros::tokio_test_no_panics;
use tokio::sync::mpsc::channel;
#[tokio_test_no_panics(start_paused = true)]
async fn keepalive_interval() {
let start = Instant::now();
let (request_tx, mut request_rx) = channel(1);
let shutdown = CancellationToken::new();
let _handle = spawn(request_tx, shutdown);
let r = request_rx.recv().await.unwrap();
assert_eq!(r, StreamingPullRequest::default());
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD);
let r = request_rx.recv().await.unwrap();
assert_eq!(r, StreamingPullRequest::default());
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD * 2);
let r = request_rx.recv().await.unwrap();
assert_eq!(r, StreamingPullRequest::default());
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD * 3);
}
#[tokio_test_no_panics(start_paused = true)]
async fn shutdown_immediately() -> anyhow::Result<()> {
let start = Instant::now();
let (request_tx, mut request_rx) = channel(1);
let shutdown = CancellationToken::new();
let handle = spawn(request_tx, shutdown.clone());
let _ = request_rx.recv().await.unwrap();
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD);
const DELTA: Duration = Duration::from_secs(10);
tokio::time::advance(DELTA).await;
shutdown.cancel();
handle.await?;
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD + DELTA);
Ok(())
}
}