use crate::ir_nodes::{IRFlow, IRFlowNode, IRProgram};
use crate::stream_effect::BackpressurePolicy;
use crate::stream_effect_dispatcher::resolve_stream_effect_for_step;
pub fn compile_source_to_ir(
source: &str,
source_file: &str,
) -> Result<(crate::ast::Program, IRProgram), PlanError> {
let tokens = crate::lexer::Lexer::new(source, source_file)
.tokenize()
.map_err(|e| PlanError::Parse(format!("lex error: {e:?}")))?;
let mut parser = crate::parser::Parser::new(tokens);
let program = parser.parse().map_err(|e| PlanError::Parse(e.message))?;
let mut checker = crate::type_checker::TypeChecker::new(&program);
let type_errors = checker.check();
if !type_errors.is_empty() {
return Err(PlanError::TypeCheck(
type_errors.into_iter().map(|e| e.message).collect(),
));
}
let ir = crate::ir_generator::IRGenerator::new().generate(&program);
Ok((program, ir))
}
pub fn find_ir_flow_by_name<'a>(
ir: &'a IRProgram,
flow_name: &str,
) -> Result<&'a IRFlow, PlanError> {
ir.flows
.iter()
.find(|f| f.name == flow_name)
.ok_or_else(|| PlanError::FlowNotFound {
flow_name: flow_name.to_string(),
available: ir.flows.iter().map(|f| f.name.clone()).collect(),
})
}
pub fn ir_flow_node_kind(node: &IRFlowNode) -> &'static str {
match node {
IRFlowNode::Step(_) => "step",
IRFlowNode::Probe(_) => "probe",
IRFlowNode::Reason(_) => "reason",
IRFlowNode::Validate(_) => "validate",
IRFlowNode::Refine(_) => "refine",
IRFlowNode::Weave(_) => "weave",
IRFlowNode::UseTool(_) => "use_tool",
IRFlowNode::Remember(_) => "remember",
IRFlowNode::Recall(_) => "recall",
IRFlowNode::Conditional(_) => "conditional",
IRFlowNode::ForIn(_) => "for_in",
IRFlowNode::Let(_) => "let",
IRFlowNode::Return(_) => "return",
IRFlowNode::Break(_) => "break",
IRFlowNode::Continue(_) => "continue",
IRFlowNode::LambdaDataApply(_) => "lambda_data_apply",
IRFlowNode::Par(_) => "par",
IRFlowNode::Hibernate(_) => "hibernate",
IRFlowNode::Deliberate(_) => "deliberate",
IRFlowNode::Consensus(_) => "consensus",
IRFlowNode::Forge(_) => "forge",
IRFlowNode::Focus(_) => "focus",
IRFlowNode::Associate(_) => "associate",
IRFlowNode::Aggregate(_) => "aggregate",
IRFlowNode::Explore(_) => "explore",
IRFlowNode::Ingest(_) => "ingest",
IRFlowNode::ShieldApply(_) => "shield_apply",
IRFlowNode::Stream(_) => "stream_block",
IRFlowNode::Navigate(_) => "navigate",
IRFlowNode::Drill(_) => "drill",
IRFlowNode::Trail(_) => "trail",
IRFlowNode::Corroborate(_) => "corroborate",
IRFlowNode::OtsApply(_) => "ots_apply",
IRFlowNode::MandateApply(_) => "mandate_apply",
IRFlowNode::ComputeApply(_) => "compute_apply",
IRFlowNode::Listen(_) => "listen",
IRFlowNode::DaemonStep(_) => "daemon_step",
IRFlowNode::Emit(_) => "emit",
IRFlowNode::Publish(_) => "publish",
IRFlowNode::Discover(_) => "discover",
IRFlowNode::Persist(_) => "persist",
IRFlowNode::Retrieve(_) => "retrieve",
IRFlowNode::Mutate(_) => "mutate",
IRFlowNode::Purge(_) => "purge",
IRFlowNode::Transact(_) => "transact",
}
}
pub fn compose_system_prompt_public(
flow: &IRFlow,
ir: &IRProgram,
backend_tag: Option<&str>,
) -> String {
let mut parts: Vec<String> = Vec::new();
if let Some(persona) = ir.personas.first() {
parts.push(format!("# Persona: {}", persona.name));
if !persona.domain.is_empty() {
parts.push(format!("Domain expertise: {}", persona.domain.join(", ")));
}
if !persona.tone.is_empty() {
parts.push(format!("Communication tone: {}", persona.tone));
}
if !persona.language.is_empty() {
parts.push(format!("Language: {}", persona.language));
}
if let Some(ct) = persona.confidence_threshold {
parts.push(format!("Confidence threshold: {ct:.2}"));
}
if persona.cite_sources == Some(true) {
parts.push("Always cite sources.".to_string());
}
if !persona.refuse_if.is_empty() {
parts.push(format!("Refuse if: {}", persona.refuse_if.join(", ")));
}
}
if let Some(ctx) = ir.contexts.first() {
parts.push(format!("\n# Context: {}", ctx.name));
if !ctx.depth.is_empty() {
parts.push(format!("Analysis depth: {}", ctx.depth));
}
if !ctx.memory_scope.is_empty() {
parts.push(format!("Memory scope: {}", ctx.memory_scope));
}
if let Some(t) = ctx.temperature {
parts.push(format!("Temperature: {t:.1}"));
}
if let Some(mt) = ctx.max_tokens {
parts.push(format!("Max tokens: {mt}"));
}
}
parts.push(format!("\n# Flow: {}", flow.name));
if let Some(tag) = backend_tag {
parts.push(format!("\n[Backend: {tag} | AXON {}]", env!("CARGO_PKG_VERSION")));
}
parts.join("\n")
}
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingStep {
pub step_name: String,
pub user_prompt: String,
pub max_tokens: Option<u32>,
pub temperature: Option<f64>,
pub effect_policy: Option<BackpressurePolicy>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingExecutionPlan {
pub flow_name: String,
pub backend_name: String,
pub system_prompt: String,
pub steps: Vec<StreamingStep>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PlanError {
Parse(String),
TypeCheck(Vec<String>),
IrGeneration(String),
FlowNotFound { flow_name: String, available: Vec<String> },
}
#[derive(Debug, Clone, PartialEq)]
pub enum PlanFallback {
AnchorConstraintsPresent,
LambdaApplyPresent,
LetBindingPresent,
UseToolPresent,
HibernatePresent,
PixPresent,
UnsupportedNode {
kind: &'static str,
},
}
impl PlanFallback {
pub fn slug(&self) -> &'static str {
match self {
Self::AnchorConstraintsPresent => "anchor_constraints",
Self::LambdaApplyPresent => "lambda_apply",
Self::LetBindingPresent => "let_binding",
Self::UseToolPresent => "use_tool",
Self::HibernatePresent => "hibernate",
Self::PixPresent => "pix",
Self::UnsupportedNode { .. } => "unsupported_node",
}
}
}
pub fn build_streaming_plan(
source: &str,
source_file: &str,
flow_name: &str,
backend_name: &str,
) -> Result<StreamingExecutionPlan, PlanError> {
let (program, ir) = compile_source_to_ir(source, source_file)?;
build_plan_from_ir(&ir, &program, flow_name, backend_name)
}
pub fn build_plan_from_ir(
ir: &IRProgram,
program: &crate::ast::Program,
flow_name: &str,
backend_name: &str,
) -> Result<StreamingExecutionPlan, PlanError> {
let flow = find_ir_flow_by_name(ir, flow_name)?;
let ast_flow = program
.declarations
.iter()
.find_map(|d| match d {
crate::ast::Declaration::Flow(f) if f.name == flow_name => Some(f),
_ => None,
})
.ok_or_else(|| PlanError::FlowNotFound {
flow_name: flow_name.to_string(),
available: ir.flows.iter().map(|f| f.name.clone()).collect(),
})?;
let system_prompt = compose_system_prompt_public(flow, ir, None);
let mut steps = Vec::new();
for node in &flow.steps {
if let crate::ir_nodes::IRFlowNode::Step(ir_step) = node {
let max_tokens = ir
.contexts
.first()
.and_then(|c| c.max_tokens)
.map(|n| n as u32);
let temperature = ir.contexts.first().and_then(|c| c.temperature);
let effect_policy =
resolve_stream_effect_for_step(&ir_step.name, ast_flow, program);
steps.push(StreamingStep {
step_name: ir_step.name.clone(),
user_prompt: ir_step.ask.clone(),
max_tokens,
temperature,
effect_policy,
});
}
}
Ok(StreamingExecutionPlan {
flow_name: flow_name.to_string(),
backend_name: backend_name.to_string(),
system_prompt,
steps,
})
}
#[cfg(test)]
mod tests {
use super::*;
fn parse_simple_stream_source() -> &'static str {
"flow Chat() -> Unit {\n\
step Generate { ask: \"hi\" output: Stream<Token> }\n\
}\n\
axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
}
fn parse_stream_with_effect_source() -> &'static str {
"tool chat_token_stream { description: \"stream\" effects: <stream:drop_oldest> }\n\
flow Chat() -> Unit {\n\
step Generate { ask: \"hi\" apply: chat_token_stream }\n\
}\n\
axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
}
#[test]
fn build_plan_for_simple_stream_flow_returns_one_step() {
let plan = build_streaming_plan(
parse_simple_stream_source(),
"test.axon",
"Chat",
"stub",
)
.expect("simple stream flow plans cleanly");
assert_eq!(plan.flow_name, "Chat");
assert_eq!(plan.backend_name, "stub");
assert_eq!(plan.steps.len(), 1);
assert_eq!(plan.steps[0].step_name, "Generate");
assert_eq!(plan.steps[0].user_prompt, "hi");
assert_eq!(plan.steps[0].effect_policy, None, "no tool effect → no policy");
}
#[test]
fn build_plan_with_drop_oldest_effect_pre_resolves_policy() {
let plan = build_streaming_plan(
parse_stream_with_effect_source(),
"test.axon",
"Chat",
"stub",
)
.expect("stream-effect flow plans cleanly");
assert_eq!(plan.steps.len(), 1);
assert_eq!(plan.steps[0].effect_policy, Some(BackpressurePolicy::DropOldest));
}
#[test]
fn build_plan_unknown_flow_returns_flow_not_found() {
let err = build_streaming_plan(
parse_simple_stream_source(),
"test.axon",
"NonexistentFlow",
"stub",
)
.expect_err("unknown flow rejected");
match err {
PlanError::FlowNotFound { flow_name, available } => {
assert_eq!(flow_name, "NonexistentFlow");
assert_eq!(available, vec!["Chat".to_string()]);
}
other => panic!("expected FlowNotFound, got {other:?}"),
}
}
#[test]
fn build_plan_unparseable_source_returns_parse_error() {
let err = build_streaming_plan(
"not a valid axon source",
"test.axon",
"Chat",
"stub",
)
.expect_err("garbage rejected");
assert!(matches!(err, PlanError::Parse(_)));
}
#[test]
fn build_plan_multi_step_flow_preserves_order() {
let src = "flow MultiStep() -> Unit {\n\
step First { ask: \"one\" }\n\
step Second { ask: \"two\" }\n\
step Third { ask: \"three\" }\n\
}\n\
axonendpoint E { method: POST path: \"/m\" execute: MultiStep transport: sse }";
let plan = build_streaming_plan(src, "test.axon", "MultiStep", "stub").unwrap();
assert_eq!(plan.steps.len(), 3);
assert_eq!(plan.steps[0].step_name, "First");
assert_eq!(plan.steps[1].step_name, "Second");
assert_eq!(plan.steps[2].step_name, "Third");
assert_eq!(plan.steps[0].user_prompt, "one");
assert_eq!(plan.steps[1].user_prompt, "two");
assert_eq!(plan.steps[2].user_prompt, "three");
}
#[test]
fn plan_fallback_slugs_are_stable_strings() {
assert_eq!(PlanFallback::AnchorConstraintsPresent.slug(), "anchor_constraints");
assert_eq!(PlanFallback::LambdaApplyPresent.slug(), "lambda_apply");
assert_eq!(PlanFallback::LetBindingPresent.slug(), "let_binding");
assert_eq!(PlanFallback::UseToolPresent.slug(), "use_tool");
assert_eq!(PlanFallback::HibernatePresent.slug(), "hibernate");
assert_eq!(PlanFallback::PixPresent.slug(), "pix");
}
#[test]
fn plan_is_deterministic_for_same_source() {
let plan1 = build_streaming_plan(
parse_simple_stream_source(),
"test.axon",
"Chat",
"stub",
)
.unwrap();
let plan2 = build_streaming_plan(
parse_simple_stream_source(),
"test.axon",
"Chat",
"stub",
)
.unwrap();
assert_eq!(plan1, plan2, "plan builder is pure + deterministic");
}
#[test]
fn streaming_step_eq_is_field_wise() {
let a = StreamingStep {
step_name: "X".into(),
user_prompt: "y".into(),
max_tokens: Some(100),
temperature: Some(0.7),
effect_policy: Some(BackpressurePolicy::DropOldest),
};
let b = a.clone();
assert_eq!(a, b);
}
#[test]
fn streaming_plan_includes_backend_name() {
let plan = build_streaming_plan(
parse_simple_stream_source(),
"test.axon",
"Chat",
"anthropic",
)
.unwrap();
assert_eq!(plan.backend_name, "anthropic");
}
#[test]
fn empty_flow_body_produces_empty_step_list() {
let src = "flow Empty() -> Unit {\n\
}\n\
axonendpoint E { method: POST path: \"/e\" execute: Empty transport: sse }";
let plan = build_streaming_plan(src, "test.axon", "Empty", "stub").unwrap();
assert!(plan.steps.is_empty());
assert_eq!(plan.flow_name, "Empty");
}
#[test]
fn compile_source_to_ir_returns_program_and_ir_for_valid_source() {
let (program, ir) =
compile_source_to_ir(parse_simple_stream_source(), "test.axon").unwrap();
let flow_count = program
.declarations
.iter()
.filter(|d| matches!(d, crate::ast::Declaration::Flow(_)))
.count();
assert_eq!(flow_count, 1);
assert_eq!(ir.flows.len(), 1);
assert_eq!(ir.flows[0].name, "Chat");
}
#[test]
fn compile_source_to_ir_is_pure_deterministic() {
let src = parse_simple_stream_source();
let (p1, ir1) = compile_source_to_ir(src, "test.axon").unwrap();
let (p2, ir2) = compile_source_to_ir(src, "test.axon").unwrap();
assert_eq!(p1.declarations.len(), p2.declarations.len());
assert_eq!(ir1.flows.len(), ir2.flows.len());
assert_eq!(ir1.flows[0].name, ir2.flows[0].name);
}
#[test]
fn compile_source_to_ir_surfaces_parse_error() {
let err = compile_source_to_ir("not axon source at all", "test.axon").unwrap_err();
assert!(matches!(err, PlanError::Parse(_)));
}
#[test]
fn compile_source_to_ir_surfaces_type_check_error() {
let src = "axonendpoint Bad { method: POST path: \"/x\" execute: NonexistentFlow }";
let err = compile_source_to_ir(src, "test.axon").unwrap_err();
assert!(matches!(err, PlanError::Parse(_) | PlanError::TypeCheck(_)));
}
#[test]
fn find_ir_flow_by_name_returns_flow_when_present() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
assert_eq!(flow.name, "Chat");
}
#[test]
fn find_ir_flow_by_name_returns_flow_not_found_with_available_list() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let err = find_ir_flow_by_name(&ir, "Nope").unwrap_err();
match err {
PlanError::FlowNotFound { flow_name, available } => {
assert_eq!(flow_name, "Nope");
assert_eq!(available, vec!["Chat".to_string()]);
}
other => panic!("expected FlowNotFound, got {other:?}"),
}
}
#[test]
fn ir_flow_node_kind_step_returns_step() {
use crate::ir_nodes::{IRFlowNode, IRStep};
let n = IRFlowNode::Step(IRStep {
node_type: "Step",
source_line: 1,
source_column: 1,
name: "S".into(),
persona_ref: String::new(),
given: String::new(),
ask: "hi".into(),
use_tool: None,
probe: None,
reason: None,
weave: None,
output_type: String::new(),
confidence_floor: None,
navigate_ref: String::new(),
apply_ref: String::new(),
body: vec![],
});
assert_eq!(ir_flow_node_kind(&n), "step");
}
#[test]
fn ir_flow_node_kind_distinct_slugs_for_every_variant() {
let expected_slugs: Vec<&str> = vec![
"step",
"probe",
"reason",
"validate",
"refine",
"weave",
"use_tool",
"remember",
"recall",
"conditional",
"for_in",
"let",
"return",
"break",
"continue",
"lambda_data_apply",
"par",
"hibernate",
"deliberate",
"consensus",
"forge",
"focus",
"associate",
"aggregate",
"explore",
"ingest",
"shield_apply",
"stream_block",
"navigate",
"drill",
"trail",
"corroborate",
"ots_apply",
"mandate_apply",
"compute_apply",
"listen",
"daemon_step",
"emit",
"publish",
"discover",
"persist",
"retrieve",
"mutate",
"purge",
"transact",
];
assert_eq!(expected_slugs.len(), 45);
for slug in &expected_slugs {
assert!(
slug.chars()
.all(|c| c.is_ascii_lowercase() || c == '_' || c.is_ascii_digit()),
"slug {slug:?} must be lowercase snake_case for stable audit emission"
);
}
let mut sorted = expected_slugs.clone();
sorted.sort();
sorted.dedup();
assert_eq!(sorted.len(), expected_slugs.len(), "slug catalog has duplicates");
}
#[test]
fn ir_flow_node_kind_runner_drift() {
use crate::ir_nodes::{IRFlowNode, IRStep};
let step = IRFlowNode::Step(IRStep {
node_type: "Step",
source_line: 1,
source_column: 1,
name: "T".into(),
persona_ref: String::new(),
given: String::new(),
ask: String::new(),
use_tool: None,
probe: None,
reason: None,
weave: None,
output_type: String::new(),
confidence_floor: None,
navigate_ref: String::new(),
apply_ref: String::new(),
body: vec![],
});
assert_eq!(ir_flow_node_kind(&step), "step");
}
#[test]
fn compose_system_prompt_public_includes_flow_name() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let prompt = compose_system_prompt_public(flow, &ir, None);
assert!(prompt.contains("# Flow: Chat"));
}
#[test]
fn compose_system_prompt_public_omits_backend_tag_when_none() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let prompt = compose_system_prompt_public(flow, &ir, None);
assert!(!prompt.contains("[Backend:"));
assert!(!prompt.contains("AXON"));
}
#[test]
fn compose_system_prompt_public_includes_backend_tag_when_set() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let prompt = compose_system_prompt_public(flow, &ir, Some("anthropic"));
assert!(prompt.contains("[Backend: anthropic | AXON "));
}
#[test]
fn compose_system_prompt_public_includes_persona_when_present() {
let src = "persona Doctor { domain: [\"medicine\"] tone: \"formal\" }\n\
context Clinic { depth: \"deep\" memory_scope: \"session\" }\n\
flow Chat() -> Unit {\n\
step Generate { ask: \"hi\" output: Stream<Token> }\n\
}\n\
axonendpoint E { method: POST path: \"/c\" execute: Chat transport: sse }";
let (_p, ir) = compile_source_to_ir(src, "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let prompt = compose_system_prompt_public(flow, &ir, None);
assert!(prompt.contains("# Persona: Doctor"));
assert!(prompt.contains("Domain expertise: medicine"));
assert!(prompt.contains("# Context: Clinic"));
assert!(prompt.contains("Analysis depth: deep"));
}
#[test]
fn compose_system_prompt_public_is_pure_deterministic() {
let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let p1 = compose_system_prompt_public(flow, &ir, Some("openai"));
let p2 = compose_system_prompt_public(flow, &ir, Some("openai"));
assert_eq!(p1, p2);
}
#[test]
fn build_streaming_plan_uses_compile_source_to_ir_internally() {
let src = parse_simple_stream_source();
let plan = build_streaming_plan(src, "t.axon", "Chat", "stub").unwrap();
let (_program, ir) = compile_source_to_ir(src, "t.axon").unwrap();
let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
let expected = compose_system_prompt_public(flow, &ir, None);
assert_eq!(plan.system_prompt, expected);
}
}