use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use blazen_events::AnyEvent;
use tokio::sync::{broadcast, oneshot};
use tokio::task::JoinHandle;
#[cfg(feature = "telemetry")]
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use crate::error::WorkflowError;
use crate::session_ref::SessionRefRegistry;
use crate::snapshot::WorkflowSnapshot;
pub struct WorkflowHandler {
result_rx: Option<oneshot::Receiver<Result<Box<dyn AnyEvent>, WorkflowError>>>,
stream_tx: broadcast::Sender<Box<dyn AnyEvent>>,
pause_tx: Option<oneshot::Sender<()>>,
snapshot_rx: Option<oneshot::Receiver<WorkflowSnapshot>>,
event_loop_handle: Option<JoinHandle<()>>,
session_refs: Arc<SessionRefRegistry>,
#[cfg(feature = "telemetry")]
history_rx: Option<mpsc::UnboundedReceiver<blazen_telemetry::HistoryEvent>>,
}
impl WorkflowHandler {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
result_rx: oneshot::Receiver<Result<Box<dyn AnyEvent>, WorkflowError>>,
stream_tx: broadcast::Sender<Box<dyn AnyEvent>>,
pause_tx: Option<oneshot::Sender<()>>,
snapshot_rx: Option<oneshot::Receiver<WorkflowSnapshot>>,
event_loop_handle: JoinHandle<()>,
session_refs: Arc<SessionRefRegistry>,
#[cfg(feature = "telemetry")] history_rx: Option<
mpsc::UnboundedReceiver<blazen_telemetry::HistoryEvent>,
>,
) -> Self {
Self {
result_rx: Some(result_rx),
stream_tx,
pause_tx,
snapshot_rx,
event_loop_handle: Some(event_loop_handle),
session_refs,
#[cfg(feature = "telemetry")]
history_rx,
}
}
#[must_use]
pub fn session_refs(&self) -> Arc<SessionRefRegistry> {
Arc::clone(&self.session_refs)
}
pub async fn result(mut self) -> Result<Box<dyn AnyEvent>, WorkflowError> {
let rx = self
.result_rx
.take()
.expect("result() called after result was already consumed");
let result = rx.await.unwrap_or(Err(WorkflowError::ChannelClosed));
if let Some(handle) = self.event_loop_handle.take() {
let _ = handle.await;
}
result
}
pub fn stream_events(
&self,
) -> impl tokio_stream::Stream<Item = Box<dyn AnyEvent>> + Send + Unpin + use<> {
let rx = self.stream_tx.subscribe();
BroadcastStream::new(rx).filter_map(std::result::Result::ok)
}
pub async fn pause(mut self) -> Result<WorkflowSnapshot, WorkflowError> {
let pause_tx = self.pause_tx.take().ok_or(WorkflowError::ChannelClosed)?;
let snapshot_rx = self
.snapshot_rx
.take()
.ok_or(WorkflowError::ChannelClosed)?;
pause_tx
.send(())
.map_err(|()| WorkflowError::ChannelClosed)?;
let snapshot = snapshot_rx
.await
.map_err(|_| WorkflowError::ChannelClosed)?;
if let Some(handle) = self.event_loop_handle.take() {
let _ = handle.await;
}
Ok(snapshot)
}
#[cfg(feature = "telemetry")]
pub fn collect_history(
&mut self,
run_id: uuid::Uuid,
workflow_name: String,
) -> Option<blazen_telemetry::WorkflowHistory> {
let mut rx = self.history_rx.take()?;
let mut history = blazen_telemetry::WorkflowHistory::new(run_id, workflow_name);
while let Ok(mut event) = rx.try_recv() {
event.sequence = history.events.len() as u64;
history.events.push(event);
}
Some(history)
}
}
pub struct WorkflowHandlerFuture {
rx: oneshot::Receiver<Result<Box<dyn AnyEvent>, WorkflowError>>,
event_loop_handle: Option<JoinHandle<()>>,
result: Option<Result<Box<dyn AnyEvent>, WorkflowError>>,
}
impl Future for WorkflowHandlerFuture {
type Output = Result<Box<dyn AnyEvent>, WorkflowError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if self.result.is_none() {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Ready(Ok(result)) => {
self.result = Some(result);
}
Poll::Ready(Err(_)) => {
self.result = Some(Err(WorkflowError::ChannelClosed));
}
Poll::Pending => return Poll::Pending,
}
}
if let Some(handle) = &mut self.event_loop_handle {
match Pin::new(handle).poll(cx) {
Poll::Ready(_) => {
self.event_loop_handle = None;
}
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(self.result.take().expect("result was already consumed"))
}
}
impl IntoFuture for WorkflowHandler {
type Output = Result<Box<dyn AnyEvent>, WorkflowError>;
type IntoFuture = WorkflowHandlerFuture;
fn into_future(mut self) -> Self::IntoFuture {
let rx = self
.result_rx
.take()
.expect("IntoFuture: result was already consumed");
WorkflowHandlerFuture {
rx,
event_loop_handle: self.event_loop_handle.take(),
result: None,
}
}
}