use crate::backends::{ChatRequest, Message};
use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
use crate::ir_nodes::{
IRProbe, IRReasonStep, IRRefineStep, IRStep, IRValidateStep, IRWeaveStep,
};
use crate::stream_effect::BackpressurePolicy;
use futures::StreamExt;
use sha2::{Digest, Sha256};
/// Description of one backend-driven step, consumed by `run_pure_shape`.
///
/// The `run_*` handlers in this file translate their IR node into this shape
/// and hand it to `run_pure_shape`, which performs the actual backend call.
pub struct PureShapeStep {
/// Step name: reported as `step_name` in emitted events, stored in the audit
/// record, and used as the `let_bindings` key for the step's output.
pub name: String,
/// Text sent to the backend as the single user message.
pub user_prompt: String,
/// Optional extra system text. It is appended to the context's system prompt
/// (separated by a blank line), or used on its own when that prompt is empty.
pub framing_addendum: Option<String>,
/// Short kind label reported as `step_type` in the `StepStart` event.
pub kind_slug: &'static str,
/// Tool specifications forwarded to the backend in the chat request.
pub tools: Vec<crate::backends::ToolSpec>,
}
/// Dispatches a plain `step` node.
///
/// The step's `ask` text is interpolated against the current `let` bindings.
/// When `apply_ref` names a tool-registry entry flagged as streaming, the step
/// is handed to `run_step_streaming_tool`; in every other case it is executed
/// through `run_pure_shape`, with `apply_ref` (if any) advertised as a tool.
///
/// # Errors
///
/// Propagates whatever the selected execution path returns.
pub async fn run_step(
    step: &IRStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    let prompt = crate::exec_context::interpolate_vars(&step.ask, &ctx.let_bindings);

    if !step.apply_ref.is_empty() {
        // Work on a clone of the registry handle so `ctx` remains free to be
        // borrowed mutably by the streaming handler.
        if let Some(registry) = ctx.tool_registry.clone() {
            match registry.get(&step.apply_ref) {
                Some(entry) if entry.is_streaming => {
                    return run_step_streaming_tool(step, entry.clone(), &prompt, ctx).await;
                }
                // Unknown or non-streaming tool: fall through to the backend path.
                _ => {}
            }
        }
    }

    let tools = synthesize_tools_from_step(step);
    let name = if step.name.is_empty() {
        String::from("Step")
    } else {
        step.name.clone()
    };

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt: prompt,
            framing_addendum: None,
            kind_slug: "step",
            tools,
        },
        ctx,
    )
    .await
}
/// Runs a `step` whose `apply_ref` resolved to a streaming tool registry entry.
///
/// The tool is invoked with `prompt`; its stream is handed to
/// `unified_stream_handler` together with the backpressure policy extracted
/// from the entry's effect row. The handler's summary is then used to record an
/// enforcement summary (only when a policy applies), emit `StepComplete`,
/// append a `StepAuditRecord`, and bind the accumulated output under the step
/// name in `ctx.let_bindings`.
///
/// # Errors
///
/// * `DispatchError::UpstreamCancelled` when `ctx.cancel` is set before
///   `StepStart` is emitted, just before the tool is started, or when the
///   summary reports cancellation and the flag is set.
/// * `DispatchError::ChannelClosed` when an event cannot be sent on `ctx.tx`.
/// * `DispatchError::BackendError` named `tool:<entry name>` when the summary
///   carries a terminator message.
/// * Any error returned by `unified_stream_handler` is propagated unchanged.
///
/// NOTE(review): unlike `run_pure_shape`, this path never calls
/// `ctx.take_pending_effect_policy()`; confirm whether a pending policy is
/// meant to survive into the next step.
async fn run_step_streaming_tool(
step: &IRStep,
entry: crate::tool_registry::ToolEntry,
prompt: &str,
ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
// The step index is claimed before the cancellation check, so a cancelled
// step still advances the counter.
let step_index = ctx.step_counter;
ctx.step_counter += 1;
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let policy =
crate::tool_dispatch_bridge::extract_stream_policy(&entry.effect_row);
// Unnamed steps are reported under the generic name "Step".
let step_name = if step.name.is_empty() {
"Step".to_string()
} else {
step.name.clone()
};
ctx.tx
.send(FlowExecutionEvent::StepStart {
step_name: step_name.clone(),
step_index,
step_type: "step".to_string(),
timestamp_ms: now_ms(),
})
.map_err(|_| DispatchError::ChannelClosed)?;
// NOTE(review): the meaning of the literal `0` is defined by
// `build_tool_context` and is not visible here — confirm.
let tool_ctx = crate::tool_dispatch_bridge::build_tool_context(
ctx.cancel.clone(),
0, );
let tool = crate::tool_dispatch_bridge::resolve_streaming_tool(&entry);
// Second cancellation check: `StepStart` has been sent, but the tool has
// not been started yet.
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let source = tool.stream(prompt.to_string(), tool_ctx).await;
let summary = crate::flow_dispatcher::unified_stream::unified_stream_handler(
source,
policy,
&ctx.cancel,
&ctx.tx,
&step_name,
)
.await?;
// Only tools with a backpressure policy get an enforcement summary; it is
// stored under the step name.
if let Some(p) = policy {
let wire = crate::axon_server::EnforcementSummaryWire {
policy_slug: p.slug().to_string(),
chunks_pushed: summary.chunks_pushed,
chunks_delivered: summary.chunks_delivered,
drop_oldest_hits: summary.chunks_dropped,
degrade_quality_hits: summary.chunks_degraded,
pause_upstream_blocks: summary.pause_upstream_blocks,
fail_overflows: summary.fail_overflows,
failed: !summary.success,
};
ctx.enforcement_summaries
.lock()
.await
.insert(step_name.clone(), wire);
}
// A stream cut short while our own cancellation flag is set ends the step
// here: no `StepComplete`, no audit record, no binding.
if summary.cancelled && ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
// `StepComplete` is emitted for failed streams as well; `success` carries
// the summary's verdict.
ctx.tx
.send(FlowExecutionEvent::StepComplete {
step_name: step_name.clone(),
step_index,
success: summary.success,
full_output: summary.accumulated.clone(),
tokens_input: 0,
tokens_output: summary.tokens_emitted,
timestamp_ms: now_ms(),
})
.map_err(|_| DispatchError::ChannelClosed)?;
{
// Classify how the tool stream ended; cancellation takes precedence over
// an error message.
let terminator_kind = if summary.cancelled {
"cancelled"
} else if summary.terminator_message.is_some() {
"error"
} else {
"stop"
};
let record = crate::axonendpoint_replay::StepAuditRecord {
step_name: step_name.clone(),
step_index,
success: summary.success,
tokens_emitted: summary.tokens_emitted,
output_hash_hex: summary.output_hash_hex.clone(),
effect_policy_applied: policy.map(|p| p.slug().to_string()),
chunks_dropped: summary.chunks_dropped,
chunks_degraded: summary.chunks_degraded,
timestamp_ms: now_ms(),
tool_name: Some(entry.name.clone()),
tool_chunks_emitted: Some(summary.chunks_pushed),
tool_output_hash_hex: Some(summary.output_hash_hex.clone()),
tool_terminator_kind: Some(terminator_kind.to_string()),
};
// The lock guard is confined to this block.
let mut guard = ctx.step_audit_records.lock().await;
guard.push(record);
}
// The tool's error is surfaced only after `StepComplete` and the audit
// record have been written.
if let Some(message) = summary.terminator_message {
return Err(DispatchError::BackendError {
name: format!("tool:{}", entry.name),
message,
});
}
// Bind the output under the step name (presumably so later steps can
// interpolate it — `run_step` reads `let_bindings` for that purpose).
ctx.let_bindings
.insert(step_name.clone(), summary.accumulated.clone());
Ok(NodeOutcome::Completed {
output: summary.accumulated,
tokens_emitted: summary.tokens_emitted,
step_index,
})
}
/// Builds the tool list advertised to the backend for `step`.
///
/// Returns an empty list when the step has no `apply_ref`; otherwise a single
/// `ToolSpec` named after `apply_ref`, with a generated description and an
/// empty JSON object (`{}`) as its parameter schema.
fn synthesize_tools_from_step(step: &IRStep) -> Vec<crate::backends::ToolSpec> {
    let mut tools = Vec::new();
    if !step.apply_ref.is_empty() {
        tools.push(crate::backends::ToolSpec {
            name: step.apply_ref.clone(),
            description: format!("Tool reference: {}", step.apply_ref),
            parameters_json: String::from("{}"),
        });
    }
    tools
}
/// Dispatches a `probe` node through `run_pure_shape`.
///
/// The step is named after the probe target (or `"Probe"` when the target is
/// empty), asks the backend to investigate that target, and carries no tools.
pub async fn run_probe(
    probe: &IRProbe,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    let name = match probe.target.as_str() {
        "" => String::from("Probe"),
        target => target.to_owned(),
    };
    let framing = String::from(
        "You are probing the target. Investigate deeply, surface what's hidden, return concisely.",
    );

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt: format!("Investigate: {}", probe.target),
            framing_addendum: Some(framing),
            kind_slug: "probe",
            tools: Vec::new(),
        },
        ctx,
    )
    .await
}
/// Dispatches a `reason` node through `run_pure_shape`.
///
/// The prompt names the target and, when one is declared, the reasoning
/// strategy. The step is named after the target (or `"Reason"` when the target
/// is empty) and carries no tools.
pub async fn run_reason(
    reason: &IRReasonStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    let mut user_prompt = format!("Reason about: {}", reason.target);
    if !reason.strategy.is_empty() {
        user_prompt.push_str(&format!(" using strategy `{}`", reason.strategy));
    }

    let name = match reason.target.as_str() {
        "" => String::from("Reason"),
        target => target.to_owned(),
    };
    let framing = String::from(
        "You are reasoning deliberately. Show the steps of your reasoning where they bear on the answer.",
    );

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt,
            framing_addendum: Some(framing),
            kind_slug: "reason",
            tools: Vec::new(),
        },
        ctx,
    )
    .await
}
/// Dispatches a `validate` node through `run_pure_shape`.
///
/// The prompt names the target and, when one is declared, the rule to validate
/// against. The step is named after the target (or `"Validate"` when the target
/// is empty) and carries no tools.
pub async fn run_validate(
    validate: &IRValidateStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    let mut user_prompt = format!("Validate: {}", validate.target);
    if !validate.rule.is_empty() {
        user_prompt.push_str(&format!(" against rule `{}`", validate.rule));
    }

    let name = match validate.target.as_str() {
        "" => String::from("Validate"),
        target => target.to_owned(),
    };
    let framing = String::from(
        "You are validating. Return a structured verdict (pass/fail) with the reasoning that supports it.",
    );

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt,
            framing_addendum: Some(framing),
            kind_slug: "validate",
            tools: Vec::new(),
        },
        ctx,
    )
    .await
}
/// Dispatches a `refine` node through `run_pure_shape`.
///
/// The prompt names the target and, when one is declared, the refinement
/// strategy. The step is named after the target (or `"Refine"` when the target
/// is empty) and carries no tools.
pub async fn run_refine(
    refine: &IRRefineStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    let mut user_prompt = format!("Refine: {}", refine.target);
    if !refine.strategy.is_empty() {
        user_prompt.push_str(&format!(" using strategy `{}`", refine.strategy));
    }

    let name = match refine.target.as_str() {
        "" => String::from("Refine"),
        target => target.to_owned(),
    };
    let framing = String::from(
        "You are refining. Treat the target as draft input; improve it along the declared strategy without losing fidelity to its intent.",
    );

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt,
            framing_addendum: Some(framing),
            kind_slug: "refine",
            tools: Vec::new(),
        },
        ctx,
    )
    .await
}
/// Dispatches a `weave` node through `run_pure_shape`.
///
/// The prompt starts with the target and then appends one clause for each
/// non-empty attribute, in this order: sources, format, style, priority. The
/// step is named after the target (or `"Weave"` when the target is empty) and
/// carries no tools.
pub async fn run_weave(
    weave: &IRWeaveStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    // Collect the prompt fragments in order; empty attributes add nothing.
    let mut fragments = vec![format!("Weave: {}", weave.target)];
    if !weave.sources.is_empty() {
        fragments.push(format!(" from sources [{}]", weave.sources.join(", ")));
    }
    if !weave.format_type.is_empty() {
        fragments.push(format!(" as {}", weave.format_type));
    }
    if !weave.style.is_empty() {
        fragments.push(format!(" in {} style", weave.style));
    }
    if !weave.priority.is_empty() {
        fragments.push(format!(" with priority [{}]", weave.priority.join(", ")));
    }
    let user_prompt = fragments.concat();

    let name = match weave.target.as_str() {
        "" => String::from("Weave"),
        target => target.to_owned(),
    };
    let framing = String::from(
        "You are weaving. Stitch the sources into the target output. Honor the declared priority + format + style.",
    );

    run_pure_shape(
        PureShapeStep {
            name,
            user_prompt,
            framing_addendum: Some(framing),
            kind_slug: "weave",
            tools: Vec::new(),
        },
        ctx,
    )
    .await
}
/// Executes one backend-driven step described by `shape`.
///
/// Sequence: claim a step index, take any pending effect policy from `ctx`,
/// emit `StepStart`, resolve the streaming backend named by
/// `ctx.backend_name`, send a single-user-message chat request, drain the
/// response stream (through the policy enforcer when a policy was pending,
/// directly otherwise), emit `StepComplete`, append a `StepAuditRecord`, and
/// bind the accumulated output under the step name in `ctx.let_bindings`.
///
/// # Errors
///
/// * `DispatchError::UpstreamCancelled` when `ctx.cancel` is set before
///   `StepStart` or before the backend call, or when draining reports it.
/// * `DispatchError::ChannelClosed` when an event cannot be sent on `ctx.tx`.
/// * `DispatchError::BackendError` when the backend is not in the streaming
///   registry, when opening the stream fails, or when draining fails.
pub async fn run_pure_shape(
    shape: PureShapeStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    // The index and the pending policy are both claimed before the first
    // cancellation check, so a cancelled step still consumes them.
    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let effect_policy = ctx.take_pending_effect_policy();

    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let started = FlowExecutionEvent::StepStart {
        step_name: shape.name.clone(),
        step_index,
        step_type: shape.kind_slug.to_string(),
        timestamp_ms: now_ms(),
    };
    ctx.tx
        .send(started)
        .map_err(|_| DispatchError::ChannelClosed)?;

    let backend = match crate::backends::resolve_streaming_backend(&ctx.backend_name) {
        Some(found) => found,
        None => {
            return Err(DispatchError::BackendError {
                name: ctx.backend_name.clone(),
                message: format!(
                    "not in streaming registry; supported: {}",
                    crate::backends::STREAMING_BACKEND_NAMES.join(", ")
                ),
            });
        }
    };

    // System text = context prompt, optionally followed by the shape's framing
    // addendum; an addendum stands alone when the context prompt is empty.
    let system = match shape.framing_addendum.as_deref() {
        None => ctx.system_prompt.clone(),
        Some(addendum) if ctx.system_prompt.is_empty() => addendum.to_owned(),
        Some(addendum) => format!("{}\n\n{}", ctx.system_prompt, addendum),
    };

    let request = ChatRequest {
        model: String::new(),
        messages: vec![Message::user(shape.user_prompt.clone())],
        // An empty system text is sent as "no system prompt".
        system: Some(system).filter(|text| !text.is_empty()),
        max_tokens: None,
        temperature: None,
        top_p: None,
        tools: shape.tools.clone(),
        stream: true,
        trace_id: None,
        cancel: ctx.cancel.clone(),
    };

    // Last chance to bail out before the backend is contacted.
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let chunk_stream = match backend.stream(request).await {
        Ok(stream) => stream,
        Err(e) => {
            return Err(DispatchError::BackendError {
                name: ctx.backend_name.clone(),
                message: format!("{e}"),
            });
        }
    };

    let drained = if let Some(policy) = effect_policy {
        drain_through_enforcer(chunk_stream, &shape, ctx, policy, step_index).await
    } else {
        drain_direct(chunk_stream, &shape, ctx, step_index).await
    };
    let (accumulated, tokens_emitted, drop_count, degrade_count) = drained?;

    let output_hash_hex = sha256_hex(&accumulated);

    let completed = FlowExecutionEvent::StepComplete {
        step_name: shape.name.clone(),
        step_index,
        success: true,
        full_output: accumulated.clone(),
        tokens_input: 0,
        tokens_output: tokens_emitted,
        timestamp_ms: now_ms(),
    };
    ctx.tx
        .send(completed)
        .map_err(|_| DispatchError::ChannelClosed)?;

    let record = crate::axonendpoint_replay::StepAuditRecord {
        step_name: shape.name.clone(),
        step_index,
        success: true,
        tokens_emitted,
        output_hash_hex,
        effect_policy_applied: effect_policy.map(|p| p.slug().to_string()),
        chunks_dropped: drop_count,
        chunks_degraded: degrade_count,
        timestamp_ms: now_ms(),
        // The tool_* fields stay empty on this (non-tool) path.
        tool_name: None,
        tool_chunks_emitted: None,
        tool_output_hash_hex: None,
        tool_terminator_kind: None,
    };
    // The lock guard is a temporary and is released at the end of the statement.
    ctx.step_audit_records.lock().await.push(record);

    // `shape` is not needed afterwards, so its name can be moved into the map.
    ctx.let_bindings.insert(shape.name, accumulated.clone());

    Ok(NodeOutcome::Completed {
        output: accumulated,
        tokens_emitted,
        step_index,
    })
}
/// Drains `chunk_stream` without any backpressure policy.
///
/// Every chunk with a non-empty delta is appended to the output and announced
/// as a `StepToken` event (token indices start at 1). A chunk whose finish
/// reason is `ToolUse` additionally produces a `ToolCall` event attributed to
/// the shape's first tool, or to `<unknown>` when the shape has none.
///
/// Returns `(output, tokens_emitted, 0, 0)`; the two zeros are the dropped and
/// degraded chunk counts, which are always zero on this path.
///
/// # Errors
///
/// * `DispatchError::UpstreamCancelled` when `ctx.cancel` is set as a chunk arrives.
/// * `DispatchError::BackendError` when the stream yields an error item.
/// * `DispatchError::ChannelClosed` when an event cannot be sent on `ctx.tx`.
async fn drain_direct(
    mut chunk_stream: crate::backends::ChatStream,
    shape: &PureShapeStep,
    ctx: &mut DispatchCtx,
    _step_index: usize,
) -> Result<(String, u64, u64, u64), DispatchError> {
    use crate::backends::FinishReason;

    let mut output = String::new();
    let mut token_count: u64 = 0;

    while let Some(item) = chunk_stream.next().await {
        // Cancellation is checked before the freshly received item is inspected.
        if ctx.cancel.is_cancelled() {
            return Err(DispatchError::UpstreamCancelled);
        }

        let chunk = item.map_err(|e| DispatchError::BackendError {
            name: ctx.backend_name.clone(),
            message: format!("chunk error: {e}"),
        })?;

        if matches!(chunk.finish_reason, Some(FinishReason::ToolUse)) {
            let tool_name = match shape.tools.first() {
                Some(tool) => tool.name.clone(),
                None => "<unknown>".to_string(),
            };
            let call = FlowExecutionEvent::ToolCall {
                step_name: shape.name.clone(),
                tool_name,
                content: chunk.delta.clone(),
                timestamp_ms: now_ms(),
            };
            ctx.tx
                .send(call)
                .map_err(|_| DispatchError::ChannelClosed)?;
        }

        // Chunks without text contribute neither a token nor an event.
        if chunk.delta.is_empty() {
            continue;
        }

        token_count += 1;
        output.push_str(&chunk.delta);
        let token = FlowExecutionEvent::StepToken {
            step_name: shape.name.clone(),
            content: chunk.delta,
            token_index: token_count,
            timestamp_ms: now_ms(),
        };
        ctx.tx
            .send(token)
            .map_err(|_| DispatchError::ChannelClosed)?;
    }

    Ok((output, token_count, 0, 0))
}
/// Drains `chunk_stream` through a `StreamPolicyEnforcer` configured with `policy`.
///
/// A spawned producer task feeds the backend stream into the enforcer and
/// closes it afterwards; this function is the consumer, popping chunks and
/// emitting the same `ToolCall` / `StepToken` events as `drain_direct`. When
/// the enforcer is exhausted, its metrics snapshot is stored under the step
/// name in `ctx.enforcement_summaries`.
///
/// Returns `(output, tokens_emitted, drop_oldest_hits, degrade_quality_hits)`.
///
/// The producer is owned by an abort-on-drop guard. If the consumer leaves
/// early (cancellation, closed event channel) or this future is dropped, the
/// producer is aborted instead of being left detached and still draining the
/// backend stream with nobody consuming its output.
///
/// # Errors
///
/// * `DispatchError::UpstreamCancelled` when `ctx.cancel` is set as a chunk is popped.
/// * `DispatchError::ChannelClosed` when an event cannot be sent on `ctx.tx`.
/// * `DispatchError::BackendError` when the producer task cannot be joined.
async fn drain_through_enforcer(
    chunk_stream: crate::backends::ChatStream,
    shape: &PureShapeStep,
    ctx: &mut DispatchCtx,
    policy: BackpressurePolicy,
    _step_index: usize,
) -> Result<(String, u64, u64, u64), DispatchError> {
    use crate::stream_effect_dispatcher::{StreamPolicyEnforcer, DEFAULT_STREAM_BUFFER_CAPACITY};
    use std::sync::Arc;

    /// Aborts the wrapped task when dropped; aborting a task that has already
    /// finished has no effect.
    struct AbortOnDrop<T>(tokio::task::JoinHandle<T>);

    impl<T> Drop for AbortOnDrop<T> {
        fn drop(&mut self) {
            self.0.abort();
        }
    }

    let enforcer = Arc::new(match policy {
        // The degrader passed here returns each chunk unchanged.
        BackpressurePolicy::DegradeQuality => StreamPolicyEnforcer::with_degrader(
            policy,
            DEFAULT_STREAM_BUFFER_CAPACITY,
            Arc::new(|chunk| chunk),
        ),
        BackpressurePolicy::DropOldest
        | BackpressurePolicy::PauseUpstream
        | BackpressurePolicy::Fail => StreamPolicyEnforcer::new(policy),
    });

    let producer_enforcer = enforcer.clone();
    let mut producer = AbortOnDrop(tokio::spawn(async move {
        // The callback discards whatever `drain` reports through it; the
        // outcome is still reflected in the returned summary.
        let summary = producer_enforcer.drain(chunk_stream, |_e| {}).await;
        producer_enforcer.close().await;
        summary
    }));

    let mut accumulated = String::new();
    let mut tokens_emitted: u64 = 0;
    while let Some(chunk) = enforcer.pop_chunk().await {
        // Every early return below drops `producer`, which aborts the task.
        if ctx.cancel.is_cancelled() {
            return Err(DispatchError::UpstreamCancelled);
        }
        if let Some(crate::backends::FinishReason::ToolUse) = &chunk.finish_reason {
            let tool_name = shape
                .tools
                .first()
                .map(|t| t.name.clone())
                .unwrap_or_else(|| "<unknown>".to_string());
            ctx.tx
                .send(FlowExecutionEvent::ToolCall {
                    step_name: shape.name.clone(),
                    tool_name,
                    content: chunk.delta.clone(),
                    timestamp_ms: now_ms(),
                })
                .map_err(|_| DispatchError::ChannelClosed)?;
        }
        if !chunk.delta.is_empty() {
            tokens_emitted += 1;
            accumulated.push_str(&chunk.delta);
            ctx.tx
                .send(FlowExecutionEvent::StepToken {
                    step_name: shape.name.clone(),
                    content: chunk.delta,
                    token_index: tokens_emitted,
                    timestamp_ms: now_ms(),
                })
                .map_err(|_| DispatchError::ChannelClosed)?;
        }
    }

    // Await the handle by mutable reference so the guard keeps ownership; once
    // the task has completed, the guard's abort on drop is a no-op.
    let drain_summary = (&mut producer.0)
        .await
        .map_err(|e| DispatchError::BackendError {
            name: ctx.backend_name.clone(),
            message: format!("enforcer producer task join: {e}"),
        })?;

    let snap = enforcer.metrics_snapshot();
    let wire = crate::axon_server::EnforcementSummaryWire {
        policy_slug: policy.slug().to_string(),
        chunks_pushed: snap.items_pushed,
        chunks_delivered: snap.items_delivered,
        drop_oldest_hits: snap.drop_oldest_hits,
        degrade_quality_hits: snap.degrade_quality_hits,
        pause_upstream_blocks: snap.pause_upstream_blocks,
        fail_overflows: snap.fail_overflows,
        failed: drain_summary.failed,
    };
    {
        // Keep the lock guard confined to this block.
        let mut guard = ctx.enforcement_summaries.lock().await;
        guard.insert(shape.name.clone(), wire);
    }

    let drop_count = snap.drop_oldest_hits;
    let degrade_count = snap.degrade_quality_hits;
    Ok((accumulated, tokens_emitted, drop_count, degrade_count))
}
/// Returns the SHA-256 digest of `content`'s UTF-8 bytes as a lowercase
/// hexadecimal string (two characters per digest byte).
fn sha256_hex(content: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hasher = Sha256::new();
    hasher.update(content.as_bytes());
    let digest = hasher.finalize();

    let mut hex = String::with_capacity(digest.len() * 2);
    for &byte in digest.as_slice() {
        // High nibble first, then low nibble.
        hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;
    use tokio::sync::mpsc;

    /// Builds a context for flow "TestFlow" on the "stub" backend with a
    /// non-empty system prompt, plus the receiving end of its event channel.
    fn fresh_ctx() -> (
        DispatchCtx,
        mpsc::UnboundedReceiver<FlowExecutionEvent>,
    ) {
        let (tx, rx) = mpsc::unbounded_channel();
        let ctx = DispatchCtx::new(
            "TestFlow",
            "stub",
            "system prompt",
            CancellationFlag::new(),
            tx,
        );
        (ctx, rx)
    }

    /// Minimal `step` node named `name` that asks "hi" and references no tool.
    fn step_named(name: &str) -> IRStep {
        IRStep {
            node_type: "step",
            source_line: 0,
            source_column: 0,
            name: name.into(),
            persona_ref: String::new(),
            given: String::new(),
            ask: "hi".into(),
            use_tool: None,
            probe: None,
            reason: None,
            weave: None,
            output_type: String::new(),
            confidence_floor: None,
            navigate_ref: String::new(),
            apply_ref: String::new(),
            body: Vec::new(),
        }
    }

    /// Pops the next event and returns `(step_type, step_name)`; panics unless
    /// that event is a `StepStart`.
    fn expect_step_start(
        rx: &mut mpsc::UnboundedReceiver<FlowExecutionEvent>,
    ) -> (String, String) {
        match rx.try_recv().expect("event") {
            FlowExecutionEvent::StepStart { step_type, step_name, .. } => (step_type, step_name),
            other => panic!("expected StepStart, got {other:?}"),
        }
    }

    #[test]
    fn sha256_hex_empty_string_is_canonical() {
        assert_eq!(
            sha256_hex(""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn sha256_hex_stub_marker() {
        let h = sha256_hex("(stub)");
        assert_eq!(h.len(), 64);
        assert!(h.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
    }

    #[tokio::test]
    async fn run_step_with_stub_backend_emits_one_token() {
        let step = step_named("Generate");
        let (mut ctx, mut rx) = fresh_ctx();
        let outcome = run_step(&step, &mut ctx).await.expect("run_step ok");
        match outcome {
            NodeOutcome::Completed { output, tokens_emitted, step_index } => {
                assert_eq!(output, "(stub)");
                assert_eq!(tokens_emitted, 1);
                assert_eq!(step_index, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }
        let mut events = Vec::new();
        while let Ok(ev) = rx.try_recv() {
            events.push(ev);
        }
        assert_eq!(events.len(), 3, "events: {events:?}");
        assert!(matches!(events[0], FlowExecutionEvent::StepStart { .. }));
        assert!(matches!(events[1], FlowExecutionEvent::StepToken { .. }));
        assert!(matches!(events[2], FlowExecutionEvent::StepComplete { .. }));
    }

    #[tokio::test]
    async fn run_step_cancel_pre_dispatch_short_circuits() {
        let step = step_named("S");
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);
        let outcome = run_step(&step, &mut ctx).await;
        assert!(matches!(outcome, Err(DispatchError::UpstreamCancelled)));
    }

    #[tokio::test]
    async fn run_step_unknown_backend_returns_backend_error() {
        let step = step_named("S");
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new(
            "F",
            "does_not_exist",
            "",
            CancellationFlag::new(),
            tx,
        );
        let outcome = run_step(&step, &mut ctx).await;
        match outcome {
            Err(DispatchError::BackendError { name, message }) => {
                assert_eq!(name, "does_not_exist");
                assert!(message.contains("not in streaming registry"));
            }
            other => panic!("expected BackendError, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn run_step_pending_policy_consumed_on_entry() {
        let step = step_named("S");
        let (mut ctx, _rx) = fresh_ctx();
        ctx.pending_effect_policy = Some(BackpressurePolicy::DropOldest);
        let _ = run_step(&step, &mut ctx).await.expect("ok");
        assert!(
            ctx.pending_effect_policy.is_none(),
            "33.y.c contract: handler MUST consume pending_effect_policy on entry"
        );
        let summaries = ctx.enforcement_summaries.lock().await;
        assert!(summaries.contains_key("S"));
        assert_eq!(summaries["S"].policy_slug, "drop_oldest");
    }

    #[tokio::test]
    async fn run_step_records_step_audit_row() {
        let step = step_named("Generate");
        let (mut ctx, _rx) = fresh_ctx();
        let _ = run_step(&step, &mut ctx).await.expect("ok");
        let audit = ctx.step_audit_records.lock().await;
        assert_eq!(audit.len(), 1);
        assert_eq!(audit[0].step_name, "Generate");
        assert_eq!(audit[0].tokens_emitted, 1);
        assert!(audit[0].success);
        assert_eq!(audit[0].output_hash_hex.len(), 64);
        assert!(audit[0].effect_policy_applied.is_none());
    }

    #[tokio::test]
    async fn run_probe_kind_slug_is_probe() {
        let probe = IRProbe {
            node_type: "probe",
            source_line: 0,
            source_column: 0,
            target: "market_data".into(),
        };
        let (mut ctx, mut rx) = fresh_ctx();
        let _ = run_probe(&probe, &mut ctx).await.expect("ok");
        let (step_type, step_name) = expect_step_start(&mut rx);
        assert_eq!(step_type, "probe");
        assert_eq!(step_name, "market_data");
    }

    #[tokio::test]
    async fn run_reason_kind_slug_is_reason() {
        let reason = IRReasonStep {
            node_type: "reason",
            source_line: 0,
            source_column: 0,
            strategy: "chain_of_thought".into(),
            target: "claim".into(),
        };
        let (mut ctx, mut rx) = fresh_ctx();
        let _ = run_reason(&reason, &mut ctx).await.expect("ok");
        let (step_type, _step_name) = expect_step_start(&mut rx);
        assert_eq!(step_type, "reason");
    }

    #[tokio::test]
    async fn run_validate_kind_slug_is_validate() {
        let validate = IRValidateStep {
            node_type: "validate",
            source_line: 0,
            source_column: 0,
            target: "draft".into(),
            rule: "no_pii".into(),
        };
        let (mut ctx, mut rx) = fresh_ctx();
        let _ = run_validate(&validate, &mut ctx).await.expect("ok");
        let (step_type, _step_name) = expect_step_start(&mut rx);
        assert_eq!(step_type, "validate");
    }

    #[tokio::test]
    async fn run_refine_kind_slug_is_refine() {
        let refine = IRRefineStep {
            node_type: "refine",
            source_line: 0,
            source_column: 0,
            target: "draft".into(),
            strategy: "tighten".into(),
        };
        let (mut ctx, mut rx) = fresh_ctx();
        let _ = run_refine(&refine, &mut ctx).await.expect("ok");
        let (step_type, _step_name) = expect_step_start(&mut rx);
        assert_eq!(step_type, "refine");
    }

    #[tokio::test]
    async fn run_weave_kind_slug_is_weave() {
        let weave = IRWeaveStep {
            node_type: "weave",
            source_line: 0,
            source_column: 0,
            sources: vec!["A".into(), "B".into()],
            target: "report".into(),
            format_type: "markdown".into(),
            priority: vec!["A".into()],
            style: "formal".into(),
        };
        let (mut ctx, mut rx) = fresh_ctx();
        let _ = run_weave(&weave, &mut ctx).await.expect("ok");
        let (step_type, _step_name) = expect_step_start(&mut rx);
        assert_eq!(step_type, "weave");
    }
}