use parking_lot::Mutex;
use crate::effects::Effects;
use crate::error::{Error, Result};
use crate::types::{PromiseCreateReq, PromiseRecord};
pub(crate) struct CreationSequencer {
tail: Mutex<Option<tokio::sync::watch::Receiver<CreationState>>>,
}
impl CreationSequencer {
pub(crate) fn new() -> Self {
Self {
tail: Mutex::new(None),
}
}
pub(crate) fn claim_slot(&self) -> CreationSlot {
let (tx, rx) = creation_channel();
let prev = self.tail.lock().replace(rx);
CreationSlot { prev, tx }
}
}
#[derive(Clone, Debug)]
pub(crate) enum CreationState {
InFlight,
Created,
Failed(String),
}
pub(crate) fn creation_channel() -> (
tokio::sync::watch::Sender<CreationState>,
tokio::sync::watch::Receiver<CreationState>,
) {
tokio::sync::watch::channel(CreationState::InFlight)
}
pub(crate) async fn await_created_id(
id: &str,
created: &tokio::sync::watch::Receiver<CreationState>,
) -> Result<String> {
let mut rx = created.clone();
let state = rx
.wait_for(|s| !matches!(s, CreationState::InFlight))
.await
.map_err(|_| Error::JoinError(format!("task {} was dropped", id)))?;
match &*state {
CreationState::Created => Ok(id.to_string()),
CreationState::Failed(msg) => Err(Error::PromiseCreation(msg.clone())),
CreationState::InFlight => unreachable!("wait_for excludes InFlight"),
}
}
pub(crate) struct CreationSlot {
prev: Option<tokio::sync::watch::Receiver<CreationState>>,
tx: tokio::sync::watch::Sender<CreationState>,
}
impl CreationSlot {
pub(crate) fn subscribe(&self) -> tokio::sync::watch::Receiver<CreationState> {
self.tx.subscribe()
}
pub(crate) async fn create(
self,
effects: &Effects,
req: PromiseCreateReq,
) -> Result<PromiseRecord> {
if let Some(mut prev) = self.prev {
let predecessor_ok = match prev
.wait_for(|s| !matches!(s, CreationState::InFlight))
.await
{
Ok(state) => matches!(&*state, CreationState::Created),
Err(_) => false, };
if !predecessor_ok {
let e = Error::PromiseCreation(
"aborted: a previous promise creation in this workflow failed".to_string(),
);
let _ = self.tx.send(CreationState::Failed(e.to_string()));
return Err(e);
}
}
match effects.create_promise(req).await {
Ok(record) => {
let _ = self.tx.send(CreationState::Created); Ok(record)
}
Err(e) => {
let _ = self.tx.send(CreationState::Failed(e.to_string()));
Err(e)
}
}
}
}