use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
use futures_core::Stream;
use tokio::sync::watch;
use crate::engine::WorkflowState;
/// Stream of [`WorkflowState`] values.
///
/// Backed either by a `watch` receiver (built with `StatusStream::live`) or by
/// a single stored state (built with `StatusStream::snapshot`). Items are
/// produced through the `Stream` implementation or the inherent `next` method.
pub struct StatusStream {
// Selects which of the two sources backs this stream.
inner: Inner,
}
/// Boxed, `Send` future used while waiting on the watch channel.
///
/// Its output pairs the result of the wait with the receiver itself, so the
/// receiver can be moved into the future and recovered once it completes.
/// `poll_next` builds it from `watch::Receiver::changed`.
type ChangedFuture = Pin<
Box<
dyn Future<
Output = (
Result<(), watch::error::RecvError>,
watch::Receiver<WorkflowState>,
),
> + Send,
>,
>;
/// The source backing a `StatusStream`.
enum Inner {
// Follows a watch channel; the nested state tracks progress between polls.
Live(LiveState),
// Holds at most one state; `poll_next` takes it out, leaving `None` behind.
Snapshot(Option<WorkflowState>),
}
/// State machine driven by `poll_next` for a watch-backed stream.
enum LiveState {
// Freshly constructed: the receiver's current value has not been emitted yet.
Initial(watch::Receiver<WorkflowState>),
// No wait in flight; the next poll starts a `changed()` wait on the receiver.
Idle(watch::Receiver<WorkflowState>),
// A `changed()` wait is in flight; the future owns the receiver until it resolves.
Waiting(ChangedFuture),
// Placeholder swapped in by `mem::replace` while `poll_next` holds the
// previous state by value.
Transitioning,
}
impl fmt::Debug for StatusStream {
    /// Formats the stream as a `StatusStream` struct with a single field:
    /// `kind: "live"` for a watch-backed stream, or `snapshot: <value>` for a
    /// snapshot-backed one. The receiver and any in-flight future are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("StatusStream");
        // Only the field differs between the variants; the struct name and
        // the closing `finish()` are shared.
        match &self.inner {
            Inner::Live(_) => out.field("kind", &"live"),
            Inner::Snapshot(pending) => out.field("snapshot", pending),
        };
        out.finish()
    }
}
impl StatusStream {
pub(crate) fn live(rx: watch::Receiver<WorkflowState>) -> Self {
Self {
inner: Inner::Live(LiveState::Initial(rx)),
}
}
pub(crate) fn snapshot(state: WorkflowState) -> Self {
Self {
inner: Inner::Snapshot(Some(state)),
}
}
pub async fn next(&mut self) -> Option<WorkflowState> {
std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
}
}
impl Stream for StatusStream {
    type Item = WorkflowState;

    /// Polls for the next [`WorkflowState`].
    ///
    /// * Snapshot-backed streams yield their stored value once, then `None`.
    /// * Watch-backed streams first yield the channel's current value, then
    ///   one item each time `changed()` resolves successfully. When
    ///   `changed()` reports an error the stream ends with `None`.
    ///
    /// The stream is fused: once it has returned `None`, every later poll
    /// returns `None` again instead of panicking.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match &mut this.inner {
            Inner::Live(state) => loop {
                // Take the state by value so the receiver / future can be moved;
                // every arm below must store a valid state before returning.
                match std::mem::replace(state, LiveState::Transitioning) {
                    LiveState::Initial(mut rx) => {
                        // Emit the current value and mark it seen, so the first
                        // `changed()` wait only wakes for a newer value.
                        let value = rx.borrow_and_update().clone();
                        *state = LiveState::Idle(rx);
                        return Poll::Ready(Some(value));
                    }
                    LiveState::Idle(mut rx) => {
                        // Move the receiver into the future and hand it back
                        // alongside the result, so no borrow outlives this call.
                        let fut = Box::pin(async move {
                            let result = rx.changed().await;
                            (result, rx)
                        });
                        *state = LiveState::Waiting(fut);
                        // Loop around to poll the freshly created future, which
                        // also registers the waker.
                    }
                    LiveState::Waiting(mut fut) => match fut.as_mut().poll(cx) {
                        Poll::Ready((Ok(()), mut rx)) => {
                            let value = rx.borrow_and_update().clone();
                            *state = LiveState::Idle(rx);
                            return Poll::Ready(Some(value));
                        }
                        Poll::Ready((Err(_), _)) => {
                            // The channel is finished. Previously the slot was
                            // left as `Transitioning`, so polling again hit the
                            // `unreachable!` below. Park the stream in an
                            // exhausted snapshot, which yields `None` forever.
                            this.inner = Inner::Snapshot(None);
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            // Put the in-flight future back for the next poll.
                            *state = LiveState::Waiting(fut);
                            return Poll::Pending;
                        }
                    },
                    LiveState::Transitioning => {
                        unreachable!("StatusStream polled while its state was mid-transition")
                    }
                }
            },
            Inner::Snapshot(state) => Poll::Ready(state.take()),
        }
    }
}