use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
use crate::ir_nodes::{
IRComputeApplyStep, IRDaemonStepNode, IRListenStep, IRMandateApplyStep,
IROtsApplyStep, IRShieldApplyStep,
};
/// Fallback shield application: hands back an owned copy of `target`.
///
/// Neither `shield_name` nor `_ctx` influences the result in this
/// implementation; the text passes through unchanged.
pub fn apply_shield_to_target(shield_name: &str, target: &str, _ctx: &DispatchCtx) -> String {
    // The shield name is accepted for signature parity but deliberately discarded.
    let (_, passthrough) = (shield_name, target);
    passthrough.to_owned()
}
/// Fallback OTS application: hands back an owned copy of `target`.
///
/// `ots_name` and `_ctx` are accepted but have no effect on the result here.
pub fn apply_ots_to_target(ots_name: &str, target: &str, _ctx: &DispatchCtx) -> String {
    // The transform name is intentionally unused by this implementation.
    let _unused_ots_name = ots_name;
    String::from(target)
}
/// Fallback mandate application: hands back an owned copy of `target`.
///
/// `mandate_name` and `_ctx` are accepted but have no effect on the result here.
pub fn apply_mandate_to_target(mandate_name: &str, target: &str, _ctx: &DispatchCtx) -> String {
    // The mandate name is intentionally unused by this implementation.
    let _unused_mandate_name = mandate_name;
    let mut copy = String::with_capacity(target.len());
    copy.push_str(target);
    copy
}
/// Renders a compute invocation as `compute:<name>(<arg>, <arg>, ...)`.
///
/// Each entry of `arguments` is looked up in `ctx.let_bindings`; when a
/// binding with that exact name exists its value is substituted, otherwise the
/// argument text is used verbatim. Arguments are joined with `", "`.
pub fn invoke_compute_capability(
    compute_name: &str,
    arguments: &[String],
    ctx: &DispatchCtx,
) -> String {
    let mut rendered_args: Vec<String> = Vec::with_capacity(arguments.len());
    for argument in arguments {
        let value = match ctx.let_bindings.get(argument) {
            // Symbolic argument: use the bound value.
            Some(bound) => bound.clone(),
            // No binding under this name: treat the argument as a literal.
            None => argument.clone(),
        };
        rendered_args.push(value);
    }
    let joined = rendered_args.join(", ");
    format!("compute:{compute_name}({joined})")
}
/// Produces the placeholder text `(awaiting <channel>)` for a listen step.
///
/// `_channel_is_ref` and `_ctx` are not consulted; the channel text is
/// embedded exactly as given.
pub fn listen_on_channel(channel: &str, _channel_is_ref: bool, _ctx: &DispatchCtx) -> String {
    ["(awaiting ", channel, ")"].concat()
}
/// Produces the placeholder text `daemon:<daemon_ref>` for a daemon step.
///
/// `_ctx` is not consulted.
pub fn invoke_daemon(daemon_ref: &str, _ctx: &DispatchCtx) -> String {
    let mut invocation = String::from("daemon:");
    invocation.push_str(daemon_ref);
    invocation
}
/// Executes a shield-apply step.
///
/// Sequence:
/// 1. Bails out with [`DispatchError::UpstreamCancelled`] if the context's
///    cancel flag is set (no step index is consumed in that case).
/// 2. Takes the next step index from `ctx.step_counter`.
/// 3. Emits a `StepStart` event of type `"shield_apply"`, named after the
///    shield (or `"ShieldApply"` when the shield name is empty).
/// 4. Resolves `node.target` through `ctx.let_bindings`, falling back to the
///    target text itself when no binding exists.
/// 5. If a scanner is registered for the shield name, its verdict decides the
///    result; otherwise [`apply_shield_to_target`] is used.
/// 6. Binds the result under `node.output_type`, or `<target>_shielded` when
///    that is empty, or nowhere when the target is empty as well.
/// 7. Emits `StepComplete` and returns [`NodeOutcome::Completed`] with zero
///    tokens emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested.
/// * [`DispatchError::BackendError`] named `shield:<shield_name>` with message
///   `[<code>] <reason>` when the scanner rejects the target; in that case no
///   binding is written and no `StepComplete` is emitted.
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_shield_apply(
    node: &IRShieldApplyStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    let step_name = if !node.shield_name.is_empty() {
        node.shield_name.clone()
    } else {
        "ShieldApply".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "shield_apply")?;

    // A target that names a binding is replaced by the bound value.
    let resolved_target = match ctx.let_bindings.get(&node.target) {
        Some(bound) => bound.clone(),
        None => node.target.clone(),
    };

    let shielded = if let Some(scanner) =
        crate::shield_registry::lookup_shield_scanner(&node.shield_name)
    {
        let scan_ctx = crate::shield_registry::ShieldScanContext::new(node.shield_name.clone());
        match scanner.scan(&resolved_target, &scan_ctx) {
            crate::shield_registry::ShieldVerdict::Pass(content) => content,
            crate::shield_registry::ShieldVerdict::Reject { code, reason } => {
                return Err(DispatchError::BackendError {
                    name: format!("shield:{}", node.shield_name),
                    message: format!("[{code}] {reason}"),
                });
            }
        }
    } else {
        // No scanner registered under this name: use the fallback.
        apply_shield_to_target(&node.shield_name, &resolved_target, ctx)
    };

    // Explicit output name wins; otherwise derive one from the target name.
    let output_key = if !node.output_type.is_empty() {
        Some(node.output_type.clone())
    } else if !node.target.is_empty() {
        Some(format!("{}_shielded", node.target))
    } else {
        None
    };
    if let Some(key) = output_key {
        ctx.let_bindings.insert(key, shielded.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &shielded, 0)?;
    Ok(NodeOutcome::Completed {
        output: shielded,
        tokens_emitted: 0,
        step_index,
    })
}
/// Executes an OTS-apply step.
///
/// After the cancellation check, the step takes the next index from
/// `ctx.step_counter` and emits `StepStart` of type `"ots_apply"`, named after
/// `node.ots_name` (or `"OtsApply"` when that is empty). It resolves
/// `node.target` through `ctx.let_bindings`, falling back to the target text,
/// and passes the value to [`apply_ots_to_target`]. The result is bound under
/// `node.output_type`, or `<target>_ots` when that is empty, or nowhere when
/// the target is empty too. Finally `StepComplete` is emitted and
/// [`NodeOutcome::Completed`] is returned with zero tokens emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested
///   (no step index is consumed).
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_ots_apply(
    node: &IROtsApplyStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    let step_name = if !node.ots_name.is_empty() {
        node.ots_name.clone()
    } else {
        "OtsApply".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "ots_apply")?;

    // A target that names a binding is replaced by the bound value.
    let resolved_target = match ctx.let_bindings.get(&node.target) {
        Some(bound) => bound.clone(),
        None => node.target.clone(),
    };
    let transformed = apply_ots_to_target(&node.ots_name, &resolved_target, ctx);

    // Explicit output name wins; otherwise derive one from the target name.
    let output_key = if !node.output_type.is_empty() {
        Some(node.output_type.clone())
    } else if !node.target.is_empty() {
        Some(format!("{}_ots", node.target))
    } else {
        None
    };
    if let Some(key) = output_key {
        ctx.let_bindings.insert(key, transformed.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &transformed, 0)?;
    Ok(NodeOutcome::Completed {
        output: transformed,
        tokens_emitted: 0,
        step_index,
    })
}
/// Executes a mandate-apply step.
///
/// After the cancellation check, the step takes the next index from
/// `ctx.step_counter` and emits `StepStart` of type `"mandate_apply"`, named
/// after `node.mandate_name` (or `"MandateApply"` when that is empty). It
/// resolves `node.target` through `ctx.let_bindings`, falling back to the
/// target text, and passes the value to [`apply_mandate_to_target`]. The
/// result is bound under `node.output_type`, or `<target>_mandated` when that
/// is empty, or nowhere when the target is empty too. Finally `StepComplete`
/// is emitted and [`NodeOutcome::Completed`] is returned with zero tokens
/// emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested
///   (no step index is consumed).
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_mandate_apply(
    node: &IRMandateApplyStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    let step_name = if !node.mandate_name.is_empty() {
        node.mandate_name.clone()
    } else {
        "MandateApply".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "mandate_apply")?;

    // A target that names a binding is replaced by the bound value.
    let resolved_target = match ctx.let_bindings.get(&node.target) {
        Some(bound) => bound.clone(),
        None => node.target.clone(),
    };
    let mandated = apply_mandate_to_target(&node.mandate_name, &resolved_target, ctx);

    // Explicit output name wins; otherwise derive one from the target name.
    let output_key = if !node.output_type.is_empty() {
        Some(node.output_type.clone())
    } else if !node.target.is_empty() {
        Some(format!("{}_mandated", node.target))
    } else {
        None
    };
    if let Some(key) = output_key {
        ctx.let_bindings.insert(key, mandated.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &mandated, 0)?;
    Ok(NodeOutcome::Completed {
        output: mandated,
        tokens_emitted: 0,
        step_index,
    })
}
/// Executes a compute-apply step.
///
/// After the cancellation check, the step takes the next index from
/// `ctx.step_counter` and emits `StepStart` of type `"compute_apply"`, named
/// after `node.compute_name` (or `"ComputeApply"` when that is empty). The
/// result comes from [`invoke_compute_capability`] and is bound under
/// `node.output_name` when that name is non-empty. Finally `StepComplete` is
/// emitted and [`NodeOutcome::Completed`] is returned with zero tokens emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested
///   (no step index is consumed).
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_compute_apply(
    node: &IRComputeApplyStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    let step_name = if !node.compute_name.is_empty() {
        node.compute_name.clone()
    } else {
        "ComputeApply".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "compute_apply")?;

    let result = invoke_compute_capability(&node.compute_name, &node.arguments, ctx);

    // Only publish the result when the step declares an output name.
    let wants_binding = !node.output_name.is_empty();
    if wants_binding {
        let key = node.output_name.clone();
        ctx.let_bindings.insert(key, result.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &result, 0)?;
    Ok(NodeOutcome::Completed {
        output: result,
        tokens_emitted: 0,
        step_index,
    })
}
/// Executes a listen step.
///
/// After the cancellation check, the step takes the next index from
/// `ctx.step_counter` and emits `StepStart` of type `"listen"`, named after
/// `node.event_alias` (or `"Listen"` when the alias is empty). The placeholder
/// comes from [`listen_on_channel`] and is bound under the alias when one is
/// present. Finally `StepComplete` is emitted and [`NodeOutcome::Completed`]
/// is returned with zero tokens emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested
///   (no step index is consumed).
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_listen(
    node: &IRListenStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    // The alias doubles as the step name and as the binding key.
    let has_alias = !node.event_alias.is_empty();
    let step_name = if has_alias {
        node.event_alias.clone()
    } else {
        "Listen".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "listen")?;

    let placeholder = listen_on_channel(&node.channel, node.channel_is_ref, ctx);
    if has_alias {
        let key = node.event_alias.clone();
        ctx.let_bindings.insert(key, placeholder.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &placeholder, 0)?;
    Ok(NodeOutcome::Completed {
        output: placeholder,
        tokens_emitted: 0,
        step_index,
    })
}
/// Executes a daemon step.
///
/// After the cancellation check, the step takes the next index from
/// `ctx.step_counter` and emits `StepStart` of type `"daemon_step"`, named
/// after `node.daemon_ref` (or `"DaemonStep"` when the reference is empty).
/// The placeholder comes from [`invoke_daemon`] and is bound under
/// `<daemon_ref>_invoked` when the reference is non-empty. Finally
/// `StepComplete` is emitted and [`NodeOutcome::Completed`] is returned with
/// zero tokens emitted.
///
/// # Errors
///
/// * [`DispatchError::UpstreamCancelled`] when cancellation was requested
///   (no step index is consumed).
/// * [`DispatchError::ChannelClosed`] when an event cannot be sent.
pub async fn run_daemon_step(
    node: &IRDaemonStepNode,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter = step_index + 1;

    // The daemon reference doubles as the step name and the binding-key stem.
    let has_ref = !node.daemon_ref.is_empty();
    let step_name = if has_ref {
        node.daemon_ref.clone()
    } else {
        "DaemonStep".to_owned()
    };
    emit_step_start(ctx, &step_name, step_index, "daemon_step")?;

    let invoked = invoke_daemon(&node.daemon_ref, ctx);
    if has_ref {
        let key = format!("{}_invoked", node.daemon_ref);
        ctx.let_bindings.insert(key, invoked.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &invoked, 0)?;
    Ok(NodeOutcome::Completed {
        output: invoked,
        tokens_emitted: 0,
        step_index,
    })
}
/// Sends a `StepStart` event, stamped with `now_ms()`, on the context's event channel.
///
/// # Errors
///
/// Returns [`DispatchError::ChannelClosed`] when the send fails.
fn emit_step_start(
    ctx: &mut DispatchCtx,
    step_name: &str,
    step_index: usize,
    step_type: &str,
) -> Result<(), DispatchError> {
    let event = FlowExecutionEvent::StepStart {
        step_name: step_name.to_owned(),
        step_index,
        step_type: step_type.to_owned(),
        timestamp_ms: now_ms(),
    };
    match ctx.tx.send(event) {
        Ok(()) => Ok(()),
        // The underlying send error is dropped in favour of the dispatcher's own variant.
        Err(_) => Err(DispatchError::ChannelClosed),
    }
}
/// Sends a `StepComplete` event, stamped with `now_ms()`, on the context's event channel.
///
/// The event always carries `success: true` and `tokens_input: 0`; the
/// caller supplies the full output text and the output token count.
///
/// # Errors
///
/// Returns [`DispatchError::ChannelClosed`] when the send fails.
fn emit_step_complete(
    ctx: &mut DispatchCtx,
    step_name: &str,
    step_index: usize,
    full_output: &str,
    tokens_output: u64,
) -> Result<(), DispatchError> {
    let event = FlowExecutionEvent::StepComplete {
        step_name: step_name.to_owned(),
        step_index,
        success: true,
        full_output: full_output.to_owned(),
        tokens_input: 0,
        tokens_output,
        timestamp_ms: now_ms(),
    };
    match ctx.tx.send(event) {
        Ok(()) => Ok(()),
        // The underlying send error is dropped in favour of the dispatcher's own variant.
        Err(_) => Err(DispatchError::ChannelClosed),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;
    use crate::ir_nodes::*;
    use tokio::sync::mpsc;

    /// Receiving half of the event channel paired with a test context.
    type EventRx = mpsc::UnboundedReceiver<FlowExecutionEvent>;

    /// Builds a context for flow "TestFlow" with a newly created cancellation
    /// flag, returning the event receiver alongside it.
    fn fresh_ctx() -> (DispatchCtx, EventRx) {
        let (tx, rx) = mpsc::unbounded_channel();
        let cancel = CancellationFlag::new();
        (DispatchCtx::new("TestFlow", "stub", "", cancel, tx), rx)
    }

    // ---- node builders -------------------------------------------------

    fn shield_node(
        shield_name: &'static str,
        target: &'static str,
        output_type: &'static str,
    ) -> IRShieldApplyStep {
        IRShieldApplyStep {
            node_type: "shield_apply",
            source_line: 0,
            source_column: 0,
            shield_name: shield_name.into(),
            target: target.into(),
            output_type: output_type.into(),
        }
    }

    fn ots_node(
        ots_name: &'static str,
        target: &'static str,
        output_type: &'static str,
    ) -> IROtsApplyStep {
        IROtsApplyStep {
            node_type: "ots_apply",
            source_line: 0,
            source_column: 0,
            ots_name: ots_name.into(),
            target: target.into(),
            output_type: output_type.into(),
        }
    }

    fn mandate_node(
        mandate_name: &'static str,
        target: &'static str,
        output_type: &'static str,
    ) -> IRMandateApplyStep {
        IRMandateApplyStep {
            node_type: "mandate_apply",
            source_line: 0,
            source_column: 0,
            mandate_name: mandate_name.into(),
            target: target.into(),
            output_type: output_type.into(),
        }
    }

    fn compute_node(
        compute_name: &'static str,
        arguments: Vec<String>,
        output_name: &'static str,
    ) -> IRComputeApplyStep {
        IRComputeApplyStep {
            node_type: "compute_apply",
            source_line: 0,
            source_column: 0,
            compute_name: compute_name.into(),
            arguments,
            output_name: output_name.into(),
        }
    }

    fn listen_node(
        channel: &'static str,
        channel_is_ref: bool,
        event_alias: &'static str,
    ) -> IRListenStep {
        IRListenStep {
            node_type: "listen",
            source_line: 0,
            source_column: 0,
            channel: channel.into(),
            channel_is_ref,
            event_alias: event_alias.into(),
        }
    }

    fn daemon_node(daemon_ref: &'static str) -> IRDaemonStepNode {
        IRDaemonStepNode {
            node_type: "daemon_step",
            source_line: 0,
            source_column: 0,
            daemon_ref: daemon_ref.into(),
        }
    }

    // ---- assertion helpers ---------------------------------------------

    /// Looks up a let-binding as a string slice.
    fn binding<'a>(ctx: &'a DispatchCtx, key: &str) -> Option<&'a str> {
        ctx.let_bindings.get(key).map(String::as_str)
    }

    /// Pops the first queued event and checks it is a `StepStart` of the given type.
    #[track_caller]
    fn assert_first_event_is_step_start(rx: &mut EventRx, expected_type: &str) {
        match rx.try_recv().unwrap() {
            FlowExecutionEvent::StepStart { step_type, .. } => {
                assert_eq!(step_type, expected_type);
            }
            e => panic!("expected StepStart, got {e:?}"),
        }
    }

    /// Unwraps the output text of a `Completed` outcome, panicking on any other variant.
    #[track_caller]
    fn completed_output(outcome: NodeOutcome) -> String {
        match outcome {
            NodeOutcome::Completed { output, .. } => output,
            other => panic!("expected Completed, got {other:?}"),
        }
    }

    // ---- pure helper functions -----------------------------------------

    #[test]
    fn apply_shield_oss_default_is_identity() {
        let (ctx, _rx) = fresh_ctx();
        let shielded = apply_shield_to_target("hipaa", "patient data", &ctx);
        assert_eq!(shielded, "patient data");
    }

    #[test]
    fn apply_ots_oss_default_is_identity() {
        let (ctx, _rx) = fresh_ctx();
        let transformed = apply_ots_to_target("audio_resampler", "raw bytes", &ctx);
        assert_eq!(transformed, "raw bytes");
    }

    #[test]
    fn apply_mandate_oss_default_is_identity() {
        let (ctx, _rx) = fresh_ctx();
        let mandated = apply_mandate_to_target("gdpr_erasure", "user record", &ctx);
        assert_eq!(mandated, "user record");
    }

    #[test]
    fn invoke_compute_canonical_format_with_literal_args() {
        let (ctx, _rx) = fresh_ctx();
        let args = ["1", "2", "3"].map(String::from);
        let result = invoke_compute_capability("sum", &args, &ctx);
        assert_eq!(result, "compute:sum(1, 2, 3)");
    }

    #[test]
    fn invoke_compute_resolves_symbolic_args_through_let_bindings() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("a".into(), "10".into());
        ctx.let_bindings.insert("b".into(), "20".into());
        let args = ["a", "b"].map(String::from);
        let result = invoke_compute_capability("add", &args, &ctx);
        assert_eq!(result, "compute:add(10, 20)");
    }

    #[test]
    fn listen_returns_canonical_awaiting_placeholder() {
        let (ctx, _rx) = fresh_ctx();
        let placeholder = listen_on_channel("event_bus", true, &ctx);
        assert_eq!(placeholder, "(awaiting event_bus)");
    }

    #[test]
    fn invoke_daemon_returns_canonical_invocation_placeholder() {
        let (ctx, _rx) = fresh_ctx();
        let invoked = invoke_daemon("supervisor", &ctx);
        assert_eq!(invoked, "daemon:supervisor");
    }

    // ---- shield handler ------------------------------------------------

    #[tokio::test]
    async fn run_shield_apply_binds_output_under_output_type() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("input_text".into(), "sensitive".into());
        let node = shield_node("hipaa", "input_text", "scrubbed");

        let outcome = run_shield_apply(&node, &mut ctx).await.unwrap();
        match outcome {
            NodeOutcome::Completed {
                output,
                tokens_emitted,
                ..
            } => {
                assert_eq!(output, "sensitive");
                assert_eq!(tokens_emitted, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }
        assert_eq!(binding(&ctx, "scrubbed"), Some("sensitive"));
        assert_first_event_is_step_start(&mut rx, "shield_apply");
    }

    #[tokio::test]
    async fn run_shield_apply_canonical_fallback_output_key() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("doc".into(), "content".into());
        // Empty output type: the key is derived from the target name.
        let node = shield_node("hipaa", "doc", "");

        run_shield_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(binding(&ctx, "doc_shielded"), Some("content"));
    }

    /// Scanner that passes every target, replacing it with `[REDACTED]`.
    struct RedactScanner;

    impl crate::shield_registry::ShieldScanner for RedactScanner {
        fn scan(
            &self,
            _target: &str,
            _ctx: &crate::shield_registry::ShieldScanContext,
        ) -> crate::shield_registry::ShieldVerdict {
            crate::shield_registry::ShieldVerdict::pass("[REDACTED]")
        }
    }

    /// Scanner that rejects every target with code `phi.unredacted`.
    struct BlockScanner;

    impl crate::shield_registry::ShieldScanner for BlockScanner {
        fn scan(
            &self,
            _target: &str,
            _ctx: &crate::shield_registry::ShieldScanContext,
        ) -> crate::shield_registry::ShieldVerdict {
            crate::shield_registry::ShieldVerdict::reject("phi.unredacted", "PHI present")
        }
    }

    #[tokio::test]
    async fn run_shield_apply_routes_through_registered_scanner() {
        const NAME: &str = "t40b_redact";
        crate::shield_registry::register_shield_scanner(NAME, std::sync::Arc::new(RedactScanner));

        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("note".into(), "SSN 123-45-6789".into());
        let node = shield_node(NAME, "note", "scrubbed");

        let outcome = run_shield_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(completed_output(outcome), "[REDACTED]");
        assert_eq!(binding(&ctx, "scrubbed"), Some("[REDACTED]"));

        crate::shield_registry::unregister_shield_scanner(NAME);
    }

    #[tokio::test]
    async fn run_shield_apply_rejecting_scanner_surfaces_backend_error() {
        const NAME: &str = "t40b_block";
        crate::shield_registry::register_shield_scanner(NAME, std::sync::Arc::new(BlockScanner));

        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("note".into(), "raw phi".into());
        let node = shield_node(NAME, "note", "scrubbed");

        let err = run_shield_apply(&node, &mut ctx).await.unwrap_err();
        match err {
            DispatchError::BackendError { name, message } => {
                assert_eq!(name, format!("shield:{NAME}"));
                assert!(message.contains("phi.unredacted"), "blame code in message");
                assert!(message.contains("PHI present"), "reason in message");
            }
            other => panic!("expected BackendError, got {other:?}"),
        }
        // A rejected target must not be published under the output key.
        assert_eq!(binding(&ctx, "scrubbed"), None);

        crate::shield_registry::unregister_shield_scanner(NAME);
    }

    #[tokio::test]
    async fn run_shield_apply_unregistered_name_is_identity() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("doc".into(), "untouched".into());
        let node = shield_node("t40b_never_registered", "doc", "out");

        let outcome = run_shield_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(completed_output(outcome), "untouched");
    }

    // ---- remaining handlers --------------------------------------------

    #[tokio::test]
    async fn run_ots_apply_binds_output() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("raw_audio".into(), "samples".into());
        let node = ots_node("g711_mulaw", "raw_audio", "pcm");

        run_ots_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(binding(&ctx, "pcm"), Some("samples"));
        assert_first_event_is_step_start(&mut rx, "ots_apply");
    }

    #[tokio::test]
    async fn run_mandate_apply_binds_output() {
        let (mut ctx, mut rx) = fresh_ctx();
        // No binding exists for the target, so its literal text flows through.
        let node = mandate_node("gdpr_erasure", "user_record", "erased");

        run_mandate_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(binding(&ctx, "erased"), Some("user_record"));
        assert_first_event_is_step_start(&mut rx, "mandate_apply");
    }

    #[tokio::test]
    async fn run_compute_apply_binds_result_under_output_name() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("x".into(), "5".into());
        ctx.let_bindings.insert("y".into(), "7".into());
        let node = compute_node("add", vec!["x".into(), "y".into()], "sum");

        run_compute_apply(&node, &mut ctx).await.unwrap();
        assert_eq!(binding(&ctx, "sum"), Some("compute:add(5, 7)"));
        assert_first_event_is_step_start(&mut rx, "compute_apply");
    }

    #[tokio::test]
    async fn run_listen_binds_placeholder_under_event_alias() {
        let (mut ctx, mut rx) = fresh_ctx();
        let node = listen_node("user_events", true, "evt");

        run_listen(&node, &mut ctx).await.unwrap();
        assert_eq!(binding(&ctx, "evt"), Some("(awaiting user_events)"));
        assert_first_event_is_step_start(&mut rx, "listen");
    }

    #[tokio::test]
    async fn run_daemon_step_binds_invocation_placeholder() {
        let (mut ctx, mut rx) = fresh_ctx();
        let node = daemon_node("audit_supervisor");

        run_daemon_step(&node, &mut ctx).await.unwrap();
        assert_eq!(
            binding(&ctx, "audit_supervisor_invoked"),
            Some("daemon:audit_supervisor")
        );
        assert_first_event_is_step_start(&mut rx, "daemon_step");
    }

    // ---- cancellation --------------------------------------------------

    #[tokio::test]
    async fn every_handler_short_circuits_on_cancel() {
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);

        let shield = run_shield_apply(&shield_node("x", "y", "z"), &mut ctx).await;
        assert!(matches!(shield, Err(DispatchError::UpstreamCancelled)));

        let ots = run_ots_apply(&ots_node("x", "y", "z"), &mut ctx).await;
        assert!(matches!(ots, Err(DispatchError::UpstreamCancelled)));

        let mandate = run_mandate_apply(&mandate_node("x", "y", "z"), &mut ctx).await;
        assert!(matches!(mandate, Err(DispatchError::UpstreamCancelled)));

        let compute = run_compute_apply(&compute_node("x", vec![], "y"), &mut ctx).await;
        assert!(matches!(compute, Err(DispatchError::UpstreamCancelled)));

        let listen = run_listen(&listen_node("x", false, "y"), &mut ctx).await;
        assert!(matches!(listen, Err(DispatchError::UpstreamCancelled)));

        let daemon = run_daemon_step(&daemon_node("x"), &mut ctx).await;
        assert!(matches!(daemon, Err(DispatchError::UpstreamCancelled)));
    }
}