use crate::{
pollers, protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
ServerGatewayApis,
};
use futures::{stream::repeat_with, Stream, StreamExt};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Hands out workflow task poll results from a single shared, buffered stream
/// of polls.
///
/// The stream is kept behind an async mutex, so concurrent callers take turns
/// pulling the next poll result from it.
pub struct PollWorkflowTaskBuffer {
/// The stream of poll results. It is boxed as a trait object so the concrete
/// combinator type built by the constructor does not appear in this struct's
/// signature.
buffered_polls: Mutex<
Box<dyn Stream<Item = pollers::Result<PollWorkflowTaskQueueResponse>> + Unpin + Send>,
>,
}
impl PollWorkflowTaskBuffer {
    /// Builds a buffer that polls the given server gateway for workflow tasks.
    ///
    /// Polls are produced lazily: an endless stream of poll futures is created
    /// from the gateway and wrapped with `buffered(1)`, so the stream holds at
    /// most one poll future at a time.
    pub fn new(sg: Arc<impl ServerGatewayApis + Send + Sync + 'static>) -> Self {
        // Every invocation yields a fresh poll future that owns its own handle
        // to the gateway.
        let make_poll = move || {
            let gateway = Arc::clone(&sg);
            async move { gateway.poll_workflow_task().await }
        };
        let poll_stream = repeat_with(make_poll).buffered(1);

        Self {
            buffered_polls: Mutex::new(Box::new(poll_stream)),
        }
    }

    /// Waits for and returns the next workflow task poll result.
    ///
    /// Callers are serialized on the internal mutex, which is held until the
    /// result is available. The pending poll lives inside the shared stream
    /// rather than inside the returned future, so dropping this future early
    /// is not expected to discard the in-flight poll.
    ///
    /// # Panics
    ///
    /// Panics if the underlying stream ends, which should not happen because
    /// it is built from an endless generator.
    pub async fn poll_workflow_task(&self) -> pollers::Result<PollWorkflowTaskQueueResponse> {
        let mut guard = self.buffered_polls.lock().await;
        let stream = &mut *guard;
        let next_poll = stream.next().await;
        next_poll.expect("There is always another item in the stream")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pollers::manual_mock::MockManualGateway;
    use futures::FutureExt;
    use std::time::Duration;
    use tokio::{select, sync::mpsc::channel};

    /// Verifies that abandoning a buffered poll many times does not trigger
    /// additional gateway polls: the mock accepts exactly two calls overall.
    #[tokio::test]
    async fn only_polls_once() {
        // Number of times the poll is raced against (and expected to lose to)
        // an already-queued interrupt message.
        const INTERRUPTS: usize = 10;
        const CHANNEL_CAPACITY: usize = 50;

        let mut gateway = MockManualGateway::new();
        // One call for the poll that is repeatedly interrupted, and one more
        // for the final poll at the end of the test.
        gateway.expect_poll_workflow_task().times(2).returning(|| {
            async {
                // Slow enough that queued interrupts are ready first.
                tokio::time::sleep(Duration::from_millis(100)).await;
                Ok(Default::default())
            }
            .boxed()
        });
        let buffer = PollWorkflowTaskBuffer::new(Arc::new(gateway));

        // Pre-load every interrupt so each is immediately ready to be received.
        let (interrupt_tx, mut interrupt_rx) = channel(CHANNEL_CAPACITY);
        for _ in 0..INTERRUPTS {
            interrupt_tx.send(()).await.unwrap();
        }

        // One round more than there are interrupts, so that a round remains in
        // which only the poll can complete.
        let mut poll_completed = false;
        for _ in 0..=INTERRUPTS {
            select! {
                _ = interrupt_rx.recv() => {}
                _ = buffer.poll_workflow_task() => {
                    poll_completed = true;
                }
            }
        }
        assert!(poll_completed);

        // A subsequent poll must still succeed (the second allowed mock call).
        buffer.poll_workflow_task().await.unwrap();
    }
}