use crate::effects::ir::{IRPerform, Instruction};
use crate::effects::{EffectRuntime, ExecutionResult, Value};
use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
use crate::ir_nodes::IRStreamBlock;
pub async fn run_stream(
_node: &IRStreamBlock,
ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let step_index = ctx.step_counter;
ctx.step_counter += 1;
let step_name = "Stream".to_string();
ctx.tx
.send(FlowExecutionEvent::StepStart {
step_name: step_name.clone(),
step_index,
step_type: "stream".to_string(),
timestamp_ms: now_ms(),
})
.map_err(|_| DispatchError::ChannelClosed)?;
ctx.tx
.send(FlowExecutionEvent::StepComplete {
step_name,
step_index,
success: true,
full_output: String::new(),
tokens_input: 0,
tokens_output: 0,
timestamp_ms: now_ms(),
})
.map_err(|_| DispatchError::ChannelClosed)?;
Ok(NodeOutcome::Completed {
output: String::new(),
tokens_emitted: 0,
step_index,
})
}
pub async fn bridge_effect_stream_yield(
instructions: &[Instruction],
runtime: &mut EffectRuntime,
step_name: &str,
ctx: &mut DispatchCtx,
) -> Result<ExecutionResult, DispatchError> {
use sha2::{Digest, Sha256};
use std::fmt::Write as _;
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let yields = scan_stream_yields(instructions);
let mut accumulated = String::new();
for (token_index, perf) in yields.iter().enumerate() {
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let resolved_value = resolve_first_argument(perf, runtime);
let wire_content = value_to_wire_string(&resolved_value);
if !wire_content.is_empty() {
accumulated.push_str(&wire_content);
}
ctx.tx
.send(FlowExecutionEvent::StepToken {
step_name: step_name.to_string(),
content: wire_content,
token_index: (token_index as u64) + 1,
timestamp_ms: now_ms(),
})
.map_err(|_| DispatchError::ChannelClosed)?;
}
let result = runtime
.run(instructions)
.map_err(|e| DispatchError::BackendError {
name: "algebraic_effects".to_string(),
message: format!("{e:?}"),
})?;
{
let mut hasher = Sha256::new();
hasher.update(accumulated.as_bytes());
let digest = hasher.finalize();
let mut output_hash_hex = String::with_capacity(digest.len() * 2);
for byte in digest.as_slice() {
let _ = write!(output_hash_hex, "{byte:02x}");
}
let tool_output_hash_hex = output_hash_hex.clone();
let record = crate::axonendpoint_replay::StepAuditRecord {
step_name: step_name.to_string(),
step_index: ctx.step_counter,
success: true,
tokens_emitted: yields.len() as u64,
output_hash_hex,
effect_policy_applied: None,
chunks_dropped: 0,
chunks_degraded: 0,
timestamp_ms: now_ms(),
tool_name: None,
tool_chunks_emitted: Some(yields.len() as u64),
tool_output_hash_hex: Some(tool_output_hash_hex),
tool_terminator_kind: Some("stop".to_string()),
};
let mut guard = ctx.step_audit_records.lock().await;
guard.push(record);
}
Ok(result)
}
pub async fn bridge_effect_stream_yield_unified(
instructions: &[Instruction],
runtime: &mut EffectRuntime,
step_name: &str,
policy: Option<crate::stream_effect::BackpressurePolicy>,
ctx: &mut DispatchCtx,
) -> Result<ExecutionResult, DispatchError> {
use crate::tool_trait::{ToolChunk, ToolFinishReason};
if ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let yields = scan_stream_yields(instructions);
let mut tool_chunks: Vec<ToolChunk> =
Vec::with_capacity(yields.len().saturating_add(1));
for perf in &yields {
let resolved_value = resolve_first_argument(perf, runtime);
let wire_content = value_to_wire_string(&resolved_value);
tool_chunks.push(ToolChunk::intermediate(wire_content));
}
tool_chunks.push(ToolChunk::terminator("", ToolFinishReason::Stop));
let source = crate::flow_dispatcher::unified_stream::unified_stream_from_chunks(tool_chunks);
let summary = crate::flow_dispatcher::unified_stream::unified_stream_handler(
source,
policy,
&ctx.cancel,
&ctx.tx,
step_name,
)
.await?;
if summary.cancelled && ctx.cancel.is_cancelled() {
return Err(DispatchError::UpstreamCancelled);
}
let result = runtime
.run(instructions)
.map_err(|e| DispatchError::BackendError {
name: "algebraic_effects".to_string(),
message: format!("{e:?}"),
})?;
{
let terminator_kind = if summary.cancelled {
"cancelled"
} else if summary.terminator_message.is_some() {
"error"
} else {
"stop"
};
let tool_hash = summary.output_hash_hex.clone();
let record = crate::axonendpoint_replay::StepAuditRecord {
step_name: step_name.to_string(),
step_index: ctx.step_counter,
success: summary.success,
tokens_emitted: summary.tokens_emitted,
output_hash_hex: summary.output_hash_hex,
effect_policy_applied: policy.map(|p| p.slug().to_string()),
chunks_dropped: summary.chunks_dropped,
chunks_degraded: summary.chunks_degraded,
timestamp_ms: now_ms(),
tool_name: None,
tool_chunks_emitted: Some(summary.chunks_pushed),
tool_output_hash_hex: Some(tool_hash),
tool_terminator_kind: Some(terminator_kind.to_string()),
};
let mut guard = ctx.step_audit_records.lock().await;
guard.push(record);
}
if let Some(message) = summary.terminator_message {
return Err(DispatchError::BackendError {
name: "algebraic_effects".to_string(),
message,
});
}
Ok(result)
}
fn scan_stream_yields(instructions: &[Instruction]) -> Vec<IRPerform> {
let mut out = Vec::new();
walk(instructions, &mut out);
out
}
fn walk(instructions: &[Instruction], out: &mut Vec<IRPerform>) {
for instr in instructions {
match instr {
Instruction::Perform(p) => {
if p.effect_name == "Stream" && p.operation_name == "Yield" {
out.push(p.clone());
}
}
Instruction::HandlerFrame(frame) => {
walk(&frame.body, out);
for clause in &frame.clauses {
walk(&clause.body, out);
}
}
Instruction::Resume(_)
| Instruction::Abort(_)
| Instruction::Forward(_)
| Instruction::Passthrough => {}
}
}
}
fn resolve_first_argument(perf: &IRPerform, runtime: &EffectRuntime) -> Value {
let Some(first) = perf.arguments.first() else {
return Value::Unit;
};
let v = Value::from_argument_text(first);
if let Value::Symbol(name) = &v {
if let Some(bound) = runtime.effects().get(name).map(|_| ()) {
let _ = bound;
}
let _ = name;
}
v
}
fn value_to_wire_string(value: &Value) -> String {
match value {
Value::String(s) => s.clone(),
Value::Int(n) => n.to_string(),
Value::Float(f) => f.to_string(),
Value::Bool(b) => b.to_string(),
Value::Unit => String::new(),
Value::Symbol(name) => {
let trimmed = name.trim();
if trimmed.len() >= 2 && trimmed.starts_with('"') && trimmed.ends_with('"') {
trimmed[1..trimmed.len() - 1].to_string()
} else {
name.clone()
}
}
Value::List(items) => {
let parts: Vec<String> = items.iter().map(value_to_wire_string).collect();
format!("[{}]", parts.join(", "))
}
Value::Map(fields) => {
let parts: Vec<String> = fields
.iter()
.map(|(k, v)| format!("{k}: {}", value_to_wire_string(v)))
.collect();
format!("{{{}}}", parts.join(", "))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cancel_token::CancellationFlag;
use crate::effects::ir::{IRHandlerClause, IRHandlerFrame, IRPerform};
use tokio::sync::mpsc;
fn fresh_ctx() -> (
DispatchCtx,
mpsc::UnboundedReceiver<FlowExecutionEvent>,
) {
let (tx, rx) = mpsc::unbounded_channel();
let ctx = DispatchCtx::new(
"TestFlow",
"stub",
"",
CancellationFlag::new(),
tx,
);
(ctx, rx)
}
fn perform_yield(arg: &str) -> Instruction {
Instruction::Perform(IRPerform {
effect_name: "Stream".into(),
operation_name: "Yield".into(),
arguments: vec![arg.into()],
state_id: 0,
resume_label: String::new(),
})
}
#[tokio::test]
async fn run_stream_emits_canonical_wire_shape() {
let (mut ctx, mut rx) = fresh_ctx();
let node = IRStreamBlock {
node_type: "stream_block",
source_line: 0,
source_column: 0,
};
let outcome = run_stream(&node, &mut ctx).await.unwrap();
match outcome {
NodeOutcome::Completed {
output,
tokens_emitted,
step_index,
} => {
assert_eq!(output, "");
assert_eq!(tokens_emitted, 0);
assert_eq!(step_index, 0);
}
other => panic!("expected Completed, got {other:?}"),
}
let mut events = Vec::new();
while let Ok(ev) = rx.try_recv() {
events.push(ev);
}
assert_eq!(events.len(), 2);
match &events[0] {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "stream");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
#[test]
fn scan_stream_yields_finds_top_level_perform() {
let block = vec![
perform_yield("\"hello\""),
perform_yield("\"world\""),
];
let found = scan_stream_yields(&block);
assert_eq!(found.len(), 2);
assert_eq!(found[0].arguments[0], "\"hello\"");
assert_eq!(found[1].arguments[0], "\"world\"");
}
#[test]
fn scan_stream_yields_ignores_non_stream_performs() {
let block = vec![
Instruction::Perform(IRPerform {
effect_name: "OtherEffect".into(),
operation_name: "Yield".into(),
arguments: vec!["x".into()],
state_id: 0,
resume_label: String::new(),
}),
Instruction::Perform(IRPerform {
effect_name: "Stream".into(),
operation_name: "OtherOp".into(),
arguments: vec!["y".into()],
state_id: 0,
resume_label: String::new(),
}),
];
let found = scan_stream_yields(&block);
assert!(found.is_empty());
}
#[test]
fn scan_stream_yields_recurses_into_handler_frame_bodies() {
let block = vec![Instruction::HandlerFrame(IRHandlerFrame {
effect_names: vec!["Inner".into()],
clauses: vec![IRHandlerClause {
operation_name: "Op".into(),
parameter_names: vec!["v".into()],
body: vec![perform_yield("\"nested-in-clause\"")],
source_line: 0,
source_column: 0,
}],
body: vec![perform_yield("\"nested-in-frame-body\"")],
frame_id: 0,
body_states: Vec::new(),
source_line: 0,
source_column: 0,
})];
let found = scan_stream_yields(&block);
assert_eq!(found.len(), 2);
}
#[tokio::test]
async fn bridge_emits_step_token_per_static_yield() {
let (mut ctx, mut rx) = fresh_ctx();
let block = vec![
perform_yield("\"first\""),
perform_yield("\"second\""),
perform_yield("\"third\""),
];
let mut runtime = EffectRuntime::new();
let wrapped = vec![Instruction::HandlerFrame(IRHandlerFrame {
effect_names: vec!["Stream".into()],
clauses: vec![IRHandlerClause {
operation_name: "Yield".into(),
parameter_names: vec!["v".into()],
body: vec![Instruction::Resume(crate::effects::ir::IRResume {
value_expr: String::new(),
frame_id: 0,
})],
source_line: 0,
source_column: 0,
}],
body: block,
frame_id: 0,
body_states: Vec::new(),
source_line: 0,
source_column: 0,
})];
let result = bridge_effect_stream_yield(&wrapped, &mut runtime, "S", &mut ctx)
.await
.unwrap();
let mut tokens = Vec::new();
while let Ok(ev) = rx.try_recv() {
if let FlowExecutionEvent::StepToken { content, .. } = ev {
tokens.push(content);
}
}
assert_eq!(tokens.len(), 3);
assert_eq!(tokens[0], "first");
assert_eq!(tokens[1], "second");
assert_eq!(tokens[2], "third");
match result {
ExecutionResult::Completed(_) => {}
other => panic!("expected runtime Completed, got {other:?}"),
}
let audit = ctx.step_audit_records.lock().await;
assert_eq!(audit.len(), 1);
assert_eq!(audit[0].tokens_emitted, 3);
assert_eq!(audit[0].step_name, "S");
}
#[tokio::test]
async fn bridge_cancel_short_circuits_before_emission() {
let cancel = CancellationFlag::new();
cancel.cancel();
let (tx, _rx) = mpsc::unbounded_channel();
let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);
let block = vec![perform_yield("\"x\"")];
let mut runtime = EffectRuntime::new();
let outcome = bridge_effect_stream_yield(&block, &mut runtime, "S", &mut ctx).await;
assert!(matches!(outcome, Err(DispatchError::UpstreamCancelled)));
}
#[test]
fn value_to_wire_string_canonical_forms() {
assert_eq!(value_to_wire_string(&Value::String("hi".into())), "hi");
assert_eq!(value_to_wire_string(&Value::Int(42)), "42");
assert_eq!(value_to_wire_string(&Value::Bool(true)), "true");
assert_eq!(value_to_wire_string(&Value::Unit), "");
assert_eq!(value_to_wire_string(&Value::Symbol("foo".into())), "foo");
assert_eq!(
value_to_wire_string(&Value::List(vec![
Value::Int(1),
Value::Int(2),
])),
"[1, 2]"
);
}
}