Skip to main content

axon/
flow_plan.rs

1//! §Fase 33.x.b + 33.x.c — Streaming execution plan extraction +
2//! unified `.axon` source compilation pipeline.
3//!
4//! Builds a streaming-shaped execution plan from `.axon` source for
5//! the production async SSE path. Pre-resolves per-step
6//! [`BackpressurePolicy`] via
7//! [`crate::stream_effect_dispatcher::resolve_stream_effect_for_step`]
8//! so the hot per-chunk loop in
9//! `crate::axon_server::server_execute_streaming_async` does NOT
10//! re-walk the AST per chunk.
11//!
12//! # Scope boundary (33.x.b)
13//!
14//! 33.x.b ships the SIMPLEST step shape (single `ask`, no anchors,
15//! no `apply: lambda`, no `let` bindings, no mid-stream `use_tool`).
16//! This is exactly the canonical Kivi/Stream-effect adopter shape
17//! the diagnostic anchor pins. Flows that use the omitted features
18//! still execute correctly on the legacy synchronous path; the
//! streaming path falls back (reason catalog: [`PlanFallback`])
20//! and the SSE handler routes through `server_execute_full` (the
21//! pre-33.x.b path). NO silent degradation — the fallback is
22//! observable in plan output so adopter diagnostics can flag it.
23//!
24//! # Lift target (33.x.c)
25//!
26//! `runner::build_execution_plan` (private, sync) stays unchanged in
27//! 33.x.b. 33.x.c unifies the two builders. Until then, this module
28//! is the SOLE source of truth for streaming-path plan extraction;
29//! `runner.rs` is the sole source of truth for sync-path execution.
30//!
31//! # D-letter anchors
32//!
33//! - **D1** — every streaming-path flow resolves a [`StreamingExecutionPlan`]
34//!   here before dispatching to `Backend::stream()`. The plan's
35//!   `backend_name` is the resolved provider name; `steps[].effect_policy`
36//!   pre-resolves the declared `<stream:<policy>>` for the
37//!   `StreamPolicyEnforcer` wrap in 33.x.d.
38//! - **D4** — the plan structure carries NO new wire-shape fields;
39//!   it is internal scaffolding that produces the same
40//!   FlowExecutionEvent sequence as the pre-33.x.b path for the
41//!   adopter shapes 33.x.b supports.
42//! - **D5** — fallback to legacy orchestration is observable via
43//!   [`PlanFallback`] so 33.x.g can hook `axon-W002`-style warnings.
44
45use crate::ir_nodes::{IRFlow, IRFlowNode, IRProgram};
46use crate::stream_effect::BackpressurePolicy;
47use crate::stream_effect_dispatcher::resolve_stream_effect_for_step;
48
49// ────────────────────────────────────────────────────────────────────
50//  §33.x.c — Shared source-compilation pipeline
51// ────────────────────────────────────────────────────────────────────
52
53/// Compile an `.axon` source string into its AST [`crate::ast::Program`]
54/// + IR [`IRProgram`] forms. Both stacks-of-truth are returned so
55/// downstream callers can do AST-level walking (effect-row resolution,
56/// route table extraction) AND IR-level execution planning without
57/// re-running the pipeline.
58///
59/// # Stages
60///
61/// 1. Lex via [`crate::lexer::Lexer`]
62/// 2. Parse via [`crate::parser::Parser`]
63/// 3. Type-check via [`crate::type_checker::TypeChecker`]
64/// 4. IR generation via [`crate::ir_generator::IRGenerator`]
65///
66/// Each stage's failure surfaces as a structured [`PlanError`] variant
67/// so the caller distinguishes "user source is malformed" from "the
68/// pipeline encountered an internal invariant violation".
69///
70/// # Performance
71///
72/// This is the canonical source-compilation entry point on the Fase
73/// 33.x.b production async streaming path (called once per
74/// flow-invocation). The legacy `axon_server.rs` call sites still
75/// inline their own pipelines; 33.x.i (mono-file `crate::backend`
76/// retirement) is when the deeper migration completes.
77///
78/// # Purity
79///
80/// Pure + deterministic. No I/O. No global state. Same source +
81/// source_file → same `(Program, IRProgram)` byte-for-byte.
82pub fn compile_source_to_ir(
83    source: &str,
84    source_file: &str,
85) -> Result<(crate::ast::Program, IRProgram), PlanError> {
86    let tokens = crate::lexer::Lexer::new(source, source_file)
87        .tokenize()
88        .map_err(|e| PlanError::Parse(format!("lex error: {e:?}")))?;
89
90    let mut parser = crate::parser::Parser::new(tokens);
91    let program = parser.parse().map_err(|e| PlanError::Parse(e.message))?;
92
93    let mut checker = crate::type_checker::TypeChecker::new(&program);
94    let type_errors = checker.check();
95    if !type_errors.is_empty() {
96        return Err(PlanError::TypeCheck(
97            type_errors.into_iter().map(|e| e.message).collect(),
98        ));
99    }
100
101    let ir = crate::ir_generator::IRGenerator::new().generate(&program);
102    Ok((program, ir))
103}
104
105/// Locate an [`IRFlow`] by name. Returns a structured [`PlanError`]
106/// with the list of available flow names so the diagnostic message
107/// is adopter-actionable (typo-detection on first read).
108///
109/// # Used by
110///
111/// * [`build_plan_from_ir`] — the streaming-path planner.
112/// * Future 33.x.i migrations of the legacy axon_server.rs sync
113///   path will adopt this helper for byte-identical diagnostics.
114pub fn find_ir_flow_by_name<'a>(
115    ir: &'a IRProgram,
116    flow_name: &str,
117) -> Result<&'a IRFlow, PlanError> {
118    ir.flows
119        .iter()
120        .find(|f| f.name == flow_name)
121        .ok_or_else(|| PlanError::FlowNotFound {
122            flow_name: flow_name.to_string(),
123            available: ir.flows.iter().map(|f| f.name.clone()).collect(),
124        })
125}
126
127/// Stable kind discriminant for an [`IRFlowNode`]. Closed catalog
128/// extracted as a separate public helper so adding a new
129/// `IRFlowNode` variant on the frontend forces an explicit match
130/// update here (compiler enforces exhaustiveness).
131///
132/// # Drift contract with `runner::extract_step_info`
133///
134/// The synchronous CLI path's `runner::extract_step_info` produces
135/// a `step_type` string for each `IRFlowNode` variant. The kinds
136/// here MUST match that mapping. Drift is gated by
137/// [`ir_flow_node_kind_runner_drift`] in `flow_plan::tests`.
138pub fn ir_flow_node_kind(node: &IRFlowNode) -> &'static str {
139    match node {
140        IRFlowNode::Step(_) => "step",
141        IRFlowNode::Probe(_) => "probe",
142        IRFlowNode::Reason(_) => "reason",
143        IRFlowNode::Validate(_) => "validate",
144        IRFlowNode::Refine(_) => "refine",
145        IRFlowNode::Weave(_) => "weave",
146        IRFlowNode::UseTool(_) => "use_tool",
147        IRFlowNode::Remember(_) => "remember",
148        IRFlowNode::Recall(_) => "recall",
149        IRFlowNode::Conditional(_) => "conditional",
150        IRFlowNode::ForIn(_) => "for_in",
151        IRFlowNode::Let(_) => "let",
152        IRFlowNode::Return(_) => "return",
153        IRFlowNode::Break(_) => "break",
154        IRFlowNode::Continue(_) => "continue",
155        IRFlowNode::LambdaDataApply(_) => "lambda_data_apply",
156        IRFlowNode::Par(_) => "par",
157        IRFlowNode::Hibernate(_) => "hibernate",
158        IRFlowNode::Deliberate(_) => "deliberate",
159        IRFlowNode::Consensus(_) => "consensus",
160        IRFlowNode::Forge(_) => "forge",
161        IRFlowNode::Focus(_) => "focus",
162        IRFlowNode::Associate(_) => "associate",
163        IRFlowNode::Aggregate(_) => "aggregate",
164        IRFlowNode::Explore(_) => "explore",
165        IRFlowNode::Ingest(_) => "ingest",
166        IRFlowNode::ShieldApply(_) => "shield_apply",
167        IRFlowNode::Stream(_) => "stream_block",
168        IRFlowNode::Navigate(_) => "navigate",
169        IRFlowNode::Drill(_) => "drill",
170        IRFlowNode::Trail(_) => "trail",
171        IRFlowNode::Corroborate(_) => "corroborate",
172        IRFlowNode::OtsApply(_) => "ots_apply",
173        IRFlowNode::MandateApply(_) => "mandate_apply",
174        IRFlowNode::ComputeApply(_) => "compute_apply",
175        IRFlowNode::Listen(_) => "listen",
176        IRFlowNode::DaemonStep(_) => "daemon_step",
177        IRFlowNode::Emit(_) => "emit",
178        IRFlowNode::Publish(_) => "publish",
179        IRFlowNode::Discover(_) => "discover",
180        IRFlowNode::Persist(_) => "persist",
181        IRFlowNode::Retrieve(_) => "retrieve",
182        IRFlowNode::Mutate(_) => "mutate",
183        IRFlowNode::Purge(_) => "purge",
184        IRFlowNode::Transact(_) => "transact",
185    }
186}
187
188/// Compose a streaming-path system prompt from persona + context +
189/// (optional) backend tag. Public helper shared by the streaming
190/// planner today + adopted by the sync CLI path during 33.x.i.
191///
192/// # Parameters
193///
194/// * `flow` — the IR flow whose system prompt is being built. The
195///   flow's name surfaces in the prompt for trace clarity.
196/// * `ir` — full IR for persona/context resolution.
197/// * `backend_tag` — `Some("anthropic")` appends `[Backend:
198///   anthropic | AXON <version>]`; `None` omits the tag. The
199///   streaming path passes `None` because the wire's
200///   `axon.complete.backend` field already carries this info.
201///
202/// # Determinism
203///
204/// Pure + deterministic. Same inputs → same string byte-for-byte.
205pub fn compose_system_prompt_public(
206    flow: &IRFlow,
207    ir: &IRProgram,
208    backend_tag: Option<&str>,
209) -> String {
210    let mut parts: Vec<String> = Vec::new();
211
212    if let Some(persona) = ir.personas.first() {
213        parts.push(format!("# Persona: {}", persona.name));
214        if !persona.domain.is_empty() {
215            parts.push(format!("Domain expertise: {}", persona.domain.join(", ")));
216        }
217        if !persona.tone.is_empty() {
218            parts.push(format!("Communication tone: {}", persona.tone));
219        }
220        if !persona.language.is_empty() {
221            parts.push(format!("Language: {}", persona.language));
222        }
223        if let Some(ct) = persona.confidence_threshold {
224            parts.push(format!("Confidence threshold: {ct:.2}"));
225        }
226        if persona.cite_sources == Some(true) {
227            parts.push("Always cite sources.".to_string());
228        }
229        if !persona.refuse_if.is_empty() {
230            parts.push(format!("Refuse if: {}", persona.refuse_if.join(", ")));
231        }
232    }
233
234    if let Some(ctx) = ir.contexts.first() {
235        parts.push(format!("\n# Context: {}", ctx.name));
236        if !ctx.depth.is_empty() {
237            parts.push(format!("Analysis depth: {}", ctx.depth));
238        }
239        if !ctx.memory_scope.is_empty() {
240            parts.push(format!("Memory scope: {}", ctx.memory_scope));
241        }
242        if let Some(t) = ctx.temperature {
243            parts.push(format!("Temperature: {t:.1}"));
244        }
245        if let Some(mt) = ctx.max_tokens {
246            parts.push(format!("Max tokens: {mt}"));
247        }
248    }
249
250    parts.push(format!("\n# Flow: {}", flow.name));
251
252    if let Some(tag) = backend_tag {
253        parts.push(format!("\n[Backend: {tag} | AXON {}]", env!("CARGO_PKG_VERSION")));
254    }
255
256    parts.join("\n")
257}
258
259// ────────────────────────────────────────────────────────────────────
260//  Plan types
261// ────────────────────────────────────────────────────────────────────
262
/// One step in a streaming execution plan.
///
/// Every field the hot loop needs is pre-resolved here: user prompt,
/// generation knobs and declared effect policy. The per-chunk loop in
/// `server_execute_streaming_async` reads these fields without touching
/// the IR or AST. The system prompt is shared by all steps, so it lives
/// on `StreamingExecutionPlan` rather than here.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingStep {
    /// Canonical step name (matches `IRStep.name`). Surfaces in
    /// `axon.token.step` + `axon.complete.step_names`.
    pub step_name: String,
    /// User prompt this step asks the LLM. Copied verbatim from the IR
    /// step's `ask` text by `build_plan_from_ir`; the planner performs
    /// no `apply:` argument expansion.
    pub user_prompt: String,
    /// Optional max-tokens cap, converted to `u32`. Taken from the first
    /// IR context's `max_tokens`; step-level declarations are not
    /// consulted by the planner, so every step of a plan shares it.
    pub max_tokens: Option<u32>,
    /// Optional temperature from the first IR context's `temperature`,
    /// shared by every step. Per the original design note, it may be
    /// overridden by locked-param dispatch in the Backend impl.
    pub temperature: Option<f64>,
    /// Pre-resolved backpressure policy from the step's tool's
    /// `effects: <stream:<policy>>` declaration. `None` when the
    /// step doesn't `apply:` a tool with a stream effect.
    /// Activated by 33.x.d's `StreamPolicyEnforcer` wrap; recorded
    /// today in `axon.complete.stream_policies` for audit
    /// correlation (Fase 33.e).
    pub effect_policy: Option<BackpressurePolicy>,
}
291
/// Streaming execution plan — one per flow invocation.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingExecutionPlan {
    /// Name of the flow this plan was built for, i.e. the `flow_name`
    /// requested from the planner.
    pub flow_name: String,
    /// Backend name as the adopter resolved it (after `auto`
    /// resolution upstream of plan construction).
    pub backend_name: String,
    /// Composed system prompt. The planner builds it with
    /// `compose_system_prompt_public(flow, ir, None)`, from the first
    /// persona, the first context and the flow name, with no backend tag.
    ///
    /// NOTE(review): earlier docs promised anchor instructions and
    /// parity with the sync `runner::build_system_prompt`. The composer
    /// in this module emits no anchor text, so confirm parity before
    /// relying on it.
    pub system_prompt: String,
    /// Ordered list of the flow's top-level `IRFlowNode::Step` nodes
    /// only. Other node kinds are not represented here; the dispatcher
    /// walks `flow.steps` for those. The list is empty when the flow body
    /// is empty *or* contains no top-level `Step` nodes. An empty flow
    /// still emits FlowStart + FlowComplete with `steps_executed: 0`
    /// per Fase 33.b.
    pub steps: Vec<StreamingStep>,
}
309
/// Hard failure while compiling `.axon` source or locating the requested
/// flow for the streaming path.
///
/// Each variant is a closed-catalog signal the SSE handler routes to an
/// `axon.error` wire event. Soft "fall back to the legacy path" outcomes
/// are not represented here; see [`PlanFallback`] for that catalog.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanError {
    /// Lexing or parsing failed. Lexer failures are carried as
    /// `"lex error: <Debug form of the lexer error>"`; parser failures
    /// carry the `Parser::parse` error message verbatim.
    Parse(String),
    /// Type checker rejected the source: one message per reported type
    /// error, in the order the checker returned them.
    TypeCheck(Vec<String>),
    /// IR generation failed (rare — usually means the type-check
    /// pass missed an invariant).
    ///
    /// NOTE(review): `compile_source_to_ir` never constructs this
    /// variant, because `IRGenerator::generate` returns the IR directly.
    /// Confirm whether any other caller still produces it.
    IrGeneration(String),
    /// The requested flow was not found. `flow_name` echoes the request;
    /// `available` lists the flow names that were searched, so the
    /// diagnostic can point at a likely typo.
    FlowNotFound { flow_name: String, available: Vec<String> },
    // §Fase 33.z.e — `LegacyOrchestrationRequired` variant DELETED
    // (the 33.y.l `#[deprecated]` retirement cycle completes here).
    // The 33.y per-IRFlowNode async dispatcher (45/45 graduation)
    // covers every IRFlowNode variant the planner used to reject.
    // Any downstream crate that pattern-matched against this variant
    // hits an explicit compile error — the intended failure shape
    // for the deprecation cycle.
}
334
/// Closed catalog of reasons the streaming path falls back to the
/// legacy synchronous orchestration. Each variant maps to a specific
/// adopter source shape that the 33.x.b scope explicitly deferred.
///
/// NOTE(review): since §Fase 33.z.e removed
/// `PlanError::LegacyOrchestrationRequired`, the planner functions in
/// this module no longer construct a `PlanFallback`. The catalog appears
/// to be kept for slug stability (audit rows, 33.x.g warnings); confirm
/// whether any caller still produces it.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanFallback {
    /// Flow uses `anchor: <name>` constraints. Anchor enforcement
    /// fires on FINAL flow output, which is incompatible with
    /// per-token streaming until per-chunk anchor checking lands.
    AnchorConstraintsPresent,
    /// Flow uses `apply: <lambda>` (Fase 15 lambda data apply).
    /// Lambda apply runs after the step's LLM response; per-chunk
    /// lambda application is a future fase.
    LambdaApplyPresent,
    /// Flow uses `let X = ...` SSA bindings (Fase 17). Binding
    /// resolution runs between steps; the streaming path doesn't yet
    /// thread the binding context through the per-chunk loop.
    LetBindingPresent,
    /// Flow uses `use_tool` mid-step (function calling). Mid-stream
    /// tool calls are explicit Fase 33-followon-2 scope.
    UseToolPresent,
    /// Flow contains a `Hibernate` step (Fase 19 CPS). Hibernation
    /// requires the synchronous CPS handler stack.
    HibernatePresent,
    /// Flow contains a `Drill` or `Trail` step (Fase 19 PIX). PIX
    /// trace state is captured on the synchronous path.
    PixPresent,
    /// Flow contains an `IRFlowNode` variant that 33.x.b did not
    /// model (Conditional / ForIn / Par / Probe / Reason /
    /// ShieldApply / etc.). 33.x.b shipped the canonical
    /// `step S { ask: "..." [apply: tool] }` shape only; subsequent
    /// 33.x followups extend coverage per founder-sequenced
    /// sub-fases. The legacy synchronous path keeps working for
    /// these flows — there is no functional regression.
    UnsupportedNode {
        /// Stable kind discriminant, e.g. `"conditional"`, `"for_in"`
        /// or `"reason"` (the strings produced by `ir_flow_node_kind`).
        /// Surfaces in the audit row and in future 33.x.g warning
        /// emission.
        kind: &'static str,
    },
}
376
377impl PlanFallback {
378    /// Stable slug for diagnostic emission. Used by audit row +
379    /// future 33.x.g warning surface.
380    pub fn slug(&self) -> &'static str {
381        match self {
382            Self::AnchorConstraintsPresent => "anchor_constraints",
383            Self::LambdaApplyPresent => "lambda_apply",
384            Self::LetBindingPresent => "let_binding",
385            Self::UseToolPresent => "use_tool",
386            Self::HibernatePresent => "hibernate",
387            Self::PixPresent => "pix",
388            Self::UnsupportedNode { .. } => "unsupported_node",
389        }
390    }
391}
392
393// ────────────────────────────────────────────────────────────────────
394//  Plan builder
395// ────────────────────────────────────────────────────────────────────
396
397/// Build a streaming execution plan from `.axon` source.
398///
399/// Pipeline: lex → parse → type-check → IR generation → walk the
400/// IR's runs for `flow_name` → emit one [`StreamingStep`] per step.
401///
402/// §Fase 33.z.e — Returns `Err(PlanError::*)` only for hard compile
403/// failures (Parse / TypeCheck / IrGeneration / FlowNotFound). The
404/// 33.y.l `LegacyOrchestrationRequired` variant has been DELETED;
405/// every IRFlowNode variant the planner produces is dispatchable
406/// via `flow_dispatcher::dispatch_node`.
407pub fn build_streaming_plan(
408    source: &str,
409    source_file: &str,
410    flow_name: &str,
411    backend_name: &str,
412) -> Result<StreamingExecutionPlan, PlanError> {
413    // §33.x.c — delegated to the unified `compile_source_to_ir`
414    // helper. Both the AST and IR forms are returned so
415    // `build_plan_from_ir` can resolve effect rows from the AST
416    // without re-walking the parser pipeline.
417    let (program, ir) = compile_source_to_ir(source, source_file)?;
418    build_plan_from_ir(&ir, &program, flow_name, backend_name)
419}
420
421/// Build a plan from an already-typed IR. Useful for tests that
422/// drive the planner with hand-constructed IR (no source parse).
423///
424/// §Fase 33.z.e — the `unsupported_feature_reason` rejection step
425/// has been DELETED in lockstep with `PlanError::LegacyOrchestrationRequired`
426/// + `run_streaming_legacy_path`. The per-IRFlowNode dispatcher
427/// (Fase 33.y 45/45) handles every IRFlowNode variant; the planner
428/// no longer needs to gate against a closed-deferred-catalog. Any
429/// shape the planner could compile pre-33.z.e is dispatchable
430/// post-33.z.e via `flow_dispatcher::dispatch_node`.
431pub fn build_plan_from_ir(
432    ir: &IRProgram,
433    program: &crate::ast::Program,
434    flow_name: &str,
435    backend_name: &str,
436) -> Result<StreamingExecutionPlan, PlanError> {
437    // 5. Locate the flow on the IR side. §33.x.c uses the public
438    //    `find_ir_flow_by_name` helper so the diagnostic message
439    //    shape is shared across callers.
440    let flow = find_ir_flow_by_name(ir, flow_name)?;
441
442    // 5b. Locate the same flow on the AST side (for effect-row
443    //     resolution via `resolve_stream_effect_for_step`, which
444    //     takes `&FlowDefinition`).
445    let ast_flow = program
446        .declarations
447        .iter()
448        .find_map(|d| match d {
449            crate::ast::Declaration::Flow(f) if f.name == flow_name => Some(f),
450            _ => None,
451        })
452        .ok_or_else(|| PlanError::FlowNotFound {
453            flow_name: flow_name.to_string(),
454            available: ir.flows.iter().map(|f| f.name.clone()).collect(),
455        })?;
456
457    // §Fase 33.z.e — `unsupported_feature_reason` gate retired.
458    // The dispatcher path handles every IRFlowNode variant; no
459    // pre-flight rejection needed. The `StreamingExecutionPlan`
460    // produced below carries only canonical Step variants by
461    // design (other shapes route through the dispatcher's
462    // per-variant handlers directly via `flow.steps`); the plan
463    // structure stays as the legacy carrier for backend resolution
464    // + system-prompt composition + per-step effect policy
465    // resolution.
466
467    // 7. System prompt — §33.x.c uses the public composer with
468    //    `backend_tag: None`. The streaming wire's
469    //    `axon.complete.backend` field already carries the backend
470    //    name so the prompt suffix is redundant on this path
471    //    (avoids hidden-context drift between sync + async).
472    let system_prompt = compose_system_prompt_public(flow, ir, None);
473
474    // 8. Per-step plan. Only `IRFlowNode::Step` variants surface in
475    //    `StreamingExecutionPlan.steps` (other variants are handled
476    //    by the dispatcher's per-variant handlers in
477    //    `streaming_via_dispatcher` directly via `flow.steps`).
478    let mut steps = Vec::new();
479    for node in &flow.steps {
480        if let crate::ir_nodes::IRFlowNode::Step(ir_step) = node {
481            let max_tokens = ir
482                .contexts
483                .first()
484                .and_then(|c| c.max_tokens)
485                .map(|n| n as u32);
486            let temperature = ir.contexts.first().and_then(|c| c.temperature);
487            let effect_policy =
488                resolve_stream_effect_for_step(&ir_step.name, ast_flow, program);
489
490            steps.push(StreamingStep {
491                step_name: ir_step.name.clone(),
492                user_prompt: ir_step.ask.clone(),
493                max_tokens,
494                temperature,
495                effect_policy,
496            });
497        }
498    }
499
500    Ok(StreamingExecutionPlan {
501        flow_name: flow_name.to_string(),
502        backend_name: backend_name.to_string(),
503        system_prompt,
504        steps,
505    })
506}
507
508// §33.x.c — The private `ir_flow_node_kind` was lifted to the
509// module's public surface above. The single source of truth for
510// the kind-string mapping lives at `flow_plan::ir_flow_node_kind`
511// and is drift-gated by `tests::ir_flow_node_kind_runner_drift`.
512
513// §33.x.c — `compose_system_prompt` was lifted to public
514// `compose_system_prompt_public` above. It accepts an optional
515// `backend_tag` so the sync CLI path can opt into the canonical
516// `[Backend: foo | AXON x.y.z]` suffix during 33.x.i migration.
517
518// ────────────────────────────────────────────────────────────────────
519//  Tests
520// ────────────────────────────────────────────────────────────────────
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525
526    fn parse_simple_stream_source() -> &'static str {
527        // Canonical Kivi-shape: single step, output Stream<Token>,
528        // explicit transport: sse. No anchors, no apply, no let.
529        "flow Chat() -> Unit {\n\
530            step Generate { ask: \"hi\" output: Stream<Token> }\n\
531         }\n\
532         axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
533    }
534
535    fn parse_stream_with_effect_source() -> &'static str {
536        // Disjunct (b): tool with stream effect, step apply.
537        "tool chat_token_stream { description: \"stream\" effects: <stream:drop_oldest> }\n\
538         flow Chat() -> Unit {\n\
539            step Generate { ask: \"hi\" apply: chat_token_stream }\n\
540         }\n\
541         axonendpoint ChatEndpoint { method: POST path: \"/chat\" execute: Chat transport: sse }"
542    }
543
544    #[test]
545    fn build_plan_for_simple_stream_flow_returns_one_step() {
546        let plan = build_streaming_plan(
547            parse_simple_stream_source(),
548            "test.axon",
549            "Chat",
550            "stub",
551        )
552        .expect("simple stream flow plans cleanly");
553
554        assert_eq!(plan.flow_name, "Chat");
555        assert_eq!(plan.backend_name, "stub");
556        assert_eq!(plan.steps.len(), 1);
557        assert_eq!(plan.steps[0].step_name, "Generate");
558        assert_eq!(plan.steps[0].user_prompt, "hi");
559        assert_eq!(plan.steps[0].effect_policy, None, "no tool effect → no policy");
560    }
561
562    #[test]
563    fn build_plan_with_drop_oldest_effect_pre_resolves_policy() {
564        let plan = build_streaming_plan(
565            parse_stream_with_effect_source(),
566            "test.axon",
567            "Chat",
568            "stub",
569        )
570        .expect("stream-effect flow plans cleanly");
571
572        assert_eq!(plan.steps.len(), 1);
573        assert_eq!(plan.steps[0].effect_policy, Some(BackpressurePolicy::DropOldest));
574    }
575
576    #[test]
577    fn build_plan_unknown_flow_returns_flow_not_found() {
578        let err = build_streaming_plan(
579            parse_simple_stream_source(),
580            "test.axon",
581            "NonexistentFlow",
582            "stub",
583        )
584        .expect_err("unknown flow rejected");
585
586        match err {
587            PlanError::FlowNotFound { flow_name, available } => {
588                assert_eq!(flow_name, "NonexistentFlow");
589                assert_eq!(available, vec!["Chat".to_string()]);
590            }
591            other => panic!("expected FlowNotFound, got {other:?}"),
592        }
593    }
594
595    #[test]
596    fn build_plan_unparseable_source_returns_parse_error() {
597        let err = build_streaming_plan(
598            "not a valid axon source",
599            "test.axon",
600            "Chat",
601            "stub",
602        )
603        .expect_err("garbage rejected");
604
605        assert!(matches!(err, PlanError::Parse(_)));
606    }
607
608    #[test]
609    fn build_plan_multi_step_flow_preserves_order() {
610        let src = "flow MultiStep() -> Unit {\n\
611                     step First { ask: \"one\" }\n\
612                     step Second { ask: \"two\" }\n\
613                     step Third { ask: \"three\" }\n\
614                   }\n\
615                   axonendpoint E { method: POST path: \"/m\" execute: MultiStep transport: sse }";
616        let plan = build_streaming_plan(src, "test.axon", "MultiStep", "stub").unwrap();
617        assert_eq!(plan.steps.len(), 3);
618        assert_eq!(plan.steps[0].step_name, "First");
619        assert_eq!(plan.steps[1].step_name, "Second");
620        assert_eq!(plan.steps[2].step_name, "Third");
621        assert_eq!(plan.steps[0].user_prompt, "one");
622        assert_eq!(plan.steps[1].user_prompt, "two");
623        assert_eq!(plan.steps[2].user_prompt, "three");
624    }
625
626    #[test]
627    fn plan_fallback_slugs_are_stable_strings() {
628        // Each variant has a documented slug. Drift in this match
629        // surfaces in 33.x.g warning emission.
630        assert_eq!(PlanFallback::AnchorConstraintsPresent.slug(), "anchor_constraints");
631        assert_eq!(PlanFallback::LambdaApplyPresent.slug(), "lambda_apply");
632        assert_eq!(PlanFallback::LetBindingPresent.slug(), "let_binding");
633        assert_eq!(PlanFallback::UseToolPresent.slug(), "use_tool");
634        assert_eq!(PlanFallback::HibernatePresent.slug(), "hibernate");
635        assert_eq!(PlanFallback::PixPresent.slug(), "pix");
636    }
637
638    #[test]
639    fn plan_is_deterministic_for_same_source() {
640        let plan1 = build_streaming_plan(
641            parse_simple_stream_source(),
642            "test.axon",
643            "Chat",
644            "stub",
645        )
646        .unwrap();
647        let plan2 = build_streaming_plan(
648            parse_simple_stream_source(),
649            "test.axon",
650            "Chat",
651            "stub",
652        )
653        .unwrap();
654        assert_eq!(plan1, plan2, "plan builder is pure + deterministic");
655    }
656
657    #[test]
658    fn streaming_step_eq_is_field_wise() {
659        let a = StreamingStep {
660            step_name: "X".into(),
661            user_prompt: "y".into(),
662            max_tokens: Some(100),
663            temperature: Some(0.7),
664            effect_policy: Some(BackpressurePolicy::DropOldest),
665        };
666        let b = a.clone();
667        assert_eq!(a, b);
668    }
669
670    #[test]
671    fn streaming_plan_includes_backend_name() {
672        let plan = build_streaming_plan(
673            parse_simple_stream_source(),
674            "test.axon",
675            "Chat",
676            "anthropic",
677        )
678        .unwrap();
679        assert_eq!(plan.backend_name, "anthropic");
680    }
681
682    #[test]
683    fn empty_flow_body_produces_empty_step_list() {
684        // Pathological but parseable case: empty flow body.
685        let src = "flow Empty() -> Unit {\n\
686                   }\n\
687                   axonendpoint E { method: POST path: \"/e\" execute: Empty transport: sse }";
688        let plan = build_streaming_plan(src, "test.axon", "Empty", "stub").unwrap();
689        assert!(plan.steps.is_empty());
690        assert_eq!(plan.flow_name, "Empty");
691    }
692
693    // ── §33.x.c — Public helper tests ───────────────────────────────
694
695    #[test]
696    fn compile_source_to_ir_returns_program_and_ir_for_valid_source() {
697        let (program, ir) =
698            compile_source_to_ir(parse_simple_stream_source(), "test.axon").unwrap();
699        // Program has at least one Flow declaration.
700        let flow_count = program
701            .declarations
702            .iter()
703            .filter(|d| matches!(d, crate::ast::Declaration::Flow(_)))
704            .count();
705        assert_eq!(flow_count, 1);
706        // IR mirrors with one flow.
707        assert_eq!(ir.flows.len(), 1);
708        assert_eq!(ir.flows[0].name, "Chat");
709    }
710
711    #[test]
712    fn compile_source_to_ir_is_pure_deterministic() {
713        let src = parse_simple_stream_source();
714        let (p1, ir1) = compile_source_to_ir(src, "test.axon").unwrap();
715        let (p2, ir2) = compile_source_to_ir(src, "test.axon").unwrap();
716        assert_eq!(p1.declarations.len(), p2.declarations.len());
717        assert_eq!(ir1.flows.len(), ir2.flows.len());
718        assert_eq!(ir1.flows[0].name, ir2.flows[0].name);
719    }
720
721    #[test]
722    fn compile_source_to_ir_surfaces_parse_error() {
723        let err = compile_source_to_ir("not axon source at all", "test.axon").unwrap_err();
724        assert!(matches!(err, PlanError::Parse(_)));
725    }
726
727    #[test]
728    fn compile_source_to_ir_surfaces_type_check_error() {
729        // §Fase 28 — `axonendpoint method: YEET` is rejected at
730        // parse time. Use a different shape that parses cleanly
731        // but fails type-check: undefined flow reference.
732        let src = "axonendpoint Bad { method: POST path: \"/x\" execute: NonexistentFlow }";
733        let err = compile_source_to_ir(src, "test.axon").unwrap_err();
734        // Either parser rejected it (post-Fase 28 hardening may
735        // catch undefined references at parse time) or the type
736        // checker did — both are valid PlanError variants.
737        assert!(matches!(err, PlanError::Parse(_) | PlanError::TypeCheck(_)));
738    }
739
740    #[test]
741    fn find_ir_flow_by_name_returns_flow_when_present() {
742        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
743        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
744        assert_eq!(flow.name, "Chat");
745    }
746
747    #[test]
748    fn find_ir_flow_by_name_returns_flow_not_found_with_available_list() {
749        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
750        let err = find_ir_flow_by_name(&ir, "Nope").unwrap_err();
751        match err {
752            PlanError::FlowNotFound { flow_name, available } => {
753                assert_eq!(flow_name, "Nope");
754                assert_eq!(available, vec!["Chat".to_string()]);
755            }
756            other => panic!("expected FlowNotFound, got {other:?}"),
757        }
758    }
759
760    #[test]
761    fn ir_flow_node_kind_step_returns_step() {
762        use crate::ir_nodes::{IRFlowNode, IRStep};
763        let n = IRFlowNode::Step(IRStep {
764            node_type: "Step",
765            source_line: 1,
766            source_column: 1,
767            name: "S".into(),
768            persona_ref: String::new(),
769            given: String::new(),
770            ask: "hi".into(),
771            use_tool: None,
772            probe: None,
773            reason: None,
774            weave: None,
775            output_type: String::new(),
776            confidence_floor: None,
777            navigate_ref: String::new(),
778            apply_ref: String::new(),
779            body: vec![],
780        });
781        assert_eq!(ir_flow_node_kind(&n), "step");
782    }
783
784    #[test]
785    fn ir_flow_node_kind_distinct_slugs_for_every_variant() {
786        // We can't enumerate every variant with fresh instances
787        // (some have non-trivial payloads), but we can pin the slug
788        // set by enumerating distinct slugs that the function CAN
789        // produce. The match in `ir_flow_node_kind` is exhaustive
790        // (compiler-enforced); this test pins the slug values that
791        // downstream callers (e.g. audit logs, 33.x.g warning
792        // catalog) depend on.
793        let expected_slugs: Vec<&str> = vec![
794            "step",
795            "probe",
796            "reason",
797            "validate",
798            "refine",
799            "weave",
800            "use_tool",
801            "remember",
802            "recall",
803            "conditional",
804            "for_in",
805            "let",
806            "return",
807            "break",
808            "continue",
809            "lambda_data_apply",
810            "par",
811            "hibernate",
812            "deliberate",
813            "consensus",
814            "forge",
815            "focus",
816            "associate",
817            "aggregate",
818            "explore",
819            "ingest",
820            "shield_apply",
821            "stream_block",
822            "navigate",
823            "drill",
824            "trail",
825            "corroborate",
826            "ots_apply",
827            "mandate_apply",
828            "compute_apply",
829            "listen",
830            "daemon_step",
831            "emit",
832            "publish",
833            "discover",
834            "persist",
835            "retrieve",
836            "mutate",
837            "purge",
838            "transact",
839        ];
840        // 45 closed-catalog slugs. Adding a new IRFlowNode variant
841        // requires updating `ir_flow_node_kind` (compiler enforces)
842        // AND this list (intentional — keeps the slug catalog
843        // pinned for audit/warning callers).
844        assert_eq!(expected_slugs.len(), 45);
845        // Every slug is a valid lowercase identifier (snake_case).
846        for slug in &expected_slugs {
847            assert!(
848                slug.chars()
849                    .all(|c| c.is_ascii_lowercase() || c == '_' || c.is_ascii_digit()),
850                "slug {slug:?} must be lowercase snake_case for stable audit emission"
851            );
852        }
853        // Every slug is unique.
854        let mut sorted = expected_slugs.clone();
855        sorted.sort();
856        sorted.dedup();
857        assert_eq!(sorted.len(), expected_slugs.len(), "slug catalog has duplicates");
858    }
859
860    #[test]
861    fn ir_flow_node_kind_runner_drift() {
862        // §33.x.c drift gate: `flow_plan::ir_flow_node_kind` is the
863        // single source of truth for IRFlowNode kind slugs. The
864        // legacy `runner.rs::extract_step_info` produces a
865        // `step_type` string for each variant that downstream
866        // consumers (audit row, stub trace, IR debugger) parse.
867        // Drift between the two surfaces here would make the audit
868        // row inconsistent across sync + async paths.
869        //
870        // For 33.x.c the drift gate operates by-construction: the
871        // public helper here is the canonical surface; future
872        // migrations of `extract_step_info` to call into this
873        // helper are tracked under 33.x.i. Until then, this test
874        // pins the kind catalog so accidental drift on either side
875        // surfaces in CI.
876        use crate::ir_nodes::{IRFlowNode, IRStep};
877        let step = IRFlowNode::Step(IRStep {
878            node_type: "Step",
879            source_line: 1,
880            source_column: 1,
881            name: "T".into(),
882            persona_ref: String::new(),
883            given: String::new(),
884            ask: String::new(),
885            use_tool: None,
886            probe: None,
887            reason: None,
888            weave: None,
889            output_type: String::new(),
890            confidence_floor: None,
891            navigate_ref: String::new(),
892            apply_ref: String::new(),
893            body: vec![],
894        });
895        // Pinned drift assertion: the canonical "step" kind is
896        // what `runner.rs::extract_step_info` produces for
897        // `IRFlowNode::Step`, per file inspection at 33.x.c
898        // landing. Future runner refactor MUST keep this
899        // invariant or update this test (deliberate change).
900        assert_eq!(ir_flow_node_kind(&step), "step");
901    }
902
903    #[test]
904    fn compose_system_prompt_public_includes_flow_name() {
905        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
906        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
907        let prompt = compose_system_prompt_public(flow, &ir, None);
908        assert!(prompt.contains("# Flow: Chat"));
909    }
910
911    #[test]
912    fn compose_system_prompt_public_omits_backend_tag_when_none() {
913        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
914        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
915        let prompt = compose_system_prompt_public(flow, &ir, None);
916        assert!(!prompt.contains("[Backend:"));
917        assert!(!prompt.contains("AXON"));
918    }
919
920    #[test]
921    fn compose_system_prompt_public_includes_backend_tag_when_set() {
922        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
923        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
924        let prompt = compose_system_prompt_public(flow, &ir, Some("anthropic"));
925        assert!(prompt.contains("[Backend: anthropic | AXON "));
926    }
927
928    #[test]
929    fn compose_system_prompt_public_includes_persona_when_present() {
930        // `domain` is a list per AST grammar; `tone` is a string.
931        // `tone` is a closed-catalog enum (see type_checker valid tones).
932        let src = "persona Doctor { domain: [\"medicine\"] tone: \"formal\" }\n\
933                   context Clinic { depth: \"deep\" memory_scope: \"session\" }\n\
934                   flow Chat() -> Unit {\n\
935                       step Generate { ask: \"hi\" output: Stream<Token> }\n\
936                   }\n\
937                   axonendpoint E { method: POST path: \"/c\" execute: Chat transport: sse }";
938        let (_p, ir) = compile_source_to_ir(src, "t.axon").unwrap();
939        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
940        let prompt = compose_system_prompt_public(flow, &ir, None);
941        assert!(prompt.contains("# Persona: Doctor"));
942        assert!(prompt.contains("Domain expertise: medicine"));
943        assert!(prompt.contains("# Context: Clinic"));
944        assert!(prompt.contains("Analysis depth: deep"));
945    }
946
947    #[test]
948    fn compose_system_prompt_public_is_pure_deterministic() {
949        let (_p, ir) = compile_source_to_ir(parse_simple_stream_source(), "t.axon").unwrap();
950        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
951        let p1 = compose_system_prompt_public(flow, &ir, Some("openai"));
952        let p2 = compose_system_prompt_public(flow, &ir, Some("openai"));
953        assert_eq!(p1, p2);
954    }
955
956    #[test]
957    fn build_streaming_plan_uses_compile_source_to_ir_internally() {
958        // Cross-check: plan built via build_streaming_plan has the
959        // same system_prompt as one built by manually invoking
960        // compile_source_to_ir + compose_system_prompt_public.
961        let src = parse_simple_stream_source();
962        let plan = build_streaming_plan(src, "t.axon", "Chat", "stub").unwrap();
963        let (_program, ir) = compile_source_to_ir(src, "t.axon").unwrap();
964        let flow = find_ir_flow_by_name(&ir, "Chat").unwrap();
965        let expected = compose_system_prompt_public(flow, &ir, None);
966        assert_eq!(plan.system_prompt, expected);
967    }
968}