axon/flow_plan.rs
1//! §Fase 33.x.b + 33.x.c — Streaming execution plan extraction +
2//! unified `.axon` source compilation pipeline.
3//!
4//! Builds a streaming-shaped execution plan from `.axon` source for
5//! the production async SSE path. Pre-resolves per-step
6//! [`BackpressurePolicy`] via
7//! [`crate::stream_effect_dispatcher::resolve_stream_effect_for_step`]
8//! so the hot per-chunk loop in
9//! `crate::axon_server::server_execute_streaming_async` does NOT
10//! re-walk the AST per chunk.
11//!
12//! # Scope boundary (33.x.b)
13//!
14//! 33.x.b ships the SIMPLEST step shape (single `ask`, no anchors,
15//! no `apply: lambda`, no `let` bindings, no mid-stream `use_tool`).
16//! This is exactly the canonical Kivi/Stream-effect adopter shape
17//! the diagnostic anchor pins. Flows that use the omitted features
18//! still execute correctly on the legacy synchronous path; the
19//! streaming path falls back via [`PlanFallback::LegacyOrchestration`]
20//! and the SSE handler routes through `server_execute_full` (the
21//! pre-33.x.b path). NO silent degradation — the fallback is
22//! observable in plan output so adopter diagnostics can flag it.
23//!
24//! # Lift target (33.x.c)
25//!
26//! `runner::build_execution_plan` (private, sync) stays unchanged in
27//! 33.x.b. 33.x.c unifies the two builders. Until then, this module
28//! is the SOLE source of truth for streaming-path plan extraction;
29//! `runner.rs` is the sole source of truth for sync-path execution.
30//!
31//! # D-letter anchors
32//!
33//! - **D1** — every streaming-path flow resolves a [`StreamingExecutionPlan`]
34//! here before dispatching to `Backend::stream()`. The plan's
35//! `backend_name` is the resolved provider name; `steps[].effect_policy`
36//! pre-resolves the declared `<stream:<policy>>` for the
37//! `StreamPolicyEnforcer` wrap in 33.x.d.
38//! - **D4** — the plan structure carries NO new wire-shape fields;
39//! it is internal scaffolding that produces the same
40//! FlowExecutionEvent sequence as the pre-33.x.b path for the
41//! adopter shapes 33.x.b supports.
42//! - **D5** — fallback to legacy orchestration is observable via
43//! [`PlanFallback`] so 33.x.g can hook `axon-W002`-style warnings.
44
45use crate::ir_nodes::{IRFlow, IRFlowNode, IRProgram};
46use crate::stream_effect::BackpressurePolicy;
47use crate::stream_effect_dispatcher::resolve_stream_effect_for_step;
48
49// ────────────────────────────────────────────────────────────────────
50// §33.x.c — Shared source-compilation pipeline
51// ────────────────────────────────────────────────────────────────────
52
53/// Compile an `.axon` source string into its AST [`crate::ast::Program`]
54/// + IR [`IRProgram`] forms. Both stacks-of-truth are returned so
55/// downstream callers can do AST-level walking (effect-row resolution,
56/// route table extraction) AND IR-level execution planning without
57/// re-running the pipeline.
58///
59/// # Stages
60///
61/// 1. Lex via [`crate::lexer::Lexer`]
62/// 2. Parse via [`crate::parser::Parser`]
63/// 3. Type-check via [`crate::type_checker::TypeChecker`]
64/// 4. IR generation via [`crate::ir_generator::IRGenerator`]
65///
66/// Each stage's failure surfaces as a structured [`PlanError`] variant
67/// so the caller distinguishes "user source is malformed" from "the
68/// pipeline encountered an internal invariant violation".
69///
70/// # Performance
71///
72/// This is the canonical source-compilation entry point on the Fase
73/// 33.x.b production async streaming path (called once per
74/// flow-invocation). The legacy `axon_server.rs` call sites still
75/// inline their own pipelines; 33.x.i (mono-file `crate::backend`
76/// retirement) is when the deeper migration completes.
77///
78/// # Purity
79///
80/// Pure + deterministic. No I/O. No global state. Same source +
81/// source_file → same `(Program, IRProgram)` byte-for-byte.
82pub fn compile_source_to_ir(
83 source: &str,
84 source_file: &str,
85) -> Result<(crate::ast::Program, IRProgram), PlanError> {
86 let tokens = crate::lexer::Lexer::new(source, source_file)
87 .tokenize()
88 .map_err(|e| PlanError::Parse(format!("lex error: {e:?}")))?;
89
90 let mut parser = crate::parser::Parser::new(tokens);
91 let program = parser.parse().map_err(|e| PlanError::Parse(e.message))?;
92
93 let mut checker = crate::type_checker::TypeChecker::new(&program);
94 let type_errors = checker.check();
95 if !type_errors.is_empty() {
96 return Err(PlanError::TypeCheck(
97 type_errors.into_iter().map(|e| e.message).collect(),
98 ));
99 }
100
101 let ir = crate::ir_generator::IRGenerator::new().generate(&program);
102 Ok((program, ir))
103}
104
105/// Locate an [`IRFlow`] by name. Returns a structured [`PlanError`]
106/// with the list of available flow names so the diagnostic message
107/// is adopter-actionable (typo-detection on first read).
108///
109/// # Used by
110///
111/// * [`build_plan_from_ir`] — the streaming-path planner.
112/// * Future 33.x.i migrations of the legacy axon_server.rs sync
113/// path will adopt this helper for byte-identical diagnostics.
114pub fn find_ir_flow_by_name<'a>(
115 ir: &'a IRProgram,
116 flow_name: &str,
117) -> Result<&'a IRFlow, PlanError> {
118 ir.flows
119 .iter()
120 .find(|f| f.name == flow_name)
121 .ok_or_else(|| PlanError::FlowNotFound {
122 flow_name: flow_name.to_string(),
123 available: ir.flows.iter().map(|f| f.name.clone()).collect(),
124 })
125}
126
127/// Stable kind discriminant for an [`IRFlowNode`]. Closed catalog
128/// extracted as a separate public helper so adding a new
129/// `IRFlowNode` variant on the frontend forces an explicit match
130/// update here (compiler enforces exhaustiveness).
131///
132/// # Drift contract with `runner::extract_step_info`
133///
134/// The synchronous CLI path's `runner::extract_step_info` produces
135/// a `step_type` string for each `IRFlowNode` variant. The kinds
136/// here MUST match that mapping. Drift is gated by
137/// [`ir_flow_node_kind_runner_drift`] in `flow_plan::tests`.
138pub fn ir_flow_node_kind(node: &IRFlowNode) -> &'static str {
139 match node {
140 IRFlowNode::Step(_) => "step",
141 IRFlowNode::Probe(_) => "probe",
142 IRFlowNode::Reason(_) => "reason",
143 IRFlowNode::Validate(_) => "validate",
144 IRFlowNode::Refine(_) => "refine",
145 IRFlowNode::Weave(_) => "weave",
146 IRFlowNode::UseTool(_) => "use_tool",
147 IRFlowNode::Remember(_) => "remember",
148 IRFlowNode::Recall(_) => "recall",
149 IRFlowNode::Conditional(_) => "conditional",
150 IRFlowNode::ForIn(_) => "for_in",
151 IRFlowNode::Let(_) => "let",
152 IRFlowNode::Return(_) => "return",
153 IRFlowNode::Break(_) => "break",
154 IRFlowNode::Continue(_) => "continue",
155 IRFlowNode::LambdaDataApply(_) => "lambda_data_apply",
156 IRFlowNode::Par(_) => "par",
157 IRFlowNode::Hibernate(_) => "hibernate",
158 IRFlowNode::Deliberate(_) => "deliberate",
159 IRFlowNode::Consensus(_) => "consensus",
160 IRFlowNode::Forge(_) => "forge",
161 IRFlowNode::Focus(_) => "focus",
162 IRFlowNode::Associate(_) => "associate",
163 IRFlowNode::Aggregate(_) => "aggregate",
164 IRFlowNode::Explore(_) => "explore",
165 IRFlowNode::Ingest(_) => "ingest",
166 IRFlowNode::ShieldApply(_) => "shield_apply",
167 IRFlowNode::Stream(_) => "stream_block",
168 IRFlowNode::Navigate(_) => "navigate",
169 IRFlowNode::Drill(_) => "drill",
170 IRFlowNode::Trail(_) => "trail",
171 IRFlowNode::Corroborate(_) => "corroborate",
172 IRFlowNode::OtsApply(_) => "ots_apply",
173 IRFlowNode::MandateApply(_) => "mandate_apply",
174 IRFlowNode::ComputeApply(_) => "compute_apply",
175 IRFlowNode::Listen(_) => "listen",
176 IRFlowNode::DaemonStep(_) => "daemon_step",
177 IRFlowNode::Emit(_) => "emit",
178 IRFlowNode::Publish(_) => "publish",
179 IRFlowNode::Discover(_) => "discover",
180 IRFlowNode::Persist(_) => "persist",
181 IRFlowNode::Retrieve(_) => "retrieve",
182 IRFlowNode::Mutate(_) => "mutate",
183 IRFlowNode::Purge(_) => "purge",
184 IRFlowNode::Transact(_) => "transact",
185 }
186}
187
188/// Compose a streaming-path system prompt from persona + context +
189/// (optional) backend tag. Public helper shared by the streaming
190/// planner today + adopted by the sync CLI path during 33.x.i.
191///
192/// # Parameters
193///
194/// * `flow` — the IR flow whose system prompt is being built. The
195/// flow's name surfaces in the prompt for trace clarity.
196/// * `ir` — full IR for persona/context resolution.
197/// * `backend_tag` — `Some("anthropic")` appends `[Backend:
198/// anthropic | AXON <version>]`; `None` omits the tag. The
199/// streaming path passes `None` because the wire's
200/// `axon.complete.backend` field already carries this info.
201///
202/// # Determinism
203///
204/// Pure + deterministic. Same inputs → same string byte-for-byte.
205pub fn compose_system_prompt_public(
206 flow: &IRFlow,
207 ir: &IRProgram,
208 backend_tag: Option<&str>,
209) -> String {
210 let mut parts: Vec<String> = Vec::new();
211
212 if let Some(persona) = ir.personas.first() {
213 parts.push(format!("# Persona: {}", persona.name));
214 if !persona.domain.is_empty() {
215 parts.push(format!("Domain expertise: {}", persona.domain.join(", ")));
216 }
217 if !persona.tone.is_empty() {
218 parts.push(format!("Communication tone: {}", persona.tone));
219 }
220 if !persona.language.is_empty() {
221 parts.push(format!("Language: {}", persona.language));
222 }
223 if let Some(ct) = persona.confidence_threshold {
224 parts.push(format!("Confidence threshold: {ct:.2}"));
225 }
226 if persona.cite_sources == Some(true) {
227 parts.push("Always cite sources.".to_string());
228 }
229 if !persona.refuse_if.is_empty() {
230 parts.push(format!("Refuse if: {}", persona.refuse_if.join(", ")));
231 }
232 }
233
234 if let Some(ctx) = ir.contexts.first() {
235 parts.push(format!("\n# Context: {}", ctx.name));
236 if !ctx.depth.is_empty() {
237 parts.push(format!("Analysis depth: {}", ctx.depth));
238 }
239 if !ctx.memory_scope.is_empty() {
240 parts.push(format!("Memory scope: {}", ctx.memory_scope));
241 }
242 if let Some(t) = ctx.temperature {
243 parts.push(format!("Temperature: {t:.1}"));
244 }
245 if let Some(mt) = ctx.max_tokens {
246 parts.push(format!("Max tokens: {mt}"));
247 }
248 }
249
250 parts.push(format!("\n# Flow: {}", flow.name));
251
252 if let Some(tag) = backend_tag {
253 parts.push(format!("\n[Backend: {tag} | AXON {}]", env!("CARGO_PKG_VERSION")));
254 }
255
256 parts.join("\n")
257}
258
259// ────────────────────────────────────────────────────────────────────
260// Plan types
261// ────────────────────────────────────────────────────────────────────
262
263/// One step in a streaming execution plan.
264///
265/// Pre-resolves every field the hot loop needs — system prompt,
266/// user prompt, declared effect policy. The per-chunk loop in
267/// `server_execute_streaming_async` reads these fields without
268/// touching the IR or AST.
269#[derive(Debug, Clone, PartialEq)]
270pub struct StreamingStep {
271 /// Canonical step name (matches `IRStep.name`). Surfaces in
272 /// `axon.token.step` + `axon.complete.step_names`.
273 pub step_name: String,
274 /// User prompt this step asks the LLM. Built from `step.ask` +
275 /// optional `apply: tool` argument expansion.
276 pub user_prompt: String,
277 /// Optional max-tokens cap from `context.max_tokens` or
278 /// step-level `max_tokens:` declaration.
279 pub max_tokens: Option<u32>,
280 /// Optional temperature from `context.temperature` (overridden
281 /// by locked-param dispatch in the Backend impl).
282 pub temperature: Option<f64>,
283 /// Pre-resolved backpressure policy from the step's tool's
284 /// `effects: <stream:<policy>>` declaration. `None` when the
285 /// step doesn't `apply:` a tool with a stream effect.
286 /// Activated by 33.x.d's `StreamPolicyEnforcer` wrap; recorded
287 /// today in `axon.complete.stream_policies` for audit
288 /// correlation (Fase 33.e).
289 pub effect_policy: Option<BackpressurePolicy>,
290}
291
292/// Streaming execution plan — one per flow invocation.
293#[derive(Debug, Clone, PartialEq)]
294pub struct StreamingExecutionPlan {
295 /// Flow name from `run` declaration.
296 pub flow_name: String,
297 /// Backend name as the adopter resolved it (after `auto`
298 /// resolution upstream of plan construction).
299 pub backend_name: String,
300 /// Composed system prompt — persona + context + anchor
301 /// instructions (Fase 11 stack). Same shape as the sync
302 /// `runner::build_system_prompt` output.
303 pub system_prompt: String,
304 /// Ordered step list. Empty plan == empty flow (rare but valid
305 /// per IR grammar; the streaming path emits FlowStart +
306 /// FlowComplete with `steps_executed: 0` per Fase 33.b).
307 pub steps: Vec<StreamingStep>,
308}
309
310/// Reason a flow could not be planned for the streaming path.
311///
312/// Each variant is a closed-catalog signal the SSE handler routes
313/// to either an `axon.error` wire event (hard error) or a fallback
314/// to the legacy synchronous path (soft fall-through).
315#[derive(Debug, Clone, PartialEq)]
316pub enum PlanError {
317 /// Source did not parse. Mirrors `Parser::parse` error message.
318 Parse(String),
319 /// Type checker rejected the source.
320 TypeCheck(Vec<String>),
321 /// IR generation failed (rare — usually means the type-check
322 /// pass missed an invariant).
323 IrGeneration(String),
324 /// The requested flow_name was not found in the IR's flow list.
325 FlowNotFound { flow_name: String, available: Vec<String> },
326 // §Fase 33.z.e — `LegacyOrchestrationRequired` variant DELETED
327 // (the 33.y.l `#[deprecated]` retirement cycle completes here).
328 // The 33.y per-IRFlowNode async dispatcher (45/45 graduation)
329 // covers every IRFlowNode variant the planner used to reject.
330 // Any downstream crate that pattern-matched against this variant
331 // hits an explicit compile error — the intended failure shape
332 // for the deprecation cycle.
333}
334
335/// Closed catalog of reasons the streaming path falls back to the
336/// legacy synchronous orchestration. Each variant maps to a
337/// specific adopter source shape the 33.x.b scope explicitly
338/// defers.
339#[derive(Debug, Clone, PartialEq)]
340pub enum PlanFallback {
341 /// Flow uses `anchor: <name>` constraints. Anchor enforcement
342 /// fires on FINAL flow output, which is incompatible with
343 /// per-token streaming until per-chunk anchor checking lands.
344 AnchorConstraintsPresent,
345 /// Flow uses `apply: <lambda>` (Fase 15 lambda data apply).
346 /// Lambda apply runs after the step's LLM response; per-chunk
347 /// lambda application is a future fase.
348 LambdaApplyPresent,
349 /// Flow uses `let X = ...` SSA bindings (Fase 17). Binding
350 /// resolution runs between steps; streaming path doesn't yet
351 /// thread the binding context through the per-chunk loop.
352 LetBindingPresent,
353 /// Flow uses `use_tool` mid-step (function calling). Mid-stream
354 /// tool calls are explicit Fase 33-followon-2 scope.
355 UseToolPresent,
356 /// Flow contains a `Hibernate` step (Fase 19 CPS). Hibernation
357 /// requires the synchronous CPS handler stack.
358 HibernatePresent,
359 /// Flow contains a `Drill` or `Trail` step (Fase 19 PIX). PIX
360 /// trace state is captured on the synchronous path.
361 PixPresent,
362 /// Flow contains an `IRFlowNode` variant that 33.x.b does not
363 /// yet model (Conditional / ForIn / Par / Probe / Reason /
364 /// ShieldApply / etc.). 33.x.b ships the canonical
365 /// `step S { ask: "..." [apply: tool] }` shape only; subsequent
366 /// 33.x followups extend coverage per founder-sequenced
367 /// sub-fases. The legacy synchronous path keeps working for
368 /// these flows — there is no functional regression.
369 UnsupportedNode {
370 /// Stable kind discriminant — e.g. `"conditional"`,
371 /// `"for_in"`, `"reason"`. Surfaces in audit row + future
372 /// 33.x.g warning emission.
373 kind: &'static str,
374 },
375}
376
377impl PlanFallback {
378 /// Stable slug for diagnostic emission. Used by audit row +
379 /// future 33.x.g warning surface.
380 pub fn slug(&self) -> &'static str {
381 match self {
382 Self::AnchorConstraintsPresent => "anchor_constraints",
383 Self::LambdaApplyPresent => "lambda_apply",
384 Self::LetBindingPresent => "let_binding",
385 Self::UseToolPresent => "use_tool",
386 Self::HibernatePresent => "hibernate",
387 Self::PixPresent => "pix",
388 Self::UnsupportedNode { .. } => "unsupported_node",
389 }
390 }
391}
392
393// ────────────────────────────────────────────────────────────────────
394// Plan builder
395// ────────────────────────────────────────────────────────────────────
396
397/// Build a streaming execution plan from `.axon` source.
398///
399/// Pipeline: lex → parse → type-check → IR generation → walk the
400/// IR's runs for `flow_name` → emit one [`StreamingStep`] per step.
401///
402/// §Fase 33.z.e — Returns `Err(PlanError::*)` only for hard compile
403/// failures (Parse / TypeCheck / IrGeneration / FlowNotFound). The
404/// 33.y.l `LegacyOrchestrationRequired` variant has been DELETED;
405/// every IRFlowNode variant the planner produces is dispatchable
406/// via `flow_dispatcher::dispatch_node`.
407pub fn build_streaming_plan(
408 source: &str,
409 source_file: &str,
410 flow_name: &str,
411 backend_name: &str,
412) -> Result<StreamingExecutionPlan, PlanError> {
413 // §33.x.c — delegated to the unified `compile_source_to_ir`
414 // helper. Both the AST and IR forms are returned so
415 // `build_plan_from_ir` can resolve effect rows from the AST
416 // without re-walking the parser pipeline.
417 let (program, ir) = compile_source_to_ir(source, source_file)?;
418 build_plan_from_ir(&ir, &program, flow_name, backend_name)
419}
420
421/// Build a plan from an already-typed IR. Useful for tests that
422/// drive the planner with hand-constructed IR (no source parse).
423///
424/// §Fase 33.z.e — the `unsupported_feature_reason` rejection step
425/// has been DELETED in lockstep with `PlanError::LegacyOrchestrationRequired`
426/// + `run_streaming_legacy_path`. The per-IRFlowNode dispatcher
427/// (Fase 33.y 45/45) handles every IRFlowNode variant; the planner
428/// no longer needs to gate against a closed-deferred-catalog. Any
429/// shape the planner could compile pre-33.z.e is dispatchable
430/// post-33.z.e via `flow_dispatcher::dispatch_node`.
431pub fn build_plan_from_ir(
432 ir: &IRProgram,
433 program: &crate::ast::Program,
434 flow_name: &str,
435 backend_name: &str,
436) -> Result<StreamingExecutionPlan, PlanError> {
437 // 5. Locate the flow on the IR side. §33.x.c uses the public
438 // `find_ir_flow_by_name` helper so the diagnostic message
439 // shape is shared across callers.
440 let flow = find_ir_flow_by_name(ir, flow_name)?;
441
442 // 5b. Locate the same flow on the AST side (for effect-row
443 // resolution via `resolve_stream_effect_for_step`, which
444 // takes `&FlowDefinition`).
445 let ast_flow = program
446 .declarations
447 .iter()
448 .find_map(|d| match d {
449 crate::ast::Declaration::Flow(f) if f.name == flow_name => Some(f),
450 _ => None,
451 })
452 .ok_or_else(|| PlanError::FlowNotFound {
453 flow_name: flow_name.to_string(),
454 available: ir.flows.iter().map(|f| f.name.clone()).collect(),
455 })?;
456
457 // §Fase 33.z.e — `unsupported_feature_reason` gate retired.
458 // The dispatcher path handles every IRFlowNode variant; no
459 // pre-flight rejection needed. The `StreamingExecutionPlan`
460 // produced below carries only canonical Step variants by
461 // design (other shapes route through the dispatcher's
462 // per-variant handlers directly via `flow.steps`); the plan
463 // structure stays as the legacy carrier for backend resolution
464 // + system-prompt composition + per-step effect policy
465 // resolution.
466
467 // 7. System prompt — §33.x.c uses the public composer with
468 // `backend_tag: None`. The streaming wire's
469 // `axon.complete.backend` field already carries the backend
470 // name so the prompt suffix is redundant on this path
471 // (avoids hidden-context drift between sync + async).
472 let system_prompt = compose_system_prompt_public(flow, ir, None);
473
474 // 8. Per-step plan. Only `IRFlowNode::Step` variants surface in
475 // `StreamingExecutionPlan.steps` (other variants are handled
476 // by the dispatcher's per-variant handlers in
477 // `streaming_via_dispatcher` directly via `flow.steps`).
478 let mut steps = Vec::new();
479 for node in &flow.steps {
480 if let crate::ir_nodes::IRFlowNode::Step(ir_step) = node {
481 let max_tokens = ir
482 .contexts
483 .first()
484 .and_then(|c| c.max_tokens)
485 .map(|n| n as u32);
486 let temperature = ir.contexts.first().and_then(|c| c.temperature);
487 let effect_policy =
488 resolve_stream_effect_for_step(&ir_step.name, ast_flow, program);
489
490 steps.push(StreamingStep {
491 step_name: ir_step.name.clone(),
492 user_prompt: ir_step.ask.clone(),
493 max_tokens,
494 temperature,
495 effect_policy,
496 });
497 }
498 }
499
500 Ok(StreamingExecutionPlan {
501 flow_name: flow_name.to_string(),
502 backend_name: backend_name.to_string(),
503 system_prompt,
504 steps,
505 })
506}
507
508// §33.x.c — The private `ir_flow_node_kind` was lifted to the
509// module's public surface above. The single source of truth for
510// the kind-string mapping lives at `flow_plan::ir_flow_node_kind`
511// and is drift-gated by `tests::ir_flow_node_kind_runner_drift`.
512
513// §33.x.c — `compose_system_prompt` was lifted to public
514// `compose_system_prompt_public` above. It accepts an optional
515// `backend_tag` so the sync CLI path can opt into the canonical
516// `[Backend: foo | AXON x.y.z]` suffix during 33.x.i migration.
517
518// ────────────────────────────────────────────────────────────────────
519// Tests
520// ────────────────────────────────────────────────────────────────────
521
522#[cfg(test)]
523mod tests {
524 use super::*;
525
526 fn parse_simple_stream_source() -> &'static str {
527 // Canonical Kivi-shape: single step, output Stream<Token>,
528 // explicit transport: sse. No anchors, no apply, no let.
529 "flow Chat() -> Unit {\n\
530 step Generate { ask: \"hi\" output: Stream<Token> }\n\
531 }\n\
532 axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
533 }
534
535 fn parse_stream_with_effect_source() -> &'static str {
536 // Disjunct (b): tool with stream effect, step apply.
537 "tool chat_token_stream { description: \"stream\" effects: <stream:drop_oldest> }\n\
538 flow Chat() -> Unit {\n\
539 step Generate { ask: \"hi\" apply: chat_token_stream }\n\
540 }\n\
541 axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
542 }
543
544 #[test]
545 fn build_plan_for_simple_stream_flow_returns_one_step() {
546 let plan = build_streaming_plan(
547 parse_simple_stream_source(),
548 "test.axon",
549 "Chat",
550 "stub",
551 )
552 .expect("simple stream flow plans cleanly");
553
554 assert_eq!(plan.flow_name, "Chat");
555 assert_eq!(plan.backend_name, "stub");
556 assert_eq!(plan.steps.len(), 1);
557 assert_eq!(plan.steps[0].step_name, "Generate");
558 assert_eq!(plan.steps[0].user_prompt, "hi");
559 assert_eq!(plan.steps[0].effect_policy, None, "no tool effect → no policy");
560 }
561
562 #[test]
563 fn build_plan_with_drop_oldest_effect_pre_resolves_policy() {
564 let plan = build_streaming_plan(
565 parse_stream_with_effect_source(),
566 "test.axon",
567 "Chat",
568 "stub",
569 )
570 .expect("stream-effect flow plans cleanly");
571
572 assert_eq!(plan.steps.len(), 1);
573 assert_eq!(plan.steps[0].effect_policy, Some(BackpressurePolicy::DropOldest));
574 }
575
576 #[test]
577 fn build_plan_unknown_flow_returns_flow_not_found() {
578 let err = build_streaming_plan(
579 parse_simple_stream_source(),
580 "test.axon",
581 "NonexistentFlow",
582 "stub",
583 )
584 .expect_err("unknown flow rejected");
585
586 match err {
587 PlanError::FlowNotFound { flow_name, available } => {
588 assert_eq!(flow_name, "NonexistentFlow");
589 assert_eq!(available, vec!["Chat".to_string()]);
590 }
591 other => panic!("expected FlowNotFound, got {other:?}"),
592 }
593 }
594
595 #[test]
596 fn build_plan_unparseable_source_returns_parse_error() {
597 let err = build_streaming_plan(
598 "not a valid axon source",
599 "test.axon",
600 "Chat",
601 "stub",
602 )
603 .expect_err("garbage rejected");
604
605 assert!(matches!(err, PlanError::Parse(_)));
606 }
607
608 #[test]
609 fn build_plan_multi_step_flow_preserves_order() {
610 let src = "flow MultiStep() -> Unit {\n\
611 step First { ask: \"one\" }\n\
612 step Second { ask: \"two\" }\n\
613 step Third { ask: \"three\" }\n\
614 }\n\
615 axonendpoint E { method: POST path: \"/m\" execute: MultiStep transport: sse }";
616 let plan = build_streaming_plan(src, "test.axon", "MultiStep", "stub").unwrap();
617 assert_eq!(plan.steps.len(), 3);
618 assert_eq!(plan.steps[0].step_name, "First");
619 assert_eq!(plan.steps[1].step_name, "Second");
620 assert_eq!(plan.steps[2].step_name, "Third");
621 assert_eq!(plan.steps[0].user_prompt, "one");
622 assert_eq!(plan.steps[1].user_prompt, "two");
623 assert_eq!(plan.steps[2].user_prompt, "three");
624 }
625
626 #[test]
627 fn plan_fallback_slugs_are_stable_strings() {
628 // Each variant has a documented slug. Drift in this match
629 // surfaces in 33.x.g warning emission.
630 assert_eq!(PlanFallback::AnchorConstraintsPresent.slug(), "anchor_constraints");
631 assert_eq!(PlanFallback::LambdaApplyPresent.slug(), "lambda_apply");
632 assert_eq!(PlanFallback::LetBindingPresent.slug(), "let_binding");
633 assert_eq!(PlanFallback::UseToolPresent.slug(), "use_tool");
634 assert_eq!(PlanFallback::HibernatePresent.slug(), "hibernate");
635 assert_eq!(PlanFallback::PixPresent.slug(), "pix");
636 }
637
638 #[test]
639 fn plan_is_deterministic_for_same_source() {
640 let plan1 = build_streaming_plan(
641 parse_simple_stream_source(),
642 "test.axon",
643 "Chat",
644 "stub",
645 )
646 .unwrap();
647 let plan2 = build_streaming_plan(
648 parse_simple_stream_source(),
649 "test.axon",
650 "Chat",
651 "stub",
652 )
653 .unwrap();
654 assert_eq!(plan1, plan2, "plan builder is pure + deterministic");
655 }
656
657 #[test]
658 fn streaming_step_eq_is_field_wise() {
659 let a = StreamingStep {
660 step_name: "X".into(),
661 user_prompt: "y".into(),
662 max_tokens: Some(100),
663 temperature: Some(0.7),
664 effect_policy: Some(BackpressurePolicy::DropOldest),
665 };
666 let b = a.clone();
667 assert_eq!(a, b);
668 }
669
670 #[test]
671 fn streaming_plan_includes_backend_name() {
672 let plan = build_streaming_plan(
673 parse_simple_stream_source(),
674 "test.axon",
675 "Chat",
676 "anthropic",
677 )
678 .unwrap();
679 assert_eq!(plan.backend_name, "anthropic");
680 }
681
682 #[test]
683 fn empty_flow_body_produces_empty_step_list() {
684 // Pathological but parseable case: empty flow body.
685 let src = "flow Empty() -> Unit {\n\
686 }\n\
687 axonendpoint E { method: POST path: \"/e\" execute: Empty transport: sse }";
688 let plan = build_streaming_plan(src, "test.axon", "Empty", "stub").unwrap();
689 assert!(plan.steps.is_empty());
690 assert_eq!(plan.flow_name, "Empty");
691 }
692
693 // ── §33.x.c — Public helper tests ───────────────────────────────
694
695 #[test]
696 fn compile_source_to_ir_returns_program_and_ir_for_valid_source() {
697 let (program, ir) =
698 compile_source_to_ir(parse_simple_stream_source(), "test.axon").unwrap();
699 // Program has at least one Flow declaration.
700 let flow_count = program
701 .declarations
702 .iter()
703 .filter(|d| matches!(d, crate::ast::Declaration::Flow(_)))
704 .count();
705 assert_eq!(flow_count, 1);
706 // IR mirrors with one flow.
707 assert_eq!(ir.flows.len(), 1);
708 assert_eq!(ir.flows[0].name, "Chat");
709 }
710
711 #[test]
712 fn compile_source_to_ir_is_pure_deterministic() {
713 let src = parse_simple_stream_source();
714 let (p1, ir1) = compile_source_to_ir(src, "test.axon").unwrap();
715 let (p2, ir2) = compile_source_to_ir(src, "test.axon").unwrap();
716 assert_eq!(p1.declarations.len(), p2.declarations.len());
717 assert_eq!(ir1.flows.len(), ir2.flows.len());
718 assert_eq!(ir1.flows[0].name, ir2.flows[0].name);
719 }
720
721 #[test]
722 fn compile_source_to_ir_surfaces_parse_error() {
723 let err = compile_source_to_ir("not axon source at all", "test.axon").unwrap_err();
724 assert!(matches!(err, PlanError::Parse(_)));
725 }
726
727 #[test]
728 fn compile_source_to_ir_surfaces_type_check_error() {
729 // §Fase 28 — `axonendpoint method: YEET` is rejected at
730 // parse time. Use a different shape that parses cleanly
731 // but fails type-check: undefined flow reference.
732 let src = "axonendpoint Bad { method: POST path: \"/x\" execute: NonexistentFlow }";
733 let err = compile_source_to_ir(src, "test.axon").unwrap_err();
734 // Either parser rejected it (post-Fase 28 hardening may
735 // catch undefined references at parse time) or the type
736 // checker did — both are valid PlanError variants.
737 assert!(matches!(err, PlanError::Parse(_) | PlanError::TypeCheck(_)));
738 }
739
740 #[test]
741 fn find_ir_flow_by_name_returns_flow_when_present() {
742 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
743 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
744 assert_eq!(flow.name, "Chat");
745 }
746
747 #[test]
748 fn find_ir_flow_by_name_returns_flow_not_found_with_available_list() {
749 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
750 let err = find_ir_flow_by_name(&ir, "Nope").unwrap_err();
751 match err {
752 PlanError::FlowNotFound { flow_name, available } => {
753 assert_eq!(flow_name, "Nope");
754 assert_eq!(available, vec!["Chat".to_string()]);
755 }
756 other => panic!("expected FlowNotFound, got {other:?}"),
757 }
758 }
759
760 #[test]
761 fn ir_flow_node_kind_step_returns_step() {
762 use crate::ir_nodes::{IRFlowNode, IRStep};
763 let n = IRFlowNode::Step(IRStep {
764 node_type: "Step",
765 source_line: 1,
766 source_column: 1,
767 name: "S".into(),
768 persona_ref: String::new(),
769 given: String::new(),
770 ask: "hi".into(),
771 use_tool: None,
772 probe: None,
773 reason: None,
774 weave: None,
775 output_type: String::new(),
776 confidence_floor: None,
777 navigate_ref: String::new(),
778 apply_ref: String::new(),
779 body: vec![],
780 });
781 assert_eq!(ir_flow_node_kind(&n), "step");
782 }
783
784 #[test]
785 fn ir_flow_node_kind_distinct_slugs_for_every_variant() {
786 // We can't enumerate every variant with fresh instances
787 // (some have non-trivial payloads), but we can pin the slug
788 // set by enumerating distinct slugs that the function CAN
789 // produce. The match in `ir_flow_node_kind` is exhaustive
790 // (compiler-enforced); this test pins the slug values that
791 // downstream callers (e.g. audit logs, 33.x.g warning
792 // catalog) depend on.
793 let expected_slugs: Vec<&str> = vec![
794 "step",
795 "probe",
796 "reason",
797 "validate",
798 "refine",
799 "weave",
800 "use_tool",
801 "remember",
802 "recall",
803 "conditional",
804 "for_in",
805 "let",
806 "return",
807 "break",
808 "continue",
809 "lambda_data_apply",
810 "par",
811 "hibernate",
812 "deliberate",
813 "consensus",
814 "forge",
815 "focus",
816 "associate",
817 "aggregate",
818 "explore",
819 "ingest",
820 "shield_apply",
821 "stream_block",
822 "navigate",
823 "drill",
824 "trail",
825 "corroborate",
826 "ots_apply",
827 "mandate_apply",
828 "compute_apply",
829 "listen",
830 "daemon_step",
831 "emit",
832 "publish",
833 "discover",
834 "persist",
835 "retrieve",
836 "mutate",
837 "purge",
838 "transact",
839 ];
840 // 45 closed-catalog slugs. Adding a new IRFlowNode variant
841 // requires updating `ir_flow_node_kind` (compiler enforces)
842 // AND this list (intentional — keeps the slug catalog
843 // pinned for audit/warning callers).
844 assert_eq!(expected_slugs.len(), 45);
845 // Every slug is a valid lowercase identifier (snake_case).
846 for slug in &expected_slugs {
847 assert!(
848 slug.chars()
849 .all(|c| c.is_ascii_lowercase() || c == '_' || c.is_ascii_digit()),
850 "slug {slug:?} must be lowercase snake_case for stable audit emission"
851 );
852 }
853 // Every slug is unique.
854 let mut sorted = expected_slugs.clone();
855 sorted.sort();
856 sorted.dedup();
857 assert_eq!(sorted.len(), expected_slugs.len(), "slug catalog has duplicates");
858 }
859
860 #[test]
861 fn ir_flow_node_kind_runner_drift() {
862 // §33.x.c drift gate: `flow_plan::ir_flow_node_kind` is the
863 // single source of truth for IRFlowNode kind slugs. The
864 // legacy `runner.rs::extract_step_info` produces a
865 // `step_type` string for each variant that downstream
866 // consumers (audit row, stub trace, IR debugger) parse.
867 // Drift between the two surfaces here would make the audit
868 // row inconsistent across sync + async paths.
869 //
870 // For 33.x.c the drift gate operates by-construction: the
871 // public helper here is the canonical surface; future
872 // migrations of `extract_step_info` to call into this
873 // helper are tracked under 33.x.i. Until then, this test
874 // pins the kind catalog so accidental drift on either side
875 // surfaces in CI.
876 use crate::ir_nodes::{IRFlowNode, IRStep};
877 let step = IRFlowNode::Step(IRStep {
878 node_type: "Step",
879 source_line: 1,
880 source_column: 1,
881 name: "T".into(),
882 persona_ref: String::new(),
883 given: String::new(),
884 ask: String::new(),
885 use_tool: None,
886 probe: None,
887 reason: None,
888 weave: None,
889 output_type: String::new(),
890 confidence_floor: None,
891 navigate_ref: String::new(),
892 apply_ref: String::new(),
893 body: vec![],
894 });
895 // Pinned drift assertion: the canonical "step" kind is
896 // what `runner.rs::extract_step_info` produces for
897 // `IRFlowNode::Step`, per file inspection at 33.x.c
898 // landing. Future runner refactor MUST keep this
899 // invariant or update this test (deliberate change).
900 assert_eq!(ir_flow_node_kind(&step), "step");
901 }
902
903 #[test]
904 fn compose_system_prompt_public_includes_flow_name() {
905 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
906 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
907 let prompt = compose_system_prompt_public(flow, &ir, None);
908 assert!(prompt.contains("# Flow: Chat"));
909 }
910
911 #[test]
912 fn compose_system_prompt_public_omits_backend_tag_when_none() {
913 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
914 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
915 let prompt = compose_system_prompt_public(flow, &ir, None);
916 assert!(!prompt.contains("[Backend:"));
917 assert!(!prompt.contains("AXON"));
918 }
919
920 #[test]
921 fn compose_system_prompt_public_includes_backend_tag_when_set() {
922 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
923 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
924 let prompt = compose_system_prompt_public(flow, &ir, Some("anthropic"));
925 assert!(prompt.contains("[Backend: anthropic | AXON "));
926 }
927
928 #[test]
929 fn compose_system_prompt_public_includes_persona_when_present() {
930 // `domain` is a list per AST grammar; `tone` is a string.
931 // `tone` is a closed-catalog enum (see type_checker valid tones).
932 let src = "persona Doctor { domain: [\"medicine\"] tone: \"formal\" }\n\
933 context Clinic { depth: \"deep\" memory_scope: \"session\" }\n\
934 flow Chat() -> Unit {\n\
935 step Generate { ask: \"hi\" output: Stream<Token> }\n\
936 }\n\
937 axonendpoint E { method: POST path: \"/c\" execute: Chat transport: sse }";
938 let (_p, ir) = compile_source_to_ir(src, "t.axon").unwrap();
939 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
940 let prompt = compose_system_prompt_public(flow, &ir, None);
941 assert!(prompt.contains("# Persona: Doctor"));
942 assert!(prompt.contains("Domain expertise: medicine"));
943 assert!(prompt.contains("# Context: Clinic"));
944 assert!(prompt.contains("Analysis depth: deep"));
945 }
946
947 #[test]
948 fn compose_system_prompt_public_is_pure_deterministic() {
949 let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
950 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
951 let p1 = compose_system_prompt_public(flow, &ir, Some("openai"));
952 let p2 = compose_system_prompt_public(flow, &ir, Some("openai"));
953 assert_eq!(p1, p2);
954 }
955
956 #[test]
957 fn build_streaming_plan_uses_compile_source_to_ir_internally() {
958 // Cross-check: plan built via build_streaming_plan has the
959 // same system_prompt as one built by manually invoking
960 // compile_source_to_ir + compose_system_prompt_public.
961 let src = parse_simple_stream_source();
962 let plan = build_streaming_plan(src, "t.axon", "Chat", "stub").unwrap();
963 let (_program, ir) = compile_source_to_ir(src, "t.axon").unwrap();
964 let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
965 let expected = compose_system_prompt_public(flow, &ir, None);
966 assert_eq!(plan.system_prompt, expected);
967 }
968}