use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use crate::event::{BarrierDecision, BarrierDecisionMessage, BarrierId};
use crate::state::State;
use crate::workflow_state::WorkflowState;
/// Result of waiting on a barrier via `wait_for_barrier_decision`.
pub(crate) enum BarrierOutcome {
/// A decision addressed to the awaited barrier (exact id or wildcard on its
/// node) was received, or one was already cached for the barrier's node.
Decision(BarrierDecision),
/// The timeout supplied to the wait elapsed.
TimedOut,
/// The wait ended without a usable decision: the cancel channel yielded,
/// the decision channel was closed, or the received message was addressed
/// to a different barrier / node.
Cancelled,
}
pub(crate) async fn wait_for_barrier_decision(
decision_rx: &mut tokio::sync::mpsc::Receiver<BarrierDecisionMessage>,
cancel_rx: &mut tokio::sync::mpsc::Receiver<()>,
cancel: &CancellationToken,
barrier_id: &BarrierId,
timeout: Option<std::time::Duration>,
wildcard_cache: &mut HashMap<String, BarrierDecision>,
) -> BarrierOutcome {
if let Some(decision) = wildcard_cache.get(&barrier_id.node_id) {
return BarrierOutcome::Decision(decision.clone());
}
if let Some(dur) = timeout {
tokio::select! {
biased;
_ = cancel_rx.recv() => {
cancel.cancel();
BarrierOutcome::Cancelled
}
_ = tokio::time::sleep(dur) => BarrierOutcome::TimedOut,
msg = decision_rx.recv() => match msg {
Some(BarrierDecisionMessage::Exact { barrier_id: bid, decision }) => {
if bid == *barrier_id { BarrierOutcome::Decision(decision) } else { BarrierOutcome::Cancelled }
}
Some(BarrierDecisionMessage::Wildcard { node_id, decision }) => {
if node_id == barrier_id.node_id {
wildcard_cache.insert(node_id.clone(), decision.clone());
BarrierOutcome::Decision(decision)
} else { BarrierOutcome::Cancelled }
}
None => BarrierOutcome::Cancelled,
},
}
} else {
tokio::select! {
biased;
_ = cancel_rx.recv() => {
cancel.cancel();
BarrierOutcome::Cancelled
}
msg = decision_rx.recv() => match msg {
Some(BarrierDecisionMessage::Exact { barrier_id: bid, decision }) => {
if bid == *barrier_id { BarrierOutcome::Decision(decision) } else { BarrierOutcome::Cancelled }
}
Some(BarrierDecisionMessage::Wildcard { node_id, decision }) => {
if node_id == barrier_id.node_id {
wildcard_cache.insert(node_id.clone(), decision.clone());
BarrierOutcome::Decision(decision)
} else { BarrierOutcome::Cancelled }
}
None => BarrierOutcome::Cancelled,
},
}
}
}
/// Applies a barrier decision for an arbitrary [`WorkflowState`] implementation.
///
/// Neither `_state` nor `_node_name` is read or modified here. The return
/// value is the reroute target for `BarrierDecision::Reroute`, and `None` for
/// every other decision. `BarrierDecision::Modify` additionally emits a
/// warning, since it is ignored by this generic variant.
pub(crate) fn apply_barrier_decision_generic<S: WorkflowState>(
    _state: &mut S,
    _node_name: &str,
    decision: &BarrierDecision,
) -> Option<String> {
    match decision {
        // The only decision that produces a value: hand the target back to the caller.
        BarrierDecision::Reroute { target } => return Some(target.clone()),
        BarrierDecision::Modify { .. } => {
            tracing::warn!("BarrierDecision::Modify is only supported for State (HashMap), ignoring");
        }
        BarrierDecision::Approve => {}
        BarrierDecision::Reject { .. } => {}
    }
    None
}
/// Applies a barrier decision to `state` and reports an optional reroute target.
///
/// * `Approve` — sets `"{node_name}.approved"` to `true` and removes
///   `"{node_name}.reject_reason"`.
/// * `Reject { reason }` — sets `"{node_name}.reject_reason"` to `reason` and
///   removes `"{node_name}.approved"`.
/// * `Modify { key, value }` — stores a clone of `value` under `key`.
/// * `Reroute { target }` — leaves `state` untouched and returns
///   `Some(target)`.
///
/// Every decision other than `Reroute` returns `None`.
// NOTE(review): no caller is visible in this part of the file; confirm the
// `dead_code` allowance is still required.
#[allow(dead_code)]
pub(crate) fn apply_barrier_decision(
    state: &mut State,
    node_name: &str,
    decision: &BarrierDecision,
) -> Option<String> {
    // The state keys are built only in the arms that use them, so `Modify`
    // and `Reroute` do not pay for two unused string allocations.
    match decision {
        BarrierDecision::Approve => {
            state.insert(format!("{node_name}.approved"), serde_json::json!(true));
            state.remove(&format!("{node_name}.reject_reason"));
            None
        }
        BarrierDecision::Reject { reason } => {
            state.insert(format!("{node_name}.reject_reason"), serde_json::json!(reason));
            state.remove(&format!("{node_name}.approved"));
            None
        }
        BarrierDecision::Modify { key, value } => {
            state.insert(key.clone(), value.clone());
            None
        }
        BarrierDecision::Reroute { target } => Some(target.clone()),
    }
}