use std::future::{Future, IntoFuture};
use std::pin::Pin;
use serde::Serialize;
use tokio::sync::watch;
use super::WorkflowState;
use super::execution::generate_instance_id;
use crate::context::{StepData, serialize_step};
use crate::error::EngineError;
/// Handle to a single workflow instance.
///
/// This is the success value produced by awaiting an [`InvocationBuilder`].
/// It pairs the instance identifier with a `watch` receiver carrying
/// [`WorkflowState`] values for that instance.
pub struct Invocation {
/// Identifier of the workflow instance this handle refers to.
pub(super) instance_id: String,
/// Receiving half of the `watch` channel that carries [`WorkflowState`] values.
pub(super) status: watch::Receiver<WorkflowState>,
}
impl Invocation {
    /// Returns the identifier of the workflow instance behind this handle.
    #[must_use]
    pub fn instance_id(&self) -> &str {
        self.instance_id.as_str()
    }

    /// Gives mutable access to the underlying state receiver, so callers can
    /// observe intermediate [`WorkflowState`] values themselves.
    pub fn status(&mut self) -> &mut watch::Receiver<WorkflowState> {
        let Self { status, .. } = self;
        status
    }

    /// Consumes the handle and resolves with the first "settled" state.
    ///
    /// A state counts as settled when `WorkflowState::is_terminal` reports
    /// `true` or when it is `WorkflowState::Suspended`. If the sending half of
    /// the channel is dropped before such a state is seen, the most recent
    /// value held by the channel is returned instead.
    pub async fn wait(mut self) -> WorkflowState {
        loop {
            // Snapshot the current value; the borrow guard is released at the
            // end of this statement, so it is never held across an `.await`.
            let current = self.status.borrow().clone();
            let settled =
                current.is_terminal() || matches!(current, WorkflowState::Suspended { .. });
            if settled {
                return current;
            }

            // `changed()` only fails once the sender is gone; nothing newer
            // can arrive after that, so hand back whatever is stored.
            let sender_gone = self.status.changed().await.is_err();
            if sender_gone {
                return self.status.borrow().clone();
            }
        }
    }

    /// Splits the handle into its instance identifier and state receiver.
    #[must_use]
    pub fn into_parts(self) -> (String, watch::Receiver<WorkflowState>) {
        let Self {
            instance_id,
            status,
        } = self;
        (instance_id, status)
    }
}
/// Builder describing a workflow invocation that has not been started yet.
///
/// Nothing is spawned until the builder is awaited (see its `IntoFuture`
/// implementation); until then it only records what to run and with which
/// input.
pub struct InvocationBuilder<'a> {
/// Engine that will be asked to spawn the workflow.
pub(super) engine: &'a super::Engine,
/// Name of the workflow passed to the engine when spawning.
pub(super) workflow_name: String,
/// Serialized input bytes, if any. An `Err` stored here is only surfaced
/// when the builder is awaited.
pub(super) input_payload: Result<Option<Vec<u8>>, EngineError>,
}
impl InvocationBuilder<'_> {
    /// Attaches an input payload to the invocation.
    ///
    /// The payload is wrapped in `StepData::Completed` (with no status) and
    /// serialized immediately via `serialize_step`, using `"_input"` as the
    /// second argument (presumably the step name; confirm against
    /// `serialize_step`). A serialization failure is not reported here: it is
    /// stored in the builder and returned when the builder is awaited.
    /// Calling this again replaces any previously stored payload or error.
    #[must_use]
    pub fn input<T>(self, payload: T) -> Self
    where
        T: Serialize,
    {
        let wrapped = StepData::<T>::Completed {
            result: payload,
            status: None,
        };
        let encoded = serialize_step(&wrapped, "_input").map(Some);
        Self {
            input_payload: encoded,
            ..self
        }
    }
}
/// Awaiting an [`InvocationBuilder`] spawns the workflow on its engine.
impl<'a> IntoFuture for InvocationBuilder<'a> {
    type Output = Result<Invocation, EngineError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    /// Turns the builder into a boxed future.
    ///
    /// When polled, the future first returns any error recorded while the
    /// input was serialized. Otherwise it generates a fresh instance id and
    /// forwards the workflow name, id and optional input bytes to
    /// `Engine::spawn_workflow`, yielding that call's result.
    fn into_future(self) -> Self::IntoFuture {
        let Self {
            engine,
            workflow_name,
            input_payload,
        } = self;

        Box::pin(async move {
            // Surface a deferred serialization failure before touching the engine.
            let input_bytes = input_payload?;
            // The id is generated lazily, only once the future actually runs.
            let instance_id = generate_instance_id();
            engine
                .spawn_workflow(&workflow_name, instance_id, input_bytes)
                .await
        })
    }
}