Skip to main content

axon/
runner.rs

1//! `axon run` native implementation — stub + real execution.
2//!
3//! Pipeline: Source → Lex → Parse → Type-check → IR → Execution Plan → Execute
4//!
5//! Execution modes:
6//!   - stub (default): prints execution plan without API calls
7//!   - real: sends each step to LLM backend (Anthropic Messages API)
8//!
9//! Exit codes:
10//!   0 — success
11//!   1 — compilation or execution error
12//!   2 — I/O or configuration error
13//!
14//! # §Fase 33.x.i — `crate::backend` deprecation
15//!
16//! This file is one of four callers of the deprecated synchronous
17//! `crate::backend` mono-file (see `backend.rs` module docs).
18//! The `#![allow(deprecated)]` below silences the deprecation
19//! warnings on this file's call sites while the deeper async
20//! migration progresses under followup sub-fase Fase 33.x.i.2
21//! (sync→async migration of the 4 callers, separate cycle).
22
23#![allow(deprecated)]
24
25use std::io::{self, IsTerminal};
26use std::path::Path;
27
28use crate::anchor_checker;
29use crate::backend;
30use crate::conversation::{ConversationHistory, ContextWindow};
31use crate::exec_context::ExecContext;
32use crate::hooks::HookManager;
33use crate::ir_generator::IRGenerator;
34use crate::ir_nodes::*;
35use crate::lexer::{Lexer, LexerError};
36use crate::output::{OutputFormat, ReportBuilder, StepReport};
37use crate::parallel;
38use crate::plan_export::{self, PlanBuilder, PlanUnit, PlanStep, PlanTools, PlanToolEntry, PlanDependencies, UnresolvedRef};
39use crate::parser::{ParseError, Parser};
40use crate::session_store::SessionStore;
41use crate::step_deps;
42use crate::store::epistemic;
43use crate::store::filter::SqlValue;
44use crate::store::row_stream;
45use crate::store::postgres_backend::{PostgresStoreBackend, StoreError};
46use crate::store::registry::{StoreBackendKind, StoreRegistry};
47use crate::tool_registry::ToolRegistry;
48use crate::tool_validator::{self, EffectTracker};
49use crate::type_checker::TypeChecker;
50
/// Single source of truth for the AXON version string.
///
/// Resolved at compile time from `[package].version` in `Cargo.toml`
/// (read through the `CARGO_PKG_VERSION` environment variable), so a
/// single bump there propagates to every caller. This eliminates the
/// drift that previously had `AXON_VERSION` redeclared as a string
/// literal in five files (audit_cli.rs, compiler.rs, main.rs, repl.rs,
/// runner.rs) — each carrying a different stale value.
pub const AXON_VERSION: &str = env!("CARGO_PKG_VERSION");
58
59// ── ANSI helpers ─────────────────────────────────────────────────────────────
60
/// Colorize `text` for terminal output.
///
/// With `use_color` set, the result is `code` + `text` + the ANSI reset
/// sequence (`\x1b[0m`); otherwise `text` is returned unchanged as an
/// owned `String`.
fn c(text: &str, code: &str, use_color: bool) -> String {
    if !use_color {
        return text.to_string();
    }

    const RESET: &str = "\x1b[0m";
    let mut painted = String::with_capacity(code.len() + text.len() + RESET.len());
    painted.push_str(code);
    painted.push_str(text);
    painted.push_str(RESET);
    painted
}
68
69// ── Helpers ─────────────────────────────────────────────────────────────────
70
/// Truncate a string for display, appending "..." if over `max_len`.
///
/// Newlines are replaced with spaces first, so the result is always a
/// single line. `max_len` is a budget in bytes: a string whose flattened
/// form fits is returned as-is, otherwise it is cut and suffixed with
/// `"..."`. When `max_len` lands inside a multi-byte UTF-8 character the
/// cut moves back to the previous character boundary, so non-ASCII text
/// never triggers a slicing panic.
fn truncate_output(s: &str, max_len: usize) -> String {
    let single_line = s.replace('\n', " ");
    if single_line.len() <= max_len {
        return single_line;
    }

    // Slicing a `str` at a byte index that is not a char boundary panics.
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = max_len;
    while !single_line.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &single_line[..cut])
}
80
81// ── Compiled execution plan ─────────────────────────────────────────────────
82
/// A compiled execution unit — one per `run` statement.
///
/// Built by `build_execution_plan` from an `IRRun`. Serializable via
/// serde; the fields marked `#[serde(skip)]` are runtime-only and never
/// appear in the serialized form.
#[derive(Debug, serde::Serialize)]
struct ExecutionUnit {
    /// Name of the flow being run (copied from the `run` statement).
    flow_name: String,
    /// Name of the persona the flow runs as (copied from the `run` statement).
    persona_name: String,
    /// Name of the context the flow runs within (copied from the `run` statement).
    context_name: String,
    /// Unit-level system prompt assembled by `build_system_prompt`
    /// (persona, context, anchor constraints, backend tag).
    system_prompt: String,
    /// The flow's steps, in flow order, as produced by `build_compiled_steps`.
    steps: Vec<CompiledStep>,
    /// One human-readable line per resolved anchor
    /// (see `build_anchor_instructions`).
    anchor_instructions: Vec<String>,
    /// Effort setting copied from the `run` statement; may be empty.
    effort: String,
    /// The resolved anchor IR nodes, cloned from the `run` statement.
    /// Not serialized.
    #[serde(skip)]
    resolved_anchors: Vec<IRAnchor>,
    /// §Fase 37.b (D1) — the Request Binding Contract bindings:
    /// `(flow parameter name, value)` pairs resolved from the HTTP
    /// request body. Seeded into the unit's `ExecContext` before the
    /// step walk so `${param}` interpolates. Empty for a caller with
    /// no request body (CLI / batch / pipeline) — D5 backwards-compat.
    #[serde(skip)]
    param_bindings: Vec<(String, String)>,
}
103
/// A compiled step ready for LLM dispatch.
///
/// Built by `build_compiled_steps`, one per flow node. The optional
/// payload fields are populated only for the node kinds named in their
/// docs and are omitted from the serialized form when absent / empty.
#[derive(Debug, serde::Serialize)]
struct CompiledStep {
    /// Display name of the step, as returned by `extract_step_info`.
    step_name: String,
    /// Step-type label (e.g. `"step"`, `"use_tool"`, `"lambda_data_apply"`),
    /// as returned by `extract_step_info`.
    step_type: String,
    /// Per-step system prompt naming the step and its flow.
    system_prompt: String,
    /// The step's action text, or `"Execute step: <name>"` when the
    /// action is empty.
    user_prompt: String,
    /// For `use_tool` steps: the raw argument expression.
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_argument: Option<String>,
    /// For memory steps: the expression/query/target. For
    /// `retrieve` / `mutate` / `purge` this is `"<store>:<where_expr>"`;
    /// for `persist` it is the store name alone.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_expression: Option<String>,
    /// Fase 15.c — for `lambda_data_apply` steps: the full payload
    /// (spec snapshot + target + output_type) so the runner can build
    /// ψ = ⟨T, V, E⟩ without reaching back into the IR.
    #[serde(skip_serializing_if = "Option::is_none")]
    lambda_apply_payload: Option<crate::lambda_runtime::LambdaApplyPayload>,
    /// Fase 17.c — for `let` nodes: the payload (target, value,
    /// value_kind) so the stub executor can perform the SSA binding
    /// without re-traversing the IR.
    /// NOTE(review): `extract_step_info` labels these steps `"let"`,
    /// while the stub executor checks for `"let_binding"` — confirm
    /// which label is intended.
    #[serde(skip_serializing_if = "Option::is_none")]
    let_payload: Option<LetPayload>,
    /// §Fase 35.o / 35.p — for `persist` (INSERT columns) and `mutate`
    /// (UPDATE SET assignments) steps: the declared `{ col: value }`
    /// block. `Some` ⇒ the SQL row is built from exactly these columns
    /// (interpolated); `None` ⇒ no block was written and the runtime
    /// falls back to the flow's user bindings (v1.31.0).
    #[serde(skip_serializing_if = "Option::is_none")]
    store_fields: Option<Vec<(String, String)>>,
    /// §Fase 58.e / §Fase 60 — for a `use Tool(k = v, …)` dispatch: the
    /// bound keyword args as `(name, raw value, value_kind)` triples.
    /// Non-empty ⇒ the runtime assembles a STRUCTURED JSON request body
    /// (`{"query":"…","max_results":5}`) instead of the flat
    /// `{"input": …}`. Empty for the legacy single-`on <arg>` form (D5).
    /// `value_kind` (`"literal"` / `"reference"`) drives runtime
    /// resolution: a reference is a binding lookup, a literal keeps
    /// `${…}` interpolation.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    tool_named_args: Vec<(String, String, String)>,
    /// §Fase 58.e — the called tool's declared `(param, type)` schema, resolved
    /// from `ir.tools` at build time so the runtime coerces each arg value to
    /// its DECLARED JSON type (a `String` param stays a string even when its
    /// value is all-digits) without reaching back into the IR. Empty when
    /// the tool is not found in `ir.tools` or the step is not a tool call.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    tool_param_types: Vec<(String, String)>,
}
150
/// Fase 17.c — payload carried inside a CompiledStep for `let X = value`
/// SSA bindings. `value_kind` ∈ {"literal", "reference", "expression"}
/// disambiguates a quoted literal from a dotted-identifier reference
/// resolved at runtime.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LetPayload {
    /// Name the value is bound to (the `X` in `let X = value`).
    pub target: String,
    /// The right-hand side as written; for a `"reference"` kind this is
    /// the name looked up in the execution context.
    pub value: String,
    /// How `value` is to be interpreted: `"literal"`, `"reference"`, or
    /// `"expression"`.
    pub value_kind: String,
}
161
/// Trace event for execution recording.
///
/// Collected by the executor when tracing is enabled; serializable via
/// serde.
#[derive(Debug, serde::Serialize)]
struct TraceEvent {
    /// Event kind label, e.g. `"unit_start"` or `"lambda_data_apply"`.
    event: String,
    /// Flow name of the execution unit that produced the event.
    unit: String,
    /// Name of the step the event belongs to; empty for unit-level events.
    step: String,
    /// Free-form, event-specific detail text.
    detail: String,
}
170
171// ── Build execution plan from IR ────────────────────────────────────────────
172
173fn build_execution_plan(ir: &IRProgram, backend: &str) -> Vec<ExecutionUnit> {
174    let mut units = Vec::new();
175
176    for run in &ir.runs {
177        let system_prompt = build_system_prompt(run, backend);
178        let anchor_instructions = build_anchor_instructions(run);
179        let steps = build_compiled_steps(run, ir);
180
181        units.push(ExecutionUnit {
182            flow_name: run.flow_name.clone(),
183            persona_name: run.persona_name.clone(),
184            context_name: run.context_name.clone(),
185            system_prompt,
186            steps,
187            anchor_instructions,
188            effort: run.effort.clone(),
189            resolved_anchors: run.resolved_anchors.clone(),
190            // §Fase 37.b — the CLI / `run`-statement plan builder has
191            // no HTTP request body; the binding is empty (D5).
192            param_bindings: Vec::new(),
193        });
194    }
195
196    units
197}
198
199fn build_system_prompt(run: &IRRun, backend: &str) -> String {
200    let mut parts: Vec<String> = Vec::new();
201
202    // Persona block
203    if let Some(ref persona) = run.resolved_persona {
204        parts.push(format!("# Persona: {}", persona.name));
205        if !persona.domain.is_empty() {
206            parts.push(format!("Domain expertise: {}", persona.domain.join(", ")));
207        }
208        if !persona.tone.is_empty() {
209            parts.push(format!("Communication tone: {}", persona.tone));
210        }
211        if !persona.language.is_empty() {
212            parts.push(format!("Language: {}", persona.language));
213        }
214        if let Some(ct) = persona.confidence_threshold {
215            parts.push(format!("Confidence threshold: {ct:.2}"));
216        }
217        if persona.cite_sources == Some(true) {
218            parts.push("Always cite sources.".to_string());
219        }
220        if !persona.refuse_if.is_empty() {
221            parts.push(format!("Refuse if: {}", persona.refuse_if.join(", ")));
222        }
223    }
224
225    // Context block
226    if let Some(ref ctx) = run.resolved_context {
227        parts.push(format!("\n# Context: {}", ctx.name));
228        if !ctx.depth.is_empty() {
229            parts.push(format!("Analysis depth: {}", ctx.depth));
230        }
231        if !ctx.memory_scope.is_empty() {
232            parts.push(format!("Memory scope: {}", ctx.memory_scope));
233        }
234        if let Some(t) = ctx.temperature {
235            parts.push(format!("Temperature: {t:.1}"));
236        }
237        if let Some(mt) = ctx.max_tokens {
238            parts.push(format!("Max tokens: {mt}"));
239        }
240    }
241
242    // Anchor enforcement
243    if !run.resolved_anchors.is_empty() {
244        parts.push("\n# Constraints (Anchors)".to_string());
245        for anchor in &run.resolved_anchors {
246            let mut constraint = format!("- {}: {}", anchor.name, anchor.require);
247            if let Some(cf) = anchor.confidence_floor {
248                constraint.push_str(&format!(" (confidence ≥ {cf:.2})"));
249            }
250            if !anchor.on_violation.is_empty() {
251                constraint.push_str(&format!(" [on_violation: {}]", anchor.on_violation));
252            }
253            parts.push(constraint);
254        }
255    }
256
257    // Backend tag
258    parts.push(format!("\n[Backend: {backend} | AXON {AXON_VERSION}]"));
259
260    parts.join("\n")
261}
262
263fn build_anchor_instructions(run: &IRRun) -> Vec<String> {
264    run.resolved_anchors
265        .iter()
266        .map(|a| {
267            let mut s = format!("{}: {}", a.name, a.require);
268            if let Some(cf) = a.confidence_floor {
269                s.push_str(&format!(" (≥{cf:.2})"));
270            }
271            s
272        })
273        .collect()
274}
275
276fn build_compiled_steps(run: &IRRun, ir: &IRProgram) -> Vec<CompiledStep> {
277    let flow = match &run.resolved_flow {
278        Some(f) => f,
279        None => return Vec::new(),
280    };
281
282    let mut steps = Vec::new();
283    for node in &flow.steps {
284        let (step_name, step_type, action) = extract_step_info(node);
285        let system_prompt = format!(
286            "You are executing step '{}' of flow '{}'.",
287            step_name, flow.name
288        );
289        let user_prompt = if action.is_empty() {
290            format!("Execute step: {step_name}")
291        } else {
292            action
293        };
294
295        // Extract tool argument for use_tool steps
296        let tool_argument = match node {
297            IRFlowNode::UseTool(s) => Some(s.argument.clone()),
298            _ => None,
299        };
300
301        // §Fase 58.e — the structured keyword args of a `use Tool(k = v, …)`
302        // dispatch, plus the called tool's declared `(param, type)` schema
303        // (resolved once from `ir.tools`) so the runtime coerces each value to
304        // its declared JSON type. Both empty for the legacy single-arg form.
305        let (tool_named_args, tool_param_types) = match node {
306            IRFlowNode::UseTool(s) => {
307                let named: Vec<(String, String, String)> = s
308                    .named_args
309                    .iter()
310                    .map(|a| (a.name.clone(), a.value.clone(), a.value_kind.clone()))
311                    .collect();
312                let types: Vec<(String, String)> = ir
313                    .tools
314                    .iter()
315                    .find(|t| t.name == s.tool_name)
316                    .map(|t| {
317                        t.parameters
318                            .iter()
319                            .map(|p| (p.name.clone(), p.type_name.clone()))
320                            .collect()
321                    })
322                    .unwrap_or_default();
323                (named, types)
324            }
325            _ => (Vec::new(), Vec::new()),
326        };
327
328        // Extract memory expression for remember/recall/persist/retrieve/mutate/purge
329        let memory_expression = match node {
330            IRFlowNode::Remember(s) => Some(s.expression.clone()),
331            IRFlowNode::Recall(s) => Some(s.query.clone()),
332            IRFlowNode::Persist(s) => Some(s.store_name.clone()),
333            IRFlowNode::Retrieve(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
334            IRFlowNode::Mutate(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
335            IRFlowNode::Purge(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
336            _ => None,
337        };
338
339        // Fase 15.c — materialise the lambda apply payload by looking
340        // up the spec snapshot from ir.lambda_data_specs. The runner
341        // needs the full snapshot at execute-time to construct ψ;
342        // carrying it in the CompiledStep keeps the executor free of
343        // IR back-references (mirrors Python's BaseBackend pattern).
344        let lambda_apply_payload = match node {
345            IRFlowNode::LambdaDataApply(s) => {
346                let snap = ir
347                    .lambda_data_specs
348                    .iter()
349                    .find(|spec| spec.name == s.lambda_data_name)
350                    .map(|spec| crate::lambda_runtime::SpecSnapshot {
351                        name: spec.name.clone(),
352                        ontology: spec.ontology.clone(),
353                        certainty: spec.certainty,
354                        temporal_frame_start: spec.temporal_frame_start.clone(),
355                        temporal_frame_end: spec.temporal_frame_end.clone(),
356                        provenance: spec.provenance.clone(),
357                        derivation: spec.derivation.clone(),
358                    })
359                    .unwrap_or_default();
360                Some(crate::lambda_runtime::LambdaApplyPayload {
361                    lambda_data_name: s.lambda_data_name.clone(),
362                    target: s.target.clone(),
363                    output_type: s.output_type.clone(),
364                    spec_snapshot: snap,
365                })
366            }
367            _ => None,
368        };
369
370        // Fase 17.c — materialise the let payload from the IR Let
371        // node so the stub executor can bind without re-traversing
372        // the IR. Same pattern as the lambda apply payload above.
373        let let_payload = match node {
374            IRFlowNode::Let(s) => Some(LetPayload {
375                target: s.target.clone(),
376                value: s.value.clone(),
377                value_kind: s.value_kind.clone(),
378            }),
379            _ => None,
380        };
381
382        // §Fase 35.o / 35.p — materialise the declared `{ col: value }`
383        // block of a `persist` (INSERT columns) or `mutate` (UPDATE SET
384        // assignments) so `execute_sql_store_step` scopes the SQL row
385        // to exactly those columns. No block ⇒ `None` → the v1.31.0
386        // user-bindings fallback.
387        let store_fields = match node {
388            IRFlowNode::Persist(s) if !s.fields.is_empty() => {
389                Some(s.fields.clone())
390            }
391            IRFlowNode::Mutate(s) if !s.fields.is_empty() => {
392                Some(s.fields.clone())
393            }
394            _ => None,
395        };
396
397        steps.push(CompiledStep {
398            step_name,
399            step_type,
400            system_prompt,
401            user_prompt,
402            tool_argument,
403            memory_expression,
404            lambda_apply_payload,
405            let_payload,
406            store_fields,
407            tool_named_args,
408            tool_param_types,
409        });
410    }
411
412    steps
413}
414
415/// §Fase 58.e — assemble the STRUCTURED JSON request body for a `use Tool(k =
416/// v, …)` dispatch from its ALREADY-INTERPOLATED `(name, value)` args. Each
417/// value is coerced to its DECLARED parameter type so the tool backend receives
418/// `{"query":"Acme","max_results":5,"safe":true}` — not a flat
419/// `{"input": "…"}`. serde builds the object, so JSON escaping is correct.
420pub(crate) fn build_structured_tool_body(
421    interpolated_args: &[(String, String)],
422    param_types: &[(String, String)],
423) -> String {
424    let mut map = serde_json::Map::new();
425    for (name, value) in interpolated_args {
426        let declared = param_types
427            .iter()
428            .find(|(p, _)| p == name)
429            .map(|(_, t)| t.as_str());
430        map.insert(name.clone(), coerce_tool_arg_value(value, declared));
431    }
432    serde_json::Value::Object(map).to_string()
433}
434
435/// §Fase 58.e — coerce an interpolated arg value to JSON per its DECLARED type.
436/// `Int`/`Float`/`Bool` parse into the matching JSON scalar; a value that does
437/// not parse falls back to a JSON string (the §58.d type-checker already flags
438/// a literal mismatch at compile time — interpolated/runtime values are coerced
439/// leniently rather than dropped). `String`, custom domain types, lists, and
440/// unknown / schema-less (`None`) stay JSON strings — so a `String` parameter
441/// keeps its value verbatim even when it is all-digits.
442pub(crate) fn coerce_tool_arg_value(value: &str, declared_type: Option<&str>) -> serde_json::Value {
443    let base = declared_type.map(|t| t.trim_end_matches('?').split('<').next().unwrap_or(t));
444    match base {
445        Some("Int") => value
446            .parse::<i64>()
447            .map(|i| serde_json::Value::Number(i.into()))
448            .unwrap_or_else(|_| serde_json::Value::String(value.to_string())),
449        Some("Float") => value
450            .parse::<f64>()
451            .ok()
452            .and_then(serde_json::Number::from_f64)
453            .map(serde_json::Value::Number)
454            .unwrap_or_else(|| serde_json::Value::String(value.to_string())),
455        Some("Bool") => match value {
456            "true" => serde_json::Value::Bool(true),
457            "false" => serde_json::Value::Bool(false),
458            _ => serde_json::Value::String(value.to_string()),
459        },
460        _ => serde_json::Value::String(value.to_string()),
461    }
462}
463
464fn extract_step_info(node: &IRFlowNode) -> (String, String, String) {
465    match node {
466        IRFlowNode::Step(s) => (s.name.clone(), "step".to_string(), s.ask.clone()),
467        IRFlowNode::Probe(s) => (s.target.clone(), "probe".to_string(), format!("Probe: {}", s.target)),
468        IRFlowNode::Reason(s) => (s.target.clone(), "reason".to_string(), format!("Reason about: {}", s.target)),
469        IRFlowNode::Validate(s) => (s.target.clone(), "validate".to_string(), format!("Validate: {}", s.target)),
470        IRFlowNode::Refine(s) => (s.target.clone(), "refine".to_string(), format!("Refine: {}", s.target)),
471        IRFlowNode::Weave(s) => ("weave".to_string(), "weave".to_string(), format!("Weave {} sources into {}", s.sources.len(), s.target)),
472        IRFlowNode::UseTool(s) => (s.tool_name.clone(), "use_tool".to_string(), format!("Use tool: {}", s.tool_name)),
473        IRFlowNode::Remember(s) => (s.memory_target.clone(), "remember".to_string(), format!("Remember: {}", s.expression)),
474        IRFlowNode::Recall(s) => (s.memory_source.clone(), "recall".to_string(), format!("Recall: {}", s.query)),
475        IRFlowNode::Conditional(s) => (s.condition.clone(), "conditional".to_string(), format!("If: {}", s.condition)),
476        IRFlowNode::ForIn(s) => (s.variable.clone(), "for_in".to_string(), format!("For {} in {}", s.variable, s.iterable)),
477        IRFlowNode::Let(s) => (s.target.clone(), "let".to_string(), format!("Let {} = {}", s.target, s.value)),
478        IRFlowNode::Return(s) => ("return".to_string(), "return".to_string(), format!("Return: {}", s.value_expr)),
479        IRFlowNode::Par(_) => ("parallel".to_string(), "parallel".to_string(), "Parallel block".to_string()),
480        IRFlowNode::Hibernate(_) => ("hibernate".to_string(), "hibernate".to_string(), "Hibernate".to_string()),
481        IRFlowNode::Deliberate(_) => ("deliberate".to_string(), "deliberate".to_string(), "Deliberate block".to_string()),
482        IRFlowNode::Consensus(_) => ("consensus".to_string(), "consensus".to_string(), "Consensus block".to_string()),
483        IRFlowNode::Forge(_) => ("forge".to_string(), "forge".to_string(), "Forge block".to_string()),
484        IRFlowNode::Focus(s) => (s.expression.clone(), "focus".to_string(), format!("Focus: {}", s.expression)),
485        IRFlowNode::Associate(s) => (s.left.clone(), "associate".to_string(), format!("Associate: {} ↔ {}", s.left, s.right)),
486        IRFlowNode::Aggregate(s) => (s.target.clone(), "aggregate".to_string(), format!("Aggregate: {}", s.target)),
487        IRFlowNode::Explore(s) => (s.target.clone(), "explore".to_string(), format!("Explore: {}", s.target)),
488        IRFlowNode::Ingest(s) => (s.source.clone(), "ingest".to_string(), format!("Ingest: {}", s.source)),
489        IRFlowNode::ShieldApply(s) => (s.shield_name.clone(), "shield_apply".to_string(), format!("Apply shield: {}", s.shield_name)),
490        IRFlowNode::Stream(_) => ("stream".to_string(), "stream".to_string(), "Stream block".to_string()),
491        IRFlowNode::Navigate(s) => (s.pix_ref.clone(), "navigate".to_string(), format!("Navigate: {}", s.pix_ref)),
492        IRFlowNode::Drill(s) => (s.pix_ref.clone(), "drill".to_string(), format!("Drill: {} → {}", s.pix_ref, s.subtree_path)),
493        IRFlowNode::Trail(s) => (s.navigate_ref.clone(), "trail".to_string(), format!("Trail: {}", s.navigate_ref)),
494        IRFlowNode::Corroborate(s) => (s.navigate_ref.clone(), "corroborate".to_string(), format!("Corroborate: {}", s.navigate_ref)),
495        IRFlowNode::OtsApply(s) => (s.ots_name.clone(), "ots_apply".to_string(), format!("Apply OTS: {}", s.ots_name)),
496        IRFlowNode::MandateApply(s) => (s.mandate_name.clone(), "mandate_apply".to_string(), format!("Apply mandate: {}", s.mandate_name)),
497        IRFlowNode::ComputeApply(s) => (s.compute_name.clone(), "compute_apply".to_string(), format!("Apply compute: {}", s.compute_name)),
498        IRFlowNode::Listen(s) => (s.channel.clone(), "listen".to_string(), format!("Listen: {}", s.channel)),
499        IRFlowNode::DaemonStep(s) => (s.daemon_ref.clone(), "daemon".to_string(), format!("Daemon: {}", s.daemon_ref)),
500        IRFlowNode::Persist(s) => (s.store_name.clone(), "persist".to_string(), format!("Persist to: {}", s.store_name)),
501        IRFlowNode::Retrieve(s) => (s.store_name.clone(), "retrieve".to_string(), format!("Retrieve from: {}", s.store_name)),
502        IRFlowNode::Mutate(s) => (s.store_name.clone(), "mutate".to_string(), format!("Mutate: {}", s.store_name)),
503        IRFlowNode::Purge(s) => (s.store_name.clone(), "purge".to_string(), format!("Purge: {}", s.store_name)),
504        IRFlowNode::Transact(_) => ("transact".to_string(), "transact".to_string(), "Transact block".to_string()),
505        IRFlowNode::LambdaDataApply(s) => (s.lambda_data_name.clone(), "lambda_data_apply".to_string(), format!("Apply ΛD: {}", s.lambda_data_name)),
506        // §λ-L-E Fase 13 — Mobile typed channel reductions.
507        IRFlowNode::Emit(s) => (s.channel_ref.clone(), "emit".to_string(), format!("Emit on {}: {}", s.channel_ref, s.value_ref)),
508        IRFlowNode::Publish(s) => (s.channel_ref.clone(), "publish".to_string(), format!("Publish {} within {}", s.channel_ref, s.shield_ref)),
509        IRFlowNode::Discover(s) => (s.capability_ref.clone(), "discover".to_string(), format!("Discover {} as {}", s.capability_ref, s.alias)),
510        // Fase 19.e — break / continue. Payload-free; the executor
511        // raises sentinel exceptions caught by the enclosing for-in.
512        IRFlowNode::Break(_) => ("break".to_string(), "break".to_string(), "Break out of for-in loop".to_string()),
513        IRFlowNode::Continue(_) => ("continue".to_string(), "continue".to_string(), "Continue to next for-in iteration".to_string()),
514    }
515}
516
517// ── Stub executor ───────────────────────────────────────────────────────────
518
519fn execute_stub(
520    units: &[ExecutionUnit],
521    use_color: bool,
522    trace: bool,
523) -> (bool, Vec<TraceEvent>) {
524    let mut events: Vec<TraceEvent> = Vec::new();
525
526    for (i, unit) in units.iter().enumerate() {
527        println!(
528            "\n{}",
529            c(
530                &format!("▶ Execution Unit {}/{}: {} as {}", i + 1, units.len(), unit.flow_name, unit.persona_name),
531                "\x1b[1;36m",
532                use_color,
533            )
534        );
535
536        if trace {
537            events.push(TraceEvent {
538                event: "unit_start".to_string(),
539                unit: unit.flow_name.clone(),
540                step: String::new(),
541                detail: format!("persona={}, context={}", unit.persona_name, unit.context_name),
542            });
543        }
544
545        // Show system prompt summary
546        println!(
547            "  {} {}",
548            c("System:", "\x1b[1;33m", use_color),
549            truncate(&unit.system_prompt, 120)
550        );
551
552        if !unit.anchor_instructions.is_empty() {
553            println!(
554                "  {} {}",
555                c("Anchors:", "\x1b[1;33m", use_color),
556                unit.anchor_instructions.join(" | ")
557            );
558        }
559
560        if !unit.effort.is_empty() {
561            println!(
562                "  {} {}",
563                c("Effort:", "\x1b[1;33m", use_color),
564                unit.effort
565            );
566        }
567
568        // Execute each step (stub)
569        let mut stub_ctx = crate::exec_context::ExecContext::new(
570            &unit.flow_name,
571            &unit.persona_name,
572            i,
573        );
574        // §Fase 37.b (D1) — seed the flow's parameters from the
575        // request body BEFORE the step walk so `${param}` resolves.
576        for (name, value) in &unit.param_bindings {
577            stub_ctx.set(name, value);
578        }
579        for (j, step) in unit.steps.iter().enumerate() {
580            stub_ctx.set_step(&step.step_name, &step.step_type, j);
581            println!(
582                "  {} {}.{} [{}] {}",
583                c("→", "\x1b[32m", use_color),
584                j + 1,
585                c(&step.step_name, "\x1b[1m", use_color),
586                step.step_type,
587                &step.user_prompt
588            );
589
590            // Fase 15.c — `lambda_data_apply` is the only primitive the
591            // stub executor implements semantically: it's a pure binding
592            // (no LLM, no I/O), so the stub can produce a correct ψ
593            // without diverging from the real executor. Adopters running
594            // `axon run --stub` get observable bindings for downstream
595            // ${OutputType} interpolation.
596            if step.step_type == "lambda_data_apply" {
597                if let Some(payload) = &step.lambda_apply_payload {
598                    let target_value = if payload.target.is_empty() {
599                        serde_json::Value::Null
600                    } else {
601                        // Interpolate target via stub_ctx — supports
602                        // ${StepName} / $var. Falls back to a string
603                        // literal of the target ref so the trace stays
604                        // observable even when the var is unresolved.
605                        let raw = stub_ctx
606                            .get(&payload.target)
607                            .map(|s| s.to_string())
608                            .unwrap_or_else(|| payload.target.clone());
609                        serde_json::Value::String(raw)
610                    };
611                    match crate::lambda_runtime::build_psi(
612                        &payload.spec_snapshot,
613                        target_value,
614                    ) {
615                        Ok(psi) => {
616                            let psi_json = serde_json::to_string(&psi).unwrap_or_default();
617                            if !payload.output_type.is_empty() {
618                                stub_ctx.set(&payload.output_type, &psi_json);
619                            }
620                            stub_ctx.set_result(&step.step_name, &psi_json);
621                            if trace {
622                                events.push(TraceEvent {
623                                    event: "lambda_data_apply".to_string(),
624                                    unit: unit.flow_name.clone(),
625                                    step: step.step_name.clone(),
626                                    detail: psi_json,
627                                });
628                            }
629                            continue;
630                        }
631                        Err(err) => {
632                            eprintln!(
633                                "  {} lambda apply error: {}",
634                                c("✗", "\x1b[31m", use_color),
635                                err
636                            );
637                            if trace {
638                                events.push(TraceEvent {
639                                    event: "lambda_data_apply_error".to_string(),
640                                    unit: unit.flow_name.clone(),
641                                    step: step.step_name.clone(),
642                                    detail: err.to_string(),
643                                });
644                            }
645                            return (false, events);
646                        }
647                    }
648                }
649            }
650
651            // Fase 17.c — `let_binding` is also a pure SSA binding
652            // (no LLM, no I/O). The stub binds the resolved value
653            // into ExecContext under `target` so downstream ${X} /
654            // $X interpolation finds it. Resolution rule:
655            //   * literal — bind verbatim
656            //   * reference — look up in stub_ctx; fall back to the
657            //     literal value string if absent (preserves observable
658            //     trace even when the var is unresolved at stub time)
659            //   * expression — bind the joined string; runtime
660            //     evaluation via NativeComputeDispatcher is a future
661            //     sub-phase
662            if step.step_type == "let_binding" {
663                if let Some(payload) = &step.let_payload {
664                    let resolved = if payload.value_kind == "reference"
665                        && !payload.value.is_empty()
666                    {
667                        stub_ctx
668                            .get(&payload.value)
669                            .map(str::to_string)
670                            .unwrap_or_else(|| payload.value.clone())
671                    } else {
672                        payload.value.clone()
673                    };
674                    if !payload.target.is_empty() {
675                        stub_ctx.set(&payload.target, &resolved);
676                    }
677                    stub_ctx.set_result(&step.step_name, &resolved);
678                    if trace {
679                        events.push(TraceEvent {
680                            event: "let_binding".to_string(),
681                            unit: unit.flow_name.clone(),
682                            step: step.step_name.clone(),
683                            detail: format!(
684                                "{}={} (kind={})",
685                                payload.target, resolved, payload.value_kind,
686                            ),
687                        });
688                    }
689                    continue;
690                }
691            }
692
693            // ── Fase 19.f/g — stub-correct dispatch for the 11 new
694            // primitives (Conditional / ForIn / Par / Return / Remember /
695            // Recall / Hibernate / Drill / Trail / Break / Continue).
696            //
697            // "Stub-correct" means: recognize the step type, emit a
698            // trace event with the right shape, and bind any
699            // adopter-visible placeholders to ExecContext so downstream
700            // ${X} / $X interpolation continues to resolve. The stub
701            // does NOT perform the real subsystem work (LLM scoring,
702            // PIX traversal, HMAC token signing, MemoryBackend writes)
703            // — that is the Python runner's responsibility. The Rust
704            // stub mirrors the Python contract at the trace boundary
705            // so cross-stack parity goldens (Fase 19.h) compare on the
706            // same structured shapes.
707            match step.step_type.as_str() {
708                "remember" => {
709                    let target = &step.step_name;
710                    if !target.is_empty() {
711                        stub_ctx.set(target, "<remembered>");
712                    }
713                    if trace {
714                        events.push(TraceEvent {
715                            event: "remember".to_string(),
716                            unit: unit.flow_name.clone(),
717                            step: step.step_name.clone(),
718                            detail: step
719                                .memory_expression
720                                .clone()
721                                .unwrap_or_default(),
722                        });
723                    }
724                    continue;
725                }
726                "recall" => {
727                    let source = &step.step_name;
728                    if !source.is_empty() {
729                        stub_ctx.set(source, "<recalled>");
730                    }
731                    if trace {
732                        events.push(TraceEvent {
733                            event: "recall".to_string(),
734                            unit: unit.flow_name.clone(),
735                            step: step.step_name.clone(),
736                            detail: step
737                                .memory_expression
738                                .clone()
739                                .unwrap_or_default(),
740                        });
741                    }
742                    continue;
743                }
744                "return" => {
745                    // Mirror Python's `__return_value__` slot. The stub
746                    // does not actually short-circuit the unit (no
747                    // sentinel mechanism) — it just records the
748                    // intended value so the trace shows the early-exit
749                    // intent. Adopters running `axon run --stub` see
750                    // the binding; the real Python executor enforces
751                    // termination.
752                    stub_ctx.set("__return_value__", &step.user_prompt);
753                    if trace {
754                        events.push(TraceEvent {
755                            event: "return".to_string(),
756                            unit: unit.flow_name.clone(),
757                            step: step.step_name.clone(),
758                            detail: step.user_prompt.clone(),
759                        });
760                    }
761                    continue;
762                }
763                "hibernate" => {
764                    // Bind a placeholder token; full ContinuityTokenSigner
765                    // integration is the Python runner's job.
766                    stub_ctx.set("__hibernation_token__", "<stub-token>");
767                    if trace {
768                        events.push(TraceEvent {
769                            event: "hibernate".to_string(),
770                            unit: unit.flow_name.clone(),
771                            step: step.step_name.clone(),
772                            detail: format!("flow={}", unit.flow_name),
773                        });
774                    }
775                    continue;
776                }
777                "drill" => {
778                    // Bind under `drill:<pix_ref>` so adopter code
779                    // that interpolates the binding finds something.
780                    let key = format!("drill:{}", step.step_name);
781                    stub_ctx.set(&key, "<stub-drill-result>");
782                    if trace {
783                        events.push(TraceEvent {
784                            event: "drill".to_string(),
785                            unit: unit.flow_name.clone(),
786                            step: step.step_name.clone(),
787                            detail: step.user_prompt.clone(),
788                        });
789                    }
790                    continue;
791                }
792                "trail" => {
793                    let key = format!("trail:{}", step.step_name);
794                    stub_ctx.set(&key, "<stub-trail-result>");
795                    if trace {
796                        events.push(TraceEvent {
797                            event: "trail".to_string(),
798                            unit: unit.flow_name.clone(),
799                            step: step.step_name.clone(),
800                            detail: step.user_prompt.clone(),
801                        });
802                    }
803                    continue;
804                }
805                "conditional" | "for_in" | "parallel" => {
806                    // Pure control flow — no adopter-visible binding;
807                    // just record the structural intent in the trace.
808                    if trace {
809                        events.push(TraceEvent {
810                            event: step.step_type.clone(),
811                            unit: unit.flow_name.clone(),
812                            step: step.step_name.clone(),
813                            detail: step.user_prompt.clone(),
814                        });
815                    }
816                    continue;
817                }
818                "break" | "continue" => {
819                    // Loop-control sentinels. Stub doesn't enforce
820                    // loop exit (no sentinel exception machinery here)
821                    // — it just records the keyword in the trace.
822                    if trace {
823                        events.push(TraceEvent {
824                            event: step.step_type.clone(),
825                            unit: unit.flow_name.clone(),
826                            step: step.step_name.clone(),
827                            detail: step.user_prompt.clone(),
828                        });
829                    }
830                    continue;
831                }
832                _ => {}
833            }
834
835            if trace {
836                events.push(TraceEvent {
837                    event: "step_stub".to_string(),
838                    unit: unit.flow_name.clone(),
839                    step: step.step_name.clone(),
840                    detail: format!("[{}] {}", step.step_type, step.user_prompt),
841                });
842            }
843        }
844
845        if trace {
846            events.push(TraceEvent {
847                event: "unit_complete".to_string(),
848                unit: unit.flow_name.clone(),
849                step: String::new(),
850                detail: format!("{} steps (stub)", unit.steps.len()),
851            });
852        }
853
854        println!(
855            "  {} {} steps completed (stub mode)",
856            c("✓", "\x1b[32m", use_color),
857            unit.steps.len()
858        );
859    }
860
861    (true, events)
862}
863
864// ── Real executor ───────────────────────────────────────────────────────────
865
// NOTE(review): presumably the upper bound on how many times the real
// executor retries a step after a failed anchor check — the use site is
// outside this excerpt; confirm before relying on this description.
const MAX_ANCHOR_RETRIES: u32 = 2;
867
868// ── §Fase 35.e — axonstore SQL routing for the sync runner ──────────
869//
870// The sync runner is synchronous; `PostgresStoreBackend`'s operations
871// are async. `block_on_store` bridges the two by running the future on
872// a freshly-spawned OS thread that owns a current-thread Tokio runtime.
873// A fresh thread never carries an ambient runtime, so this is safe
874// whether `execute_real` runs on a server worker thread, a
875// `spawn_blocking` thread, or a plain CLI thread — there is no
876// "runtime within a runtime" hazard. `std::thread::scope` joins the
877// thread before returning. One pool is created + used + dropped per
878// store op; cross-request pooling is the streaming dispatcher's path
879// (35.f, the production hot path).
880fn block_on_store<F>(fut: F) -> F::Output
881where
882    F: std::future::Future + Send,
883    F::Output: Send,
884{
885    std::thread::scope(|scope| {
886        scope
887            .spawn(|| {
888                tokio::runtime::Builder::new_current_thread()
889                    .enable_all()
890                    .build()
891                    .expect("Fase 35.e: failed to build the store-op Tokio runtime")
892                    .block_on(fut)
893            })
894            .join()
895            .expect("Fase 35.e: the store-op thread panicked")
896    })
897}
898
899/// Execute one `persist`/`retrieve`/`mutate`/`purge` step against a
900/// postgresql-backed `axonstore`, returning a human-readable result
901/// summary or a typed [`StoreError`].
902///
903/// The store name doubles as the SQL table name (D12 — `IRAxonStore`
904/// carries no schema, so v1.31.0 operates against existing tables).
905/// §Fase 35.o / 35.p — `persist` (INSERT columns) and `mutate`
906/// (UPDATE SET assignments) write the columns of their declared
907/// `{ col: value }` block (`store_fields`, value expressions
908/// interpolated); with no block they fall back to writing the flow's
909/// user bindings as a row ([`ExecContext::user_bindings`]).
910/// `retrieve`/`purge` are driven by the `where`-expression. D5 — the
911/// SAME `PostgresStoreBackend` the streaming dispatcher uses, so the
912/// two execution paths never diverge.
913///
914/// §Fase 37.d (D3) — `memory_expr` is the RAW `store:where` expression
915/// (NOT pre-interpolated). A `${name}` in the `where` clause is
916/// resolved by the filter compiler against `ctx.vars()` into a `$N`
917/// bind parameter — never string-spliced into the `where` source. The
918/// pre-37.d path interpolated the whole expression first, which let a
919/// request value carrying a `'` break a string-literal boundary.
920/// §Fase 37.x.j.10 (POST-CLOSE HOTFIX 2026-05-21) — Async variant of
921/// `execute_sql_store_step`. The pre-hotfix sync variant wrapped the
922/// SQL dispatch in its OWN `block_on_store` (own temporary tokio
923/// runtime per call). Combined with the eager pin acquisition (also
924/// on its own temp runtime), this created a fatal cross-runtime
925/// hazard: the pinned `PoolConnection<Postgres>` carries reactor
926/// handles bound to the runtime that ACQUIRED it; awaiting on it from
927/// a different runtime hangs indefinitely (the reactor that would
928/// notify the I/O completion is already dropped).
929///
930/// 37.x.j.10 collapses the per-step runtime back into a SINGLE
931/// outer-scope runtime owned by `execute_server_flow`. This async fn
932/// runs on the caller's runtime, so the pin acquired at flow start
933/// + every SQL dispatch + the implicit pin drop on flow exit ALL
934/// live on the same runtime. Reactor handles stay valid.
935///
936/// The sync variant `execute_sql_store_step` is retained as a thin
937/// wrapper for CLI tests + pre-async callers; it spins up a single
938/// `block_on_store` and calls this async variant. New callers should
939/// invoke the async variant directly from an async context.
940async fn execute_sql_store_step_async(
941    store_registry: &StoreRegistry,
942    // §Fase 37.x.j (D1) — pinned-connection map shared across the flow
943    // execution. Keyed by axonstore name; when the entry exists the
944    // store op routes its SQL through that exact physical Postgres
945    // connection (held since `execute_server_flow` start). When the
946    // entry is absent the op falls back to `StoreConn::Pool` (legacy
947    // pre-37.x.j behavior) — keeping CLI tests + non-server callers
948    // working unchanged.
949    //
950    // §37.x.j.10 — the `&mut` reference is held across `.await`
951    // boundaries inside this fn. Safe because the function is the
952    // unique &mut borrower of the map for its execution and the map
953    // itself is owned by the outer scope (`execute_server_flow`'s
954    // single block_on_store, or a test scope's single async wrapper).
955    pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
956    step_type: &str,
957    store_name: &str,
958    memory_expr: &str,
959    store_fields: Option<&[(String, String)]>,
960    ctx: &ExecContext,
961) -> Result<String, StoreError> {
962    // The connection + confidence_floor live on the `IRAxonStore` the
963    // registry validated.
964    let spec = store_registry.spec(store_name);
965    let _connection = spec.map(|s| s.connection.clone()).unwrap_or_default();
966    let confidence_floor = spec.and_then(|s| s.confidence_floor);
967
968    // §Fase 37.x.j (D1) — the SHARED backend is resolved from the
969    // registry cache INSIDE the `block_on_store` async block below
970    // (the registry's `resolve` may need a tokio context when it
971    // lazily builds the PgPool on first reference). Pre-37.x.j the
972    // runner created a fresh `PgPool` per `connect_named` call — a
973    // pre-existing inefficiency that 37.x.j fixes en passant by
974    // routing through the cached pool.
975
976    // `memory_expression` is `"store:where"` for retrieve/mutate/purge
977    // and the bare store name for persist — the where-expr is whatever
978    // follows the first colon (empty when absent).
979    let where_expr = memory_expr
980        .splitn(2, ':')
981        .nth(1)
982        .unwrap_or("")
983        .to_string();
984    // §Fase 37.d (D3) — an OWNED copy of the flow's variable map, moved
985    // into the store-op task; the filter compiler resolves `${name}`
986    // in `where_expr` against it into `$N` bind parameters.
987    let where_bindings: std::collections::HashMap<String, String> =
988        ctx.vars().clone();
989
990    // §Fase 35.o / 35.p — when the `persist` / `mutate` step declared a
991    // `{ col: value }` block, the SQL row is exactly those columns with
992    // their value expressions interpolated against the flow context.
993    // With no block (`store_fields` is `None`) fall back to the v1.31.0
994    // behaviour: every user binding as a text column. `store_fields` is
995    // only materialised for `persist`/`mutate`, so `retrieve`/`purge`
996    // (which ignore `data`) always take the fallback. Every value binds
997    // as text (D12 — no column-type schema in v1.31).
998    let data: Vec<(String, SqlValue)> = match store_fields {
999        Some(fields) => fields
1000            .iter()
1001            .map(|(col, expr)| {
1002                (col.clone(), SqlValue::Text(ctx.interpolate(expr)))
1003            })
1004            .collect(),
1005        None => ctx
1006            .user_bindings()
1007            .into_iter()
1008            .map(|(k, v)| (k, SqlValue::Text(v)))
1009            .collect(),
1010    };
1011
1012    let store_name = store_name.to_string();
1013    let step_type = step_type.to_string();
1014    let store_name_for_reinsert = store_name.clone();
1015
1016    // §Fase 37.x.j (D1) — take the pin OUT of the shared map for the
1017    // duration of this dispatch. After the dispatch returns (success
1018    // OR error), the pin is re-inserted UNCONDITIONALLY so the next
1019    // store op against this same store reuses it.
1020    //
1021    // §Fase 37.x.j.10 — no longer wrapped in block_on_store. The
1022    // async fn runs on the caller's runtime, so the pin's reactor
1023    // handles stay valid for every `.await` below.
1024    let mut pin: Option<sqlx::pool::PoolConnection<sqlx::Postgres>> =
1025        pinned_conns.remove(&store_name);
1026
1027    // §Fase 37.x.j (D1) — resolve the SHARED backend from the registry
1028    // cache. The registry caches `PostgresStoreBackend` by resolved
1029    // DSN; the backend's inner `PgPool` is `Arc<...>` so the clone
1030    // shares pool state with every other call AND with the eagerly-
1031    // acquired pin in `pinned_conns`.
1032    let backend = match store_registry.resolve(&store_name) {
1033        Ok(crate::store::registry::StoreHandle::Postgres(b)) => b,
1034        Ok(_) => {
1035            // Re-insert the pin if we removed one (we won't dispatch).
1036            if let Some(p) = pin {
1037                pinned_conns.insert(store_name_for_reinsert, p);
1038            }
1039            return Err(StoreError::Connect {
1040                source: format!(
1041                    "axonstore `{store_name}` expected to resolve to \
1042                     a postgresql backend but the registry returned \
1043                     `in_memory`. Routing bug — the SQL gate in \
1044                     `execute_real` should have skipped this step."
1045                ),
1046            });
1047        }
1048        Err(e) => {
1049            if let Some(p) = pin {
1050                pinned_conns.insert(store_name_for_reinsert, p);
1051            }
1052            return Err(e);
1053        }
1054    };
1055
1056    // §Fase 37.x.j.10 — dispatch body inlined here. `pin` is `&mut`-
1057    // borrowed inside each match arm for the StoreConn::Pinned variant;
1058    // the borrow ends at the end of each arm so we can re-insert `pin`
1059    // unconditionally below regardless of result.
1060    let result: Result<String, StoreError> = async {
1061        match step_type.as_str() {
1062            "retrieve" => {
1063                // §35.i Pillar III — retrieve drains off a lazy cursor,
1064                // bounded (never materializes a huge result set).
1065                // §35.g Pillar I — every tuple born Untrusted,
1066                // confidence_floor filters sub-floor rows. The result
1067                // is an epistemic envelope carrying both dispositions.
1068                let cancel = crate::cancel_token::CancellationFlag::new();
1069                // §Fase 37.x.j (D1) — build `StoreConn::Pinned` when a
1070                // pin is held for this store (the post-37.x.j default
1071                // for server-driven flows), else `StoreConn::Pool`
1072                // (legacy path for CLI / pre-server callers). The
1073                // Pinned variant routes the SELECT through the exact
1074                // physical Postgres backend connection acquired at
1075                // flow start — Supavisor/PgBouncer cannot swap.
1076                let mut store_conn = match &mut pin {
1077                    Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1078                    None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1079                };
1080                let stream_outcome = row_stream::stream_retrieve(
1081                    &backend,
1082                    &mut store_conn,
1083                    &store_name,
1084                    &where_expr,
1085                    row_stream::DEFAULT_RETRIEVE_POLICY,
1086                    row_stream::DEFAULT_MAX_ROWS,
1087                    &cancel,
1088                    &where_bindings,
1089                )
1090                .await?;
1091                let metadata = row_stream::stream_metadata(
1092                    row_stream::DEFAULT_RETRIEVE_POLICY,
1093                    &stream_outcome,
1094                );
1095                let outcome = epistemic::enforce_retrieve_floor(
1096                    epistemic::mark_retrieved(stream_outcome.rows),
1097                    confidence_floor,
1098                );
1099                let mut envelope =
1100                    epistemic::retrieve_envelope(&outcome, confidence_floor);
1101                envelope["stream"] = metadata;
1102                Ok(serde_json::to_string(&envelope)
1103                    .unwrap_or_else(|_| "{}".to_string()))
1104            }
1105            "purge" => {
1106                // §Fase 37.x.j (D1) — pinned/pool dispatch (see retrieve).
1107                let mut store_conn = match &mut pin {
1108                    Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1109                    None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1110                };
1111                let n = backend
1112                    .purge(&mut store_conn, &store_name, &where_expr, &where_bindings)
1113                    .await?;
1114                Ok(format!("{n} row(s) purged"))
1115            }
1116            "persist" => {
1117                // §35.g Pillar I — a sub-floor or un-elevated write
1118                // into a confidence-floored store is a typed error.
1119                epistemic::enforce_persist_floor(
1120                    &data,
1121                    confidence_floor,
1122                    &store_name,
1123                )?;
1124                // §Fase 37.x.j (D1) — pinned/pool dispatch.
1125                let mut store_conn = match &mut pin {
1126                    Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1127                    None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1128                };
1129                let n = backend.insert(&mut store_conn, &store_name, &data).await?;
1130                Ok(format!("{n} row(s) persisted"))
1131            }
1132            "mutate" => {
1133                // §Fase 37.x.j (D1) — pinned/pool dispatch.
1134                let mut store_conn = match &mut pin {
1135                    Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1136                    None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1137                };
1138                let n = backend
1139                    .mutate(&mut store_conn, &store_name, &where_expr, &data, &where_bindings)
1140                    .await?;
1141                Ok(format!("{n} row(s) mutated"))
1142            }
1143            // The caller only routes the four store-op step types here.
1144            other => Err(StoreError::Query {
1145                op: "store",
1146                source: format!("unsupported store step type `{other}`"),
1147            }),
1148        }
1149    }.await;
1150
1151    // §Fase 37.x.j (D1) — re-insert the pin (UNCONDITIONALLY — success
1152    // OR error path) so the next store op against this store reuses
1153    // the same physical Postgres backend connection. `pin` was taken
1154    // out at the top of this fn and the dispatch above only borrows
1155    // it `&mut`-wise inside each match arm — so it's still owned here
1156    // regardless of `result`'s Ok/Err outcome.
1157    if let Some(p) = pin {
1158        pinned_conns.insert(store_name_for_reinsert, p);
1159    }
1160
1161    result
1162}
1163
1164/// §Fase 35.e — Sync wrapper retained for CLI tests + pre-async callers.
1165///
1166/// §Fase 37.x.j.10 (POST-CLOSE HOTFIX) — wraps the new async fn
1167/// `execute_sql_store_step_async` in a SINGLE block_on_store so the
1168/// pin acquire (if any was pre-populated) + the SQL dispatch happen
1169/// on the SAME temporary tokio runtime. Pre-hotfix the sync variant
1170/// had its OWN block_on_store inside (per-step temp runtime); when
1171/// the caller's eager pin acquisition was ALSO on a separate temp
1172/// runtime, the cross-runtime hazard appeared. The wrapper here is
1173/// safe ONLY when the caller's pin map is empty (legacy Pool path)
1174/// — production callers MUST use the async variant directly inside
1175/// the OUTER block_on_store at `execute_server_flow`.
1176fn execute_sql_store_step(
1177    store_registry: &StoreRegistry,
1178    pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
1179    step_type: &str,
1180    store_name: &str,
1181    memory_expr: &str,
1182    store_fields: Option<&[(String, String)]>,
1183    ctx: &ExecContext,
1184) -> Result<String, StoreError> {
1185    block_on_store(execute_sql_store_step_async(
1186        store_registry,
1187        pinned_conns,
1188        step_type,
1189        store_name,
1190        memory_expr,
1191        store_fields,
1192        ctx,
1193    ))
1194}
1195
1196/// §Fase 37.x.j.10 (POST-CLOSE HOTFIX 2026-05-21) — Async variant of
1197/// `execute_real`. Production callers MUST invoke this from inside
1198/// the OUTER `block_on_store` at `execute_server_flow` so the entire
1199/// flow execution (eager pin acquire + every store dispatch + implicit
1200/// pin drop on exit) lives on a SINGLE temporary tokio runtime. This
1201/// is the load-bearing structural property that prevents the cross-
1202/// runtime `PoolConnection<Postgres>` hazard the pre-hotfix code
1203/// exhibited.
1204///
1205/// The single store-op site (`execute_sql_store_step_async`) is now
1206/// awaited directly here — no nested `block_on_store`. Every other
1207/// operation in this fn is synchronous-style code; the async fn just
1208/// means the await of the SQL dispatch site is legal.
1209///
/// The sync variant `execute_real` is retained as a thin wrapper for the
/// CLI path + pre-async callers that don't have a tokio context.
1212async fn execute_real_async(
1213    units: &[ExecutionUnit],
1214    backend_name: &str,
1215    source_file: &str,
1216    use_color: bool,
1217    trace: bool,
1218    stream: bool,
1219    output_fmt: OutputFormat,
1220    report: &mut ReportBuilder,
1221    registry: &ToolRegistry,
1222    store_registry: &StoreRegistry,
1223    // §Fase 37.x.j (D1) — flow-scoped pinned connection map, populated
1224    // by `execute_server_flow` (server-driven flows) and empty for
1225    // CLI / pre-37.x.j callers.
1226    pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
1227    api_key_override: Option<&str>,
1228) -> Result<(bool, Vec<TraceEvent>), backend::BackendError> {
1229    let api_key = match api_key_override {
1230        Some(key) => key.to_string(),
1231        None => backend::get_api_key(backend_name)?,
1232    };
1233    let mut events: Vec<TraceEvent> = Vec::new();
1234    let mut total_input_tokens: u64 = 0;
1235    let mut total_output_tokens: u64 = 0;
1236    let mut session = SessionStore::new(source_file);
1237    let mut hooks = HookManager::new();
1238    let mut effects = EffectTracker::new();
1239    let json = output_fmt.is_json();
1240
1241    for (i, unit) in units.iter().enumerate() {
1242        if !json {
1243            println!(
1244                "\n{}",
1245                c(
1246                    &format!(
1247                        "▶ Execution Unit {}/{}: {} as {}",
1248                        i + 1,
1249                        units.len(),
1250                        unit.flow_name,
1251                        unit.persona_name
1252                    ),
1253                    "\x1b[1;36m",
1254                    use_color,
1255                )
1256            );
1257        }
1258
1259        if trace {
1260            events.push(TraceEvent {
1261                event: "unit_start".to_string(),
1262                unit: unit.flow_name.clone(),
1263                step: String::new(),
1264                detail: format!(
1265                    "persona={}, context={}",
1266                    unit.persona_name, unit.context_name
1267                ),
1268            });
1269        }
1270
1271        let mut ctx = ExecContext::new(&unit.flow_name, &unit.persona_name, i);
1272        // §Fase 37.b (D1) — seed the flow's parameters from the
1273        // request body BEFORE the step walk so `${param}` resolves in
1274        // step prompts, `where:` clauses and `persist` field blocks.
1275        for (name, value) in &unit.param_bindings {
1276            ctx.set(name, value);
1277        }
1278        let mut conversation = ConversationHistory::new();
1279        let mut context_window = ContextWindow::new();
1280        hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
1281        report.begin_unit(&unit.flow_name, &unit.persona_name);
1282
1283        // Step dependency analysis + parallel schedule
1284        // §Fase 61 — the set of producing step names, so a `use Tool(k = v)`
1285        // call's keyword-arg references fold into the analysis argument (a flow-
1286        // param reference is gated out). Without this the call's data-deps are
1287        // invisible and the scheduler co-schedules it with its sources.
1288        let step_name_set: std::collections::HashSet<&str> =
1289            unit.steps.iter().map(|s| s.step_name.as_str()).collect();
1290        let step_infos: Vec<step_deps::StepInfo> = unit.steps.iter().map(|s| {
1291            step_deps::StepInfo {
1292                name: s.step_name.clone(),
1293                step_type: s.step_type.clone(),
1294                user_prompt: s.user_prompt.clone(),
1295                argument: step_deps::use_tool_analysis_argument(
1296                    s.tool_argument.as_deref()
1297                        .or(s.memory_expression.as_deref())
1298                        .unwrap_or(""),
1299                    &s.tool_named_args,
1300                    &step_name_set,
1301                ),
1302            }
1303        }).collect();
1304
1305        let dep_graph = step_deps::analyze(&step_infos);
1306        let schedule = parallel::build_schedule(&dep_graph);
1307
1308        if trace {
1309            if !json && dep_graph.max_depth > 0 {
1310                println!(
1311                    "  {} {}",
1312                    c("⊞", "\x1b[2;36m", use_color),
1313                    c(
1314                        &format!(
1315                            "Dependencies: depth={}, {} parallel group{}{}",
1316                            dep_graph.max_depth,
1317                            dep_graph.parallel_groups.len(),
1318                            if dep_graph.parallel_groups.len() == 1 { "" } else { "s" },
1319                            if dep_graph.unresolved_refs.is_empty() {
1320                                String::new()
1321                            } else {
1322                                format!(", {} unresolved ref(s)", dep_graph.unresolved_refs.len())
1323                            },
1324                        ),
1325                        "\x1b[2;36m",
1326                        use_color,
1327                    ),
1328                );
1329            }
1330
1331            events.push(TraceEvent {
1332                event: "step_deps".to_string(),
1333                unit: unit.flow_name.clone(),
1334                step: String::new(),
1335                detail: format!(
1336                    "depth={}, parallel_groups={}, unresolved={}, steps: {}",
1337                    dep_graph.max_depth,
1338                    dep_graph.parallel_groups.len(),
1339                    dep_graph.unresolved_refs.len(),
1340                    dep_graph.steps.iter()
1341                        .map(|s| {
1342                            if s.depends_on.is_empty() {
1343                                s.name.clone()
1344                            } else {
1345                                format!("{}→[{}]", s.name, s.depends_on.join(","))
1346                            }
1347                        })
1348                        .collect::<Vec<_>>()
1349                        .join(", "),
1350                ),
1351            });
1352
1353            if !json && schedule.has_parallelism() {
1354                println!(
1355                    "  {} {}",
1356                    c("⫘", "\x1b[2;35m", use_color),
1357                    c(
1358                        &format!("Schedule: {}", schedule.summary()),
1359                        "\x1b[2;35m",
1360                        use_color,
1361                    ),
1362                );
1363            }
1364
1365            events.push(TraceEvent {
1366                event: "schedule".to_string(),
1367                unit: unit.flow_name.clone(),
1368                step: String::new(),
1369                detail: format!(
1370                    "waves={}, parallel_waves={}, max_parallelism={}, schedule: {}",
1371                    schedule.waves.len(),
1372                    schedule.parallel_waves,
1373                    schedule.max_parallelism,
1374                    schedule.summary(),
1375                ),
1376            });
1377        }
1378
1379        // Build step lookup by name → (index, &CompiledStep)
1380        let step_map: std::collections::HashMap<&str, (usize, &CompiledStep)> = unit
1381            .steps
1382            .iter()
1383            .enumerate()
1384            .map(|(j, s)| (s.step_name.as_str(), (j, s)))
1385            .collect();
1386
1387        // ── Wave-based execution loop ────────────────────────────
1388        for (wave_idx, wave) in schedule.waves.iter().enumerate() {
1389            let is_parallel_wave = wave.is_parallel && wave.steps.len() > 1;
1390
1391            if is_parallel_wave && !json {
1392                println!(
1393                    "  {} {}",
1394                    c("⫘", "\x1b[35m", use_color),
1395                    c(
1396                        &format!(
1397                            "Wave {}/{}: [{}] (parallel, {} steps)",
1398                            wave_idx + 1,
1399                            schedule.waves.len(),
1400                            wave.steps.join(" | "),
1401                            wave.steps.len(),
1402                        ),
1403                        "\x1b[2;35m",
1404                        use_color,
1405                    ),
1406                );
1407            }
1408
1409            if trace {
1410                events.push(TraceEvent {
1411                    event: "wave_start".to_string(),
1412                    unit: unit.flow_name.clone(),
1413                    step: String::new(),
1414                    detail: format!(
1415                        "wave={}/{}, steps=[{}], parallel={}",
1416                        wave_idx + 1,
1417                        schedule.waves.len(),
1418                        wave.steps.join(", "),
1419                        is_parallel_wave,
1420                    ),
1421                });
1422            }
1423
1424            if is_parallel_wave {
1425                // ── Parallel wave execution ──────────────────────
1426                // Snapshot shared state; each thread gets its own copy.
1427                let ctx_snapshot = ctx.clone();
1428                let conversation_snapshot = conversation.clone();
1429
1430                let wave_results = parallel::execute_wave(wave, |step_name| {
1431                    // Thread-local state (no mutation of shared state)
1432                    let (_j, step) = match step_map.get(step_name) {
1433                        Some(v) => *v,
1434                        None => return parallel::WaveStepResult {
1435                            step_name: step_name.to_string(),
1436                            output: "step not found".to_string(),
1437                            success: false,
1438                        },
1439                    };
1440
1441                    // Native tool steps
1442                    if step.step_type == "use_tool" {
1443                        // §Fase 58.e — `use Tool(k = v, …)` assembles a typed
1444                        // structured JSON body; the legacy single-`on <arg>`
1445                        // form keeps the flat interpolation (D5).
1446                        let arg = if !step.tool_named_args.is_empty() {
1447                            let interpolated: Vec<(String, String)> = step
1448                                .tool_named_args
1449                                .iter()
1450                                .map(|(n, v, kind)| {
1451                                    // §Fase 60 — resolve by value_kind (reference
1452                                    // → binding lookup; literal → interpolation).
1453                                    (n.clone(), ctx_snapshot.resolve_named_arg(v, kind))
1454                                })
1455                                .collect();
1456                            build_structured_tool_body(&interpolated, &step.tool_param_types)
1457                        } else {
1458                            ctx_snapshot.interpolate(step.tool_argument.as_deref().unwrap_or(""))
1459                        };
1460                        if let Some(result) = registry.dispatch(&step.step_name, &arg) {
1461                            return parallel::WaveStepResult {
1462                                step_name: step_name.to_string(),
1463                                output: result.output,
1464                                success: result.success,
1465                            };
1466                        }
1467                    }
1468
1469                    // LLM steps — each thread gets its own conversation copy
1470                    let full_system = format!("{}\n\n{}", unit.system_prompt, step.system_prompt);
1471                    let interpolated_prompt = ctx_snapshot.interpolate(&step.user_prompt);
1472                    let mut thread_conversation = conversation_snapshot.clone();
1473                    let mut thread_events: Vec<TraceEvent> = Vec::new();
1474                    let mut thread_input_tokens: u64 = 0;
1475                    let mut thread_output_tokens: u64 = 0;
1476
1477                    let result = execute_step_with_retry(
1478                        backend_name,
1479                        &api_key,
1480                        &full_system,
1481                        &interpolated_prompt,
1482                        &step.step_name,
1483                        &unit.flow_name,
1484                        &unit.resolved_anchors,
1485                        use_color,
1486                        false, // no trace in parallel threads (avoid interleaved output)
1487                        false, // no streaming in parallel (interleaved stdout)
1488                        json,
1489                        &mut thread_conversation,
1490                        &mut thread_events,
1491                        &mut thread_input_tokens,
1492                        &mut thread_output_tokens,
1493                    );
1494
1495                    parallel::WaveStepResult {
1496                        step_name: step_name.to_string(),
1497                        output: result,
1498                        success: true,
1499                    }
1500                });
1501
1502                // Merge results back into shared state
1503                for wr in &wave_results {
1504                    let (j, step) = step_map[wr.step_name.as_str()];
1505
1506                    ctx.set_step(&step.step_name, &step.step_type, j);
1507                    ctx.set_result(&step.step_name, &wr.output);
1508                    hooks.on_step_start(&step.step_name, &step.step_type);
1509                    hooks.on_step_end(0, 0, 0, 0, false);
1510
1511                    if !json {
1512                        let status = if wr.success { "ok" } else { "error" };
1513                        println!(
1514                            "  {} {}.{} [{}] → {} [parallel]",
1515                            c("⫘", "\x1b[35m", use_color),
1516                            j + 1,
1517                            c(&step.step_name, "\x1b[1m", use_color),
1518                            step.step_type,
1519                            c(
1520                                &format!("{status}: {}", truncate_output(&wr.output, 80)),
1521                                if wr.success { "\x1b[32m" } else { "\x1b[31m" },
1522                                use_color,
1523                            ),
1524                        );
1525                    }
1526
1527                    report.record_step(StepReport {
1528                        name: step.step_name.clone(),
1529                        step_type: step.step_type.clone(),
1530                        result: wr.output.clone(),
1531                        duration_ms: 0,
1532                        input_tokens: 0,
1533                        output_tokens: 0,
1534                        anchor_breaches: 0,
1535                        chain_activations: 0,
1536                        was_retried: false,
1537                    });
1538
1539                    if trace {
1540                        events.push(TraceEvent {
1541                            event: "step_parallel".to_string(),
1542                            unit: unit.flow_name.clone(),
1543                            step: step.step_name.clone(),
1544                            detail: format!(
1545                                "wave={}, success={}, output={}",
1546                                wave_idx + 1,
1547                                wr.success,
1548                                truncate_output(&wr.output, 120),
1549                            ),
1550                        });
1551                    }
1552                }
1553
1554                // Append wave results to conversation as synthetic context
1555                for wr in &wave_results {
1556                    conversation.add_user(&format!("[Step {}]", wr.step_name));
1557                    conversation.add_assistant(&wr.output);
1558                }
1559            } else {
1560                // ── Sequential execution (single-step wave) ──────
1561                for step_name in &wave.steps {
1562                    let (j, step) = step_map[step_name.as_str()];
1563
1564            ctx.set_step(&step.step_name, &step.step_type, j);
1565            hooks.on_step_start(&step.step_name, &step.step_type);
1566
1567            if !json {
1568                println!(
1569                    "  {} {}.{} [{}]",
1570                    c("→", "\x1b[33m", use_color),
1571                    j + 1,
1572                    c(&step.step_name, "\x1b[1m", use_color),
1573                    step.step_type,
1574                );
1575            }
1576
1577            // ── Native tool interception ────────────────────────
1578            if step.step_type == "use_tool" {
1579                // §Fase 58.e — `use Tool(k = v, …)` assembles a typed structured
1580                // JSON body; the legacy single-`on <arg>` form keeps the flat
1581                // interpolation (D5).
1582                let arg = if !step.tool_named_args.is_empty() {
1583                    let interpolated: Vec<(String, String)> = step
1584                        .tool_named_args
1585                        .iter()
1586                        // §Fase 60 — resolve by value_kind (reference → binding
1587                        // lookup; literal → interpolation).
1588                        .map(|(n, v, kind)| (n.clone(), ctx.resolve_named_arg(v, kind)))
1589                        .collect();
1590                    build_structured_tool_body(&interpolated, &step.tool_param_types)
1591                } else {
1592                    ctx.interpolate(step.tool_argument.as_deref().unwrap_or(""))
1593                };
1594                if let Some(result) = registry.dispatch(&step.step_name, &arg) {
1595                    let status = if result.success { "ok" } else { "error" };
1596                    if !json {
1597                        println!(
1598                            "  {} {} → {} [native]",
1599                            c("🔧", "\x1b[36m", use_color),
1600                            c(&result.tool_name, "\x1b[1m", use_color),
1601                            c(&format!("{status}: {}", result.output), if result.success { "\x1b[32m" } else { "\x1b[31m" }, use_color),
1602                        );
1603                    }
1604
1605                    // Validate output against schema + track effects
1606                    if let Some(entry) = registry.get(&step.step_name) {
1607                        if !entry.output_schema.is_empty() {
1608                            let vr = tool_validator::validate_output(
1609                                &step.step_name, &result.output, &entry.output_schema,
1610                            );
1611                            if !vr.passed && !json {
1612                                println!(
1613                                    "  {} {}",
1614                                    c("⚠", "\x1b[33m", use_color),
1615                                    c(
1616                                        &format!("Validation: {} — {}", vr.schema, vr.message),
1617                                        "\x1b[33m",
1618                                        use_color,
1619                                    ),
1620                                );
1621                            }
1622                            if trace {
1623                                events.push(TraceEvent {
1624                                    event: format!("tool_validate_{}", if vr.passed { "pass" } else { "fail" }),
1625                                    unit: unit.flow_name.clone(),
1626                                    step: step.step_name.clone(),
1627                                    detail: format!("schema={}, {}", vr.schema, vr.message),
1628                                });
1629                            }
1630                        }
1631                        if !entry.effect_row.is_empty() {
1632                            effects.record(
1633                                &step.step_name, &step.step_name, &unit.flow_name, &entry.effect_row,
1634                            );
1635                        }
1636                    }
1637
1638                    ctx.set_result(&step.step_name, &result.output);
1639                    hooks.on_step_end(0, 0, 0, 0, false);
1640                    report.record_step(StepReport {
1641                        name: step.step_name.clone(),
1642                        step_type: step.step_type.clone(),
1643                        result: result.output.clone(),
1644                        duration_ms: 0,
1645                        input_tokens: 0,
1646                        output_tokens: 0,
1647                        anchor_breaches: 0,
1648                        chain_activations: 0,
1649                        was_retried: false,
1650                    });
1651                    if trace {
1652                        events.push(TraceEvent {
1653                            event: "tool_native".to_string(),
1654                            unit: unit.flow_name.clone(),
1655                            step: step.step_name.clone(),
1656                            detail: format!(
1657                                "tool={}, success={}, output={}",
1658                                result.tool_name, result.success, result.output
1659                            ),
1660                        });
1661                    }
1662                    continue; // Skip LLM call
1663                }
1664                // Unknown tool — fall through to LLM
1665            }
1666
1667            // ── Session memory interception ─────────────────────
1668            if matches!(step.step_type.as_str(), "remember" | "recall" | "persist" | "retrieve" | "mutate" | "purge") {
1669                let raw_expr = step.memory_expression.as_deref().unwrap_or("");
1670                let expr = ctx.interpolate(raw_expr);
1671
1672                // §Fase 35.e — SQL routing. A persist/retrieve/mutate/
1673                // purge whose store resolves to a postgresql backend
1674                // executes real SQL and skips the key-value path
1675                // entirely. remember/recall, and every in_memory or
1676                // undeclared store, fall through to the byte-identical
1677                // pre-35 key-value path below (D3 — absolute).
1678                if matches!(step.step_type.as_str(), "persist" | "retrieve" | "mutate" | "purge")
1679                    && store_registry.backend_kind(&step.step_name)
1680                        == Some(StoreBackendKind::Postgresql)
1681                {
1682                    // §Fase 37.d (D3) — pass the RAW `store:where`
1683                    // expression (NOT the pre-interpolated `expr`): the
1684                    // filter compiler resolves `${name}` in the `where`
1685                    // clause into `$N` bind parameters, never a splice.
1686                    // §Fase 37.x.j.10 — call the async variant via
1687                    // `.await` on the SAME runtime as the outer
1688                    // `execute_server_flow` block_on_store. Pre-hotfix
1689                    // this was the sync `execute_sql_store_step` whose
1690                    // internal block_on_store created a per-step temp
1691                    // runtime, defeating the pin's reactor handles.
1692                    let (result_text, ok) = match execute_sql_store_step_async(
1693                        store_registry,
1694                        pinned_conns,
1695                        &step.step_type,
1696                        &step.step_name,
1697                        raw_expr,
1698                        step.store_fields.as_deref(),
1699                        &ctx,
1700                    ).await {
1701                        Ok(summary) => (summary, true),
1702                        Err(e) => (format!("store error: {e}"), false),
1703                    };
1704                    ctx.set_result(&step.step_name, &result_text);
1705                    let detail = format!("{} → {}", step.step_name, result_text);
1706                    if !json {
1707                        let color = if ok { "\x1b[35m" } else { "\x1b[31m" };
1708                        println!(
1709                            "  {} {} [{}]",
1710                            c(if ok { "💾" } else { "✗" }, color, use_color),
1711                            c(&detail, color, use_color),
1712                            step.step_type,
1713                        );
1714                    }
1715                    hooks.on_step_end(0, 0, 0, 0, false);
1716                    report.record_step(StepReport {
1717                        name: step.step_name.clone(),
1718                        step_type: step.step_type.clone(),
1719                        result: detail.clone(),
1720                        duration_ms: 0,
1721                        input_tokens: 0,
1722                        output_tokens: 0,
1723                        anchor_breaches: 0,
1724                        chain_activations: 0,
1725                        was_retried: false,
1726                    });
1727                    if trace {
1728                        events.push(TraceEvent {
1729                            event: format!("axonstore_sql_{}", step.step_type),
1730                            unit: unit.flow_name.clone(),
1731                            step: step.step_name.clone(),
1732                            detail,
1733                        });
1734                    }
1735                    continue; // Skip the key-value path and the LLM call.
1736                }
1737
1738                let (action, detail) = match step.step_type.as_str() {
1739                    "remember" => {
1740                        session.remember(&step.step_name, &expr, &step.step_name);
1741                        ctx.set_result(&step.step_name, &expr);
1742                        ("remember", format!("{} = {}", step.step_name, expr))
1743                    }
1744                    "recall" => {
1745                        let val = session.recall(&step.step_name)
1746                            .map(|e| e.value.clone())
1747                            .unwrap_or_else(|| "(not found)".to_string());
1748                        ctx.set_result(&step.step_name, &val);
1749                        ("recall", format!("{} → {}", step.step_name, val))
1750                    }
1751                    "persist" => {
1752                        session.persist(&step.step_name, &expr, &step.step_name);
1753                        ctx.set_result(&step.step_name, &expr);
1754                        ("persist", format!("{} → store", step.step_name))
1755                    }
1756                    "retrieve" => {
1757                        let val = session.retrieve(&step.step_name)
1758                            .map(|e| e.value.clone())
1759                            .unwrap_or_else(|| {
1760                                let results = session.retrieve_query(&expr);
1761                                if results.is_empty() {
1762                                    "(not found)".to_string()
1763                                } else {
1764                                    format!("{} entries", results.len())
1765                                }
1766                            });
1767                        ctx.set_result(&step.step_name, &val);
1768                        ("retrieve", format!("{} → {}", step.step_name, val))
1769                    }
1770                    "mutate" => {
1771                        let parts: Vec<&str> = expr.splitn(2, ':').collect();
1772                        let ok = if parts.len() == 2 {
1773                            session.mutate(parts[0], parts[1], &step.step_name)
1774                        } else {
1775                            false
1776                        };
1777                        let msg = if ok { "updated" } else { "not found" };
1778                        ctx.set_result(&step.step_name, msg);
1779                        ("mutate", format!("{} → {}", step.step_name, msg))
1780                    }
1781                    "purge" => {
1782                        let ok = session.purge(&step.step_name);
1783                        let msg = if ok { "removed" } else { "not found" };
1784                        ctx.set_result(&step.step_name, msg);
1785                        ("purge", format!("{} → {}", step.step_name, msg))
1786                    }
1787                    _ => unreachable!(),
1788                };
1789
1790                if !json {
1791                    println!(
1792                        "  {} {} [{}]",
1793                        c("💾", "\x1b[35m", use_color),
1794                        c(&detail, "\x1b[35m", use_color),
1795                        action,
1796                    );
1797                }
1798                hooks.on_step_end(0, 0, 0, 0, false);
1799                report.record_step(StepReport {
1800                    name: step.step_name.clone(),
1801                    step_type: step.step_type.clone(),
1802                    result: detail.clone(),
1803                    duration_ms: 0,
1804                    input_tokens: 0,
1805                    output_tokens: 0,
1806                    anchor_breaches: 0,
1807                    chain_activations: 0,
1808                    was_retried: false,
1809                });
1810                if trace {
1811                    events.push(TraceEvent {
1812                        event: format!("session_{action}"),
1813                        unit: unit.flow_name.clone(),
1814                        step: step.step_name.clone(),
1815                        detail,
1816                    });
1817                }
1818                continue; // Skip LLM call
1819            }
1820
1821            // ── LLM call with variable interpolation + conversation history ──
1822            let full_system = format!("{}\n\n{}", unit.system_prompt, step.system_prompt);
1823            let interpolated_prompt = ctx.interpolate(&step.user_prompt);
1824
1825            // Enforce context budget before LLM call
1826            let dropped = context_window.enforce(&mut conversation);
1827            if dropped > 0 {
1828                if !json {
1829                    println!(
1830                        "  {} {}",
1831                        c("⊘", "\x1b[33m", use_color),
1832                        c(
1833                            &format!(
1834                                "Context window: dropped {} messages ({} total chars remaining, ~{}k tokens)",
1835                                dropped,
1836                                conversation.total_chars(),
1837                                ContextWindow::estimate_tokens(conversation.total_chars()) / 1000,
1838                            ),
1839                            "\x1b[2;33m",
1840                            use_color,
1841                        ),
1842                    );
1843                }
1844                if trace {
1845                    events.push(TraceEvent {
1846                        event: "context_truncated".to_string(),
1847                        unit: unit.flow_name.clone(),
1848                        step: step.step_name.clone(),
1849                        detail: format!(
1850                            "dropped={}, remaining_chars={}, remaining_turns={}",
1851                            dropped,
1852                            conversation.total_chars(),
1853                            conversation.turn_count(),
1854                        ),
1855                    });
1856                }
1857            }
1858
1859            let step_input_before = total_input_tokens;
1860            let step_output_before = total_output_tokens;
1861            let step_result = execute_step_with_retry(
1862                backend_name,
1863                &api_key,
1864                &full_system,
1865                &interpolated_prompt,
1866                &step.step_name,
1867                &unit.flow_name,
1868                &unit.resolved_anchors,
1869                use_color,
1870                trace,
1871                stream,
1872                json,
1873                &mut conversation,
1874                &mut events,
1875                &mut total_input_tokens,
1876                &mut total_output_tokens,
1877            );
1878            let step_in = total_input_tokens - step_input_before;
1879            let step_out = total_output_tokens - step_output_before;
1880            ctx.set_result(&step.step_name, &step_result);
1881            hooks.on_step_end(step_in, step_out, 0, 0, false);
1882            report.record_step(StepReport {
1883                name: step.step_name.clone(),
1884                step_type: step.step_type.clone(),
1885                result: step_result,
1886                duration_ms: 0,
1887                input_tokens: step_in,
1888                output_tokens: step_out,
1889                anchor_breaches: 0,
1890                chain_activations: 0,
1891                was_retried: false,
1892            });
1893
1894                } // end sequential wave step loop
1895            } // end sequential/parallel branch
1896        } // end wave loop
1897
1898        hooks.on_unit_end();
1899        report.end_unit(&hooks);
1900
1901        if trace {
1902            events.push(TraceEvent {
1903                event: "unit_complete".to_string(),
1904                unit: unit.flow_name.clone(),
1905                step: String::new(),
1906                detail: format!(
1907                    "{} steps, {} conversation turns, {} chars context{}",
1908                    unit.steps.len(),
1909                    conversation.turn_count(),
1910                    conversation.total_chars(),
1911                    if context_window.was_truncated() {
1912                        format!(
1913                            ", {} messages truncated across {} events",
1914                            context_window.total_dropped,
1915                            context_window.truncation_count,
1916                        )
1917                    } else {
1918                        String::new()
1919                    },
1920                ),
1921            });
1922        }
1923
1924        if !json {
1925            println!(
1926                "  {} {} steps completed",
1927                c("✓", "\x1b[32m", use_color),
1928                unit.steps.len()
1929            );
1930        }
1931    }
1932
1933    // Flush session store to disk
1934    if let Err(e) = session.flush() {
1935        if !json {
1936            eprintln!("  {} {}", c("⚠", "\x1b[33m", use_color), e);
1937        }
1938    } else if session.store_count() > 0 && !json {
1939        println!(
1940            "  {}",
1941            c(
1942                &format!("💾 Session: {} memory, {} persistent ({})",
1943                    session.memory_count(), session.store_count(),
1944                    session.store_path().display()),
1945                "\x1b[2m",
1946                use_color,
1947            )
1948        );
1949    }
1950
1951    // Token usage summary
1952    if !json && (total_input_tokens > 0 || total_output_tokens > 0) {
1953        println!(
1954            "\n  {}",
1955            c(
1956                &format!(
1957                    "Token usage: {} input + {} output = {} total",
1958                    total_input_tokens,
1959                    total_output_tokens,
1960                    total_input_tokens + total_output_tokens
1961                ),
1962                "\x1b[2m",
1963                use_color,
1964            )
1965        );
1966    }
1967
1968    // Execution metrics summary
1969    if hooks.total_steps() > 0 {
1970        if !json {
1971            println!(
1972                "  {}",
1973                c(
1974                    &format!(
1975                        "Execution: {} steps across {} units in {}ms (avg {}ms/step){}",
1976                        hooks.total_steps(),
1977                        hooks.unit_metrics().len(),
1978                        hooks.total_duration_ms(),
1979                        hooks.avg_step_duration_ms(),
1980                        if hooks.retried_steps() > 0 {
1981                            format!(", {} retried", hooks.retried_steps())
1982                        } else {
1983                            String::new()
1984                        },
1985                    ),
1986                    "\x1b[2m",
1987                    use_color,
1988                )
1989            );
1990        }
1991
1992        if trace {
1993            // Per-unit timing breakdown in trace
1994            for um in hooks.unit_metrics() {
1995                events.push(TraceEvent {
1996                    event: "hook_unit_metrics".to_string(),
1997                    unit: um.unit_name.clone(),
1998                    step: String::new(),
1999                    detail: format!(
2000                        "duration={}ms, steps={}, tokens_in={}, tokens_out={}, breaches={}, chains={}",
2001                        um.duration_ms, um.total_steps,
2002                        um.total_input_tokens, um.total_output_tokens,
2003                        um.total_anchor_breaches, um.total_chain_activations,
2004                    ),
2005                });
2006            }
2007        }
2008    }
2009
2010    // Effect tracking summary
2011    if effects.total_executions() > 0 {
2012        if !json {
2013            println!(
2014                "  {}",
2015                c(
2016                    &format!("Effects: {}", effects.summary()),
2017                    "\x1b[2m",
2018                    use_color,
2019                )
2020            );
2021        }
2022        if trace {
2023            events.push(TraceEvent {
2024                event: "effect_summary".to_string(),
2025                unit: String::new(),
2026                step: String::new(),
2027                detail: effects.summary(),
2028            });
2029        }
2030    }
2031
2032    Ok((true, events))
2033}
2034
2035/// §Fase 35.e — Sync wrapper for `execute_real_async`.
2036///
2037/// §Fase 37.x.j.10 (POST-CLOSE HOTFIX) — retained for the CLI path
2038/// + pre-async tests. Wraps the async fn in a SINGLE `block_on_store`
2039/// so the entire flow execution lives on one temporary tokio runtime.
2040/// Pre-hotfix the sync variant called `execute_sql_store_step` which
2041/// HAD its own block_on_store internally (per-step temp runtime); when
2042/// the caller's eager pin acquisition was ALSO on a separate temp
2043/// runtime, the pin's reactor handles were stale → SQL `.await` hung.
2044/// The new structure guarantees pin + dispatch share one runtime.
2045///
2046/// Production server-driven callers (`execute_server_flow`) MUST NOT
2047/// use this wrapper — they construct their OWN outer `block_on_store`
2048/// to ALSO include the eager pin acquisition. This wrapper is correct
2049/// only when the caller's `pinned_conns` map is empty (legacy Pool
2050/// path) — in that case there's no pre-acquired pin whose runtime
2051/// could mismatch the dispatch's.
2052fn execute_real(
2053    units: &[ExecutionUnit],
2054    backend_name: &str,
2055    source_file: &str,
2056    use_color: bool,
2057    trace: bool,
2058    stream: bool,
2059    output_fmt: OutputFormat,
2060    report: &mut ReportBuilder,
2061    registry: &ToolRegistry,
2062    store_registry: &StoreRegistry,
2063    pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
2064    api_key_override: Option<&str>,
2065) -> Result<(bool, Vec<TraceEvent>), backend::BackendError> {
2066    block_on_store(execute_real_async(
2067        units,
2068        backend_name,
2069        source_file,
2070        use_color,
2071        trace,
2072        stream,
2073        output_fmt,
2074        report,
2075        registry,
2076        store_registry,
2077        pinned_conns,
2078        api_key_override,
2079    ))
2080}
2081
2082/// Execute a single step with anchor-breach retry loop.
2083///
2084/// On error-severity breaches, re-prompts the LLM with violation feedback
2085/// up to MAX_ANCHOR_RETRIES times. Warning-severity breaches are reported
2086/// but do not trigger retries.
2087#[allow(clippy::too_many_arguments)]
2088fn execute_step_with_retry(
2089    backend_name: &str,
2090    api_key: &str,
2091    system_prompt: &str,
2092    original_user_prompt: &str,
2093    step_name: &str,
2094    flow_name: &str,
2095    anchors: &[IRAnchor],
2096    use_color: bool,
2097    trace: bool,
2098    stream: bool,
2099    json: bool,
2100    conversation: &mut ConversationHistory,
2101    events: &mut Vec<TraceEvent>,
2102    total_input_tokens: &mut u64,
2103    total_output_tokens: &mut u64,
2104) -> String {
2105    let mut user_prompt = original_user_prompt.to_string();
2106    let mut attempt: u32 = 0;
2107    let mut last_response_text = String::new();
2108    let effective_stream = stream && !json; // No streaming in JSON mode
2109
2110    loop {
2111        let history = conversation.messages();
2112        let result = if effective_stream {
2113            // Streaming mode: print tokens as they arrive
2114            use std::io::Write;
2115            print!("    ");
2116            let _ = std::io::stdout().flush();
2117            let resp = backend::call_multi_stream(
2118                backend_name, api_key, system_prompt, history, &user_prompt, None,
2119                |chunk| {
2120                    print!("{chunk}");
2121                    let _ = std::io::stdout().flush();
2122                },
2123            );
2124            println!(); // End the streamed line
2125            resp
2126        } else {
2127            backend::call_multi(backend_name, api_key, system_prompt, history, &user_prompt, None)
2128        };
2129
2130        match result {
2131            Ok(resp) => {
2132                *total_input_tokens += resp.input_tokens;
2133                *total_output_tokens += resp.output_tokens;
2134                last_response_text = resp.text.clone();
2135
2136                // Print response (non-streaming: show preview; streaming: already printed)
2137                if !json {
2138                    let preview = if effective_stream {
2139                        String::new() // Already printed inline
2140                    } else if resp.text.len() > 500 {
2141                        format!("{}…", &resp.text[..500])
2142                    } else {
2143                        resp.text.clone()
2144                    };
2145
2146                    println!(
2147                        "  {} {}",
2148                        c("✓", "\x1b[32m", use_color),
2149                        c(
2150                            &format!(
2151                                "[{}] {} tokens in, {} out",
2152                                resp.stop_reason, resp.input_tokens, resp.output_tokens
2153                            ),
2154                            "\x1b[2m",
2155                            use_color,
2156                        )
2157                    );
2158
2159                    if !effective_stream {
2160                        for line in preview.lines() {
2161                            println!("    {line}");
2162                        }
2163                    }
2164                }
2165
2166                if trace {
2167                    events.push(TraceEvent {
2168                        event: "step_complete".to_string(),
2169                        unit: flow_name.to_string(),
2170                        step: step_name.to_string(),
2171                        detail: format!(
2172                            "model={}, input_tokens={}, output_tokens={}, stop={}, attempt={}",
2173                            resp.model, resp.input_tokens, resp.output_tokens, resp.stop_reason, attempt + 1
2174                        ),
2175                    });
2176                }
2177
2178                // ── Anchor checking ──────────────────────────────
2179                if anchors.is_empty() {
2180                    conversation.add_user(original_user_prompt);
2181                    conversation.add_assistant(&last_response_text);
2182                    return last_response_text;
2183                }
2184
2185                let results = anchor_checker::check_all(anchors, &resp.text);
2186                let mut error_breaches: Vec<String> = Vec::new();
2187
2188                for result in &results {
2189                    if result.passed {
2190                        if !json {
2191                            println!(
2192                                "  {} {}",
2193                                c("⚓", "\x1b[32m", use_color),
2194                                c(&format!("{}: pass ({:.0}%)", result.anchor_name, result.confidence * 100.0), "\x1b[32m", use_color),
2195                            );
2196                        }
2197                        if trace {
2198                            events.push(TraceEvent {
2199                                event: "anchor_pass".to_string(),
2200                                unit: flow_name.to_string(),
2201                                step: step_name.to_string(),
2202                                detail: format!("{} (confidence={:.2})", result.anchor_name, result.confidence),
2203                            });
2204                        }
2205                    } else {
2206                        if !json {
2207                            let severity_color = if result.severity == "error" { "\x1b[31m" } else { "\x1b[33m" };
2208                            println!(
2209                                "  {} {} [{}] ({:.0}%)",
2210                                c("⚓", severity_color, use_color),
2211                                c(&format!("{}: BREACH", result.anchor_name), &format!("\x1b[1m{severity_color}"), use_color),
2212                                result.severity,
2213                                result.confidence * 100.0,
2214                            );
2215                            for v in &result.violations {
2216                                println!(
2217                                    "    {} {}",
2218                                    c("↳", severity_color, use_color),
2219                                    v,
2220                                );
2221                            }
2222                        }
2223                        if trace {
2224                            events.push(TraceEvent {
2225                                event: "anchor_breach".to_string(),
2226                                unit: flow_name.to_string(),
2227                                step: step_name.to_string(),
2228                                detail: format!(
2229                                    "{} [{}] (confidence={:.2}): {}",
2230                                    result.anchor_name,
2231                                    result.severity,
2232                                    result.confidence,
2233                                    result.violations.join("; ")
2234                                ),
2235                            });
2236                        }
2237
2238                        // Collect error-severity breaches for retry
2239                        if result.severity == "error" {
2240                            for v in &result.violations {
2241                                error_breaches.push(format!("{}: {}", result.anchor_name, v));
2242                            }
2243                        }
2244                    }
2245                }
2246
2247                // ── Anchor chaining ─────────────────────────────
2248                let chain_results = anchor_checker::check_chained(&results, anchors, &resp.text);
2249                for (rule, chain_result) in &chain_results {
2250                    if chain_result.passed {
2251                        if !json {
2252                            println!(
2253                                "  {} {}",
2254                                c("⛓", "\x1b[36m", use_color),
2255                                c(
2256                                    &format!(
2257                                        "{} → {}: pass ({:.0}%) [{}]",
2258                                        rule.trigger, chain_result.anchor_name,
2259                                        chain_result.confidence * 100.0, rule.reason,
2260                                    ),
2261                                    "\x1b[36m",
2262                                    use_color,
2263                                ),
2264                            );
2265                        }
2266                    } else {
2267                        if !json {
2268                            println!(
2269                                "  {} {}",
2270                                c("⛓", "\x1b[31m", use_color),
2271                                c(
2272                                    &format!(
2273                                        "{} → {}: BREACH ({:.0}%) [{}]",
2274                                        rule.trigger, chain_result.anchor_name,
2275                                        chain_result.confidence * 100.0, rule.reason,
2276                                    ),
2277                                    "\x1b[1;31m",
2278                                    use_color,
2279                                ),
2280                            );
2281                            for v in &chain_result.violations {
2282                                println!(
2283                                    "    {} {}",
2284                                    c("↳", "\x1b[31m", use_color),
2285                                    v,
2286                                );
2287                            }
2288                        }
2289                        // Chain breaches count as error breaches for retry
2290                        if chain_result.severity == "error" {
2291                            for v in &chain_result.violations {
2292                                error_breaches.push(format!(
2293                                    "{} (chained from {}): {}",
2294                                    chain_result.anchor_name, rule.trigger, v
2295                                ));
2296                            }
2297                        }
2298                    }
2299                    if trace {
2300                        events.push(TraceEvent {
2301                            event: "anchor_chain".to_string(),
2302                            unit: flow_name.to_string(),
2303                            step: step_name.to_string(),
2304                            detail: format!(
2305                                "{} → {}: {} (confidence={:.2}, reason={})",
2306                                rule.trigger,
2307                                chain_result.anchor_name,
2308                                if chain_result.passed { "pass" } else { "BREACH" },
2309                                chain_result.confidence,
2310                                rule.reason,
2311                            ),
2312                        });
2313                    }
2314                }
2315
2316                // ── Retry on error-severity breaches ─────────────
2317                if error_breaches.is_empty() || attempt >= MAX_ANCHOR_RETRIES {
2318                    if !error_breaches.is_empty() {
2319                        if !json {
2320                            println!(
2321                                "  {} {}",
2322                                c("⚠", "\x1b[33m", use_color),
2323                                c(
2324                                    &format!(
2325                                        "Anchor breaches remain after {} retries — continuing",
2326                                        MAX_ANCHOR_RETRIES
2327                                    ),
2328                                    "\x1b[33m",
2329                                    use_color,
2330                                ),
2331                            );
2332                        }
2333                        if trace {
2334                            events.push(TraceEvent {
2335                                event: "retry_exhausted".to_string(),
2336                                unit: flow_name.to_string(),
2337                                step: step_name.to_string(),
2338                                detail: format!(
2339                                    "{} error breaches after {} retries",
2340                                    error_breaches.len(),
2341                                    MAX_ANCHOR_RETRIES
2342                                ),
2343                            });
2344                        }
2345                    }
2346                    conversation.add_user(original_user_prompt);
2347                    conversation.add_assistant(&last_response_text);
2348                    return last_response_text;
2349                }
2350
2351                // Build retry prompt with violation feedback
2352                attempt += 1;
2353                let feedback = error_breaches
2354                    .iter()
2355                    .enumerate()
2356                    .map(|(i, v)| format!("{}. {}", i + 1, v))
2357                    .collect::<Vec<_>>()
2358                    .join("\n");
2359
2360                user_prompt = format!(
2361                    "{}\n\n\
2362                    IMPORTANT: Your previous response violated the following constraints:\n\
2363                    {}\n\n\
2364                    Please regenerate your response, strictly avoiding the violations listed above.",
2365                    original_user_prompt,
2366                    feedback
2367                );
2368
2369                if !json {
2370                    println!(
2371                        "  {} {}",
2372                        c("↻", "\x1b[35m", use_color),
2373                        c(
2374                            &format!(
2375                                "Retry {}/{} — {} anchor breach(es) detected",
2376                                attempt,
2377                                MAX_ANCHOR_RETRIES,
2378                                error_breaches.len()
2379                            ),
2380                            "\x1b[1;35m",
2381                            use_color,
2382                        ),
2383                    );
2384                }
2385
2386                if trace {
2387                    events.push(TraceEvent {
2388                        event: "retry_attempt".to_string(),
2389                        unit: flow_name.to_string(),
2390                        step: step_name.to_string(),
2391                        detail: format!(
2392                            "attempt={}/{}, breaches: {}",
2393                            attempt,
2394                            MAX_ANCHOR_RETRIES,
2395                            error_breaches.join("; ")
2396                        ),
2397                    });
2398                }
2399
2400                // Loop continues with updated user_prompt
2401            }
2402            Err(err) => {
2403                if !json {
2404                    eprintln!(
2405                        "  {} step '{}' failed: {}",
2406                        c("✗", "\x1b[31m", use_color),
2407                        step_name,
2408                        err
2409                    );
2410                }
2411
2412                if trace {
2413                    events.push(TraceEvent {
2414                        event: "step_error".to_string(),
2415                        unit: flow_name.to_string(),
2416                        step: step_name.to_string(),
2417                        detail: format!("{err}"),
2418                    });
2419                }
2420
2421                return String::new(); // API error — don't retry
2422            }
2423        }
2424    }
2425}
2426
/// Returns the first line of `s`, shortened to at most `max` bytes.
///
/// When the first line is longer than `max` bytes it is cut and a trailing
/// `…` is appended; otherwise it is returned unchanged. The cut never splits
/// a UTF-8 character: if `max` falls inside a multi-byte character, the cut
/// backs off to the start of that character (so the kept prefix may be a few
/// bytes shorter than `max`). An empty `s` yields an empty string.
fn truncate(s: &str, max: usize) -> String {
    let first_line = s.lines().next().unwrap_or(s);
    if first_line.len() <= max {
        return first_line.to_string();
    }

    // Slicing a `str` at a non-boundary byte offset panics, so walk back to
    // the nearest character boundary. Offset 0 is always a boundary, which
    // guarantees termination.
    let mut cut = max;
    while !first_line.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}…", &first_line[..cut])
}
2435
2436/// Build a plan export from compiled execution units.
2437fn build_plan_export(
2438    units: &[ExecutionUnit],
2439    source_file: &str,
2440    backend: &str,
2441    registry: &ToolRegistry,
2442) -> plan_export::PlanExport {
2443    let mut plan_units = Vec::new();
2444    let mut all_deps = PlanDependencies {
2445        max_depth: 0,
2446        parallel_groups: Vec::new(),
2447        unresolved_refs: Vec::new(),
2448    };
2449
2450    for unit in units {
2451        // Build step infos for dependency analysis
2452        // §Fase 61 — fold `use Tool(k = v)` keyword-arg references into the
2453        // analysis argument so the plan reflects the real dependency edges.
2454        let step_name_set: std::collections::HashSet<&str> =
2455            unit.steps.iter().map(|s| s.step_name.as_str()).collect();
2456        let step_infos: Vec<step_deps::StepInfo> = unit.steps.iter().map(|s| {
2457            step_deps::StepInfo {
2458                name: s.step_name.clone(),
2459                step_type: s.step_type.clone(),
2460                user_prompt: s.user_prompt.clone(),
2461                argument: step_deps::use_tool_analysis_argument(
2462                    s.tool_argument.as_deref()
2463                        .or(s.memory_expression.as_deref())
2464                        .unwrap_or(""),
2465                    &s.tool_named_args,
2466                    &step_name_set,
2467                ),
2468            }
2469        }).collect();
2470
2471        let dep_graph = step_deps::analyze(&step_infos);
2472
2473        // Build plan steps with dependency info
2474        let plan_steps: Vec<PlanStep> = unit.steps.iter().zip(dep_graph.steps.iter()).map(|(s, d)| {
2475            PlanStep {
2476                name: s.step_name.clone(),
2477                step_type: s.step_type.clone(),
2478                prompt_preview: truncate(&s.user_prompt, 200),
2479                tool_argument: s.tool_argument.clone(),
2480                memory_expression: s.memory_expression.clone(),
2481                depends_on: d.depends_on.clone(),
2482                is_root: d.is_root,
2483            }
2484        }).collect();
2485
2486        plan_units.push(PlanUnit {
2487            flow_name: unit.flow_name.clone(),
2488            persona_name: unit.persona_name.clone(),
2489            context_name: unit.context_name.clone(),
2490            effort: unit.effort.clone(),
2491            anchor_count: unit.resolved_anchors.len(),
2492            anchors: unit.anchor_instructions.clone(),
2493            steps: plan_steps,
2494        });
2495
2496        // Merge dependency info
2497        if dep_graph.max_depth > all_deps.max_depth {
2498            all_deps.max_depth = dep_graph.max_depth;
2499        }
2500        all_deps.parallel_groups.extend(dep_graph.parallel_groups);
2501        all_deps.unresolved_refs.extend(
2502            dep_graph.unresolved_refs.into_iter().map(|(step, var)| {
2503                UnresolvedRef { step, variable: var }
2504            }),
2505        );
2506    }
2507
2508    // Build tool info
2509    let tools = PlanTools {
2510        total: registry.len(),
2511        builtin: registry.builtin_names().into_iter().map(|s| s.to_string()).collect(),
2512        program: registry.program_names().into_iter().map(|s| s.to_string()).collect(),
2513        registered: registry.tool_names().into_iter().map(|name| {
2514            let entry = registry.get(name).unwrap();
2515            PlanToolEntry {
2516                name: entry.name.clone(),
2517                provider: entry.provider.clone(),
2518                source: format!("{:?}", entry.source).to_lowercase(),
2519                output_schema: entry.output_schema.clone(),
2520                effect_row: entry.effect_row.clone(),
2521            }
2522        }).collect(),
2523    };
2524
2525    PlanBuilder::build(source_file, backend, &plan_units, tools, all_deps)
2526}
2527
2528// ── Server execution entry point ─────────────────────────────────────────────
2529
/// Result payload of a server-driven flow execution: this is the `Ok` value
/// returned by `execute_server_flow`.
pub struct ServerRunnerMetrics {
    // NOTE(review): the six fields below carry no documentation in the
    // original; the descriptions are inferred from their names and types
    // only — confirm against the site that populates this struct.
    /// Presumably whether the flow executed successfully — TODO confirm.
    pub success: bool,
    /// Presumably the number of steps that were executed — TODO confirm.
    pub steps_executed: usize,
    /// Presumably the total input tokens consumed — TODO confirm.
    pub tokens_input: u64,
    /// Presumably the total output tokens produced — TODO confirm.
    pub tokens_output: u64,
    /// Presumably the number of anchor breaches observed — TODO confirm.
    pub anchor_breaches: usize,
    /// Presumably the executed step names, parallel to `step_results` —
    /// TODO confirm the index correspondence.
    pub step_names: Vec<String>,
    /// Presumably the textual result of each step — TODO confirm.
    pub step_results: Vec<String>,
    /// Per-step token chunks for streaming (simulated from step results).
    pub per_step_chunks: Vec<Vec<String>>,
    /// §Fase 39.c.y — semantic provenance events captured during
    /// flow execution. Each entry is a `kind:identifier` slug
    /// (closed taxonomy enforced by producer sites):
    ///   - `retrieve:<store>`         — Pillar II store read
    ///   - `persist:<store>`          — Pillar II store insert
    ///   - `mutate:<store>`           — Pillar II store update
    ///   - `purge:<store>`            — Pillar II store delete
    ///   - `shield:<name>@<step>`     — Pillar I shield invocation
    ///   - `ots:<name>@<step>`        — OTS apply
    ///   - `mandate:<name>@<step>`    — mandate apply
    ///   - `compute:<name>@<step>`    — compute apply
    ///   - `lambda_apply:<name>@<step>` — lambda data apply
    /// The wire envelope's `provenance_chain` is built from
    /// `[flow:F, …events…, step:S0, step:S1, …, backend:B]`.
    /// Empty for trivial flows; populated by [`emit_provenance_event`]
    /// at the runtime sites.
    pub provenance_events: Vec<String>,
    /// §Fase 39.c.z — closed-catalog blame attribution from runtime
    /// degradation events. Populated when:
    ///   - an anchor with severity != "error" fires (degraded path
    ///     proceeds)
    ///   - a shield flags content but flow proceeds
    ///   - a store mutation chain verification fails AND flow
    ///     proceeds with prior-state read
    ///   - a backend returns truncated / partial response
    ///   - D5 detects a recoverable type mismatch
    /// `None` on the clean happy path. The first surfaced blame
    /// wins (subsequent events are recorded in audit_log but do
    /// not overwrite the primary attribution).
    pub blame_attribution: Option<crate::wire_envelope::BlameContext>,
    /// §Fase 55.b — the Theorem 5.1 `(base, scope, confidence)` triple of
    /// every flow-level `use <Tool>` dispatch whose tool declares an
    /// `epistemic:<level>` effect. Derived from the IR via
    /// [`crate::epistemic_capture::collect_for_flow`] — the same function
    /// the streaming path calls, so both transports surface byte-identical
    /// envelopes (§55.c parity). Empty for flows with no epistemic tool.
    pub epistemic_envelopes: Vec<crate::epistemic_capture::EpistemicEnvelope>,
}
2578
2579/// §Fase 55.b/c — derive a flow's epistemic envelopes from the IR. This is
2580/// the SINGLE site both transports funnel through — the synchronous runner
2581/// calls it directly with its in-hand `ir`; the streaming
2582/// `axon_server::resolve_epistemic_envelopes_for_flow` re-derives the IR
2583/// from source and calls THIS function — so the sync `FlowEnvelope` and the
2584/// streaming `axon.complete` carry byte-identical `(base, scope, confidence)`
2585/// triples by construction (the §55.c parity invariant: there is exactly
2586/// one derivation, never two that could drift). `input_confidence = 1.0`:
2587/// a top-level flow's ψ is clean before any tool degrades it.
2588pub fn derive_epistemic_envelopes_for_flow(
2589    ir: &crate::ir_nodes::IRProgram,
2590    flow_name: &str,
2591) -> Vec<crate::epistemic_capture::EpistemicEnvelope> {
2592    ir.flows
2593        .iter()
2594        .find(|f| f.name == flow_name)
2595        .map(|f| crate::epistemic_capture::collect_for_flow(f, &ir.tools, 1.0))
2596        .unwrap_or_default()
2597}
2598
2599pub fn execute_server_flow(
2600    ir: &crate::ir_nodes::IRProgram,
2601    flow_name: &str,
2602    backend: &str,
2603    source_file: &str,
2604    api_key_override: Option<&str>,
2605    // §Fase 37.b (D1) — the parsed HTTP request body. The flow's
2606    // declared parameters bind from its same-named fields (the Request
2607    // Binding Contract) and seed each `ExecContext` before the step
2608    // walk. `None` for a caller with no request body (D5).
2609    request_body: Option<&serde_json::Value>,
2610    // §Fase 37.y (D3) — the URL path captures (e.g. for
2611    // `/api/tenants/{tenant_id}` the map is `{tenant_id: "acme"}`).
2612    // Empty map for callers without a dynamic route (D5 backwards-
2613    // compat). Passed to `bind_request` alongside `request_body`.
2614    request_path: &std::collections::HashMap<String, String>,
2615    // §Fase 37.y (D3) — the URL query string parsed into name → value.
2616    // Single-value semantics in v1.38.5 (multi-value query keys
2617    // deferred per plan vivo §7); axum's `Query<HashMap>` extractor
2618    // provides this shape.
2619    request_query: &std::collections::HashMap<String, String>,
2620    // §Fase 58.g (D7) — optional per-tenant / per-server tool base URL.
2621    // When `Some`, every URL-dispatched program tool with a RELATIVE
2622    // `runtime` is resolved against it (`{base}/{slug}`) so the adopter
2623    // wires their tool-server via config without touching the program;
2624    // absolute runtimes stay verbatim (D5). `None` → no resolution.
2625    tool_base_url: Option<&str>,
2626) -> Result<ServerRunnerMetrics, String> {
2627    let mut target_run = None;
2628    for run in &ir.runs {
2629        if run.flow_name == flow_name {
2630            target_run = Some(run);
2631            break;
2632        }
2633    }
2634
2635    let mut execution_units = Vec::new();
2636
2637    if let Some(run) = target_run {
2638        execution_units.push(ExecutionUnit {
2639            flow_name: run.flow_name.clone(),
2640            persona_name: run.persona_name.clone(),
2641            context_name: run.context_name.clone(),
2642            system_prompt: build_system_prompt(run, backend),
2643            steps: build_compiled_steps(run, ir),
2644            anchor_instructions: build_anchor_instructions(run),
2645            effort: run.effort.clone(),
2646            resolved_anchors: run.resolved_anchors.clone(),
2647            // §Fase 37.b (D1) — bind the request body to the resolved
2648            // flow's declared parameters.
2649            // §Fase 37.y (D3) — extended to bind from path + query
2650            // sources too; the runtime merge respects the D4
2651            // compile-time collision rejection (axon-T901).
2652            param_bindings: run
2653                .resolved_flow
2654                .as_ref()
2655                .map(|f| crate::request_binding::bind_request(
2656                    f,
2657                    request_path,
2658                    request_query,
2659                    request_body,
2660                ))
2661                .unwrap_or_default(),
2662        });
2663    } else {
2664        let target_flow: &crate::ir_nodes::IRFlow = ir
2665            .flows
2666            .iter()
2667            .find(|f| f.name == flow_name)
2668            .ok_or_else(|| format!("flow '{}' not found in compiled IR", flow_name))?;
2669
2670        let default_persona = ir.personas.first().cloned().unwrap_or_else(|| crate::ir_nodes::IRPersona {
2671            node_type: "Persona",
2672            source_line: 0,
2673            source_column: 0,
2674            name: "Default".to_string(),
2675            domain: vec![],
2676            tone: "".to_string(),
2677            confidence_threshold: None,
2678            cite_sources: None,
2679            refuse_if: vec![],
2680            language: "".to_string(),
2681            description: "".to_string(),
2682        });
2683        let default_context = ir.contexts.first().cloned().unwrap_or_else(|| crate::ir_nodes::IRContext {
2684            node_type: "Context",
2685            source_line: 0,
2686            source_column: 0,
2687            name: "Default".to_string(),
2688            memory_scope: "".to_string(),
2689            language: "".to_string(),
2690            depth: "".to_string(),
2691            max_tokens: None,
2692            temperature: None,
2693            cite_sources: None,
2694        });
2695
2696        let run = crate::ir_nodes::IRRun {
2697            node_type: "Run",
2698            source_line: 0,
2699            source_column: 0,
2700            flow_name: flow_name.to_string(),
2701            arguments: vec![],
2702            persona_name: default_persona.name.clone(),
2703            context_name: default_context.name.clone(),
2704            anchor_names: vec![],
2705            on_failure: "".to_string(),
2706            on_failure_params: vec![],
2707            output_to: "".to_string(),
2708            effort: "low".to_string(),
2709            resolved_flow: Some(target_flow.clone()),
2710            resolved_persona: Some(default_persona),
2711            resolved_context: Some(default_context),
2712            resolved_anchors: ir.anchors.clone(),
2713        };
2714
2715        execution_units.push(ExecutionUnit {
2716            flow_name: run.flow_name.clone(),
2717            persona_name: run.persona_name.clone(),
2718            context_name: run.context_name.clone(),
2719            system_prompt: build_system_prompt(&run, backend),
2720            steps: build_compiled_steps(&run, ir),
2721            anchor_instructions: build_anchor_instructions(&run),
2722            effort: run.effort.clone(),
2723            resolved_anchors: run.resolved_anchors.clone(),
2724            // §Fase 37.b (D1) — bind the request body to the flow's
2725            // declared parameters (the dynamic-route execution path).
2726            // §Fase 37.y (D3) — extended to bind from path + query
2727            // sources too.
2728            param_bindings: crate::request_binding::bind_request(
2729                target_flow,
2730                request_path,
2731                request_query,
2732                request_body,
2733            ),
2734        });
2735    }
2736
2737    let mut report = crate::output::ReportBuilder::new(source_file, backend, "json");
2738    let mut registry = crate::tool_registry::ToolRegistry::new();
2739    // §Fase 58.f — register the program's declared tools on the SERVER path
2740    // (the CLI path already does this in `run_run`). Without this, every
2741    // program-declared `tool { provider: http … }` missed the registry and the
2742    // step silently degraded to an LLM call (the brief #22 / #17 finding). This
2743    // `registry` is a per-call local (built fresh above for THIS request), so
2744    // registration is request-scoped — no cross-tenant tool contamination
2745    // between concurrent flows (§58 D10). Provider→URL resolves via each tool's
2746    // declared `runtime:` field (D7); the §58.e structured body then POSTs to it.
2747    registry.register_from_ir(&ir.tools);
2748    // §Fase 58.g (D7) — resolve relative tool runtimes against the
2749    // caller-supplied per-tenant / per-server base URL. Request-scoped
2750    // (this `registry` is a per-call local) → no cross-tenant leakage.
2751    if let Some(base) = tool_base_url {
2752        registry.resolve_relative_endpoints(base);
2753    }
2754
2755    // §Fase 35.e — build the axonstore registry from the program's
2756    // declarations. The D2 closed-catalog gate runs here: an unknown
2757    // backend fails fast, at deploy, with a named error.
2758    let store_registry = StoreRegistry::build(&ir.axonstore_specs)
2759        .map_err(|e| format!("axonstore registry: {e}"))?;
2760
2761    // §Fase 37.x.j (D1) — Eager acquire one PoolConnection per
2762    // postgresql-backed axonstore referenced in the flow body BEFORE
2763    // executing any step. Each pin is held for the whole flow
2764    // execution and released on `pinned_conns` drop at the end of
2765    // this function (Rust handles the drop order: HashMap drops →
2766    // every PoolConnection drops → the per-conn `after_release
2767    // DEALLOCATE ALL` hook from Fase 38.x.a D2 runs → conn returns
2768    // to the pool clean).
2769    //
2770    // The discovery walk filters `step.step_type` to the four SQL
2771    // store ops + checks the registry's `backend_kind` to skip
2772    // in_memory stores (no race, no pin needed). The set is
2773    // deduplicated by store_name — multiple steps against the same
2774    // store share ONE pin (the D1 invariant).
2775    //
2776    // Acquire failure is non-fatal: the flow proceeds with an empty
2777    // pin map, which falls back to the legacy `StoreConn::Pool`
2778    // path. This preserves resilience against transient pool
2779    // saturation (a deploy-time `verify_postgres_schemas` failure
2780    // is the right gate for "store unreachable", not flow-time).
2781    // §Fase 37.x.j.10 (POST-CLOSE HOTFIX) — Compute the set of
2782    // postgresql-backed axonstores referenced by ANY execution unit's
2783    // body. This walks the IR purely SYNCHRONOUSLY — no .await, no
2784    // tokio runtime needed. The actual pin acquisition happens INSIDE
2785    // the single outer `block_on_store` below so the pins acquire on
2786    // the SAME runtime that later dispatches their SQL.
2787    let needed_pg_stores: std::collections::HashSet<String> = {
2788        let mut needed = std::collections::HashSet::new();
2789        for unit in &execution_units {
2790            for step in &unit.steps {
2791                if matches!(
2792                    step.step_type.as_str(),
2793                    "persist" | "retrieve" | "mutate" | "purge"
2794                ) && store_registry.backend_kind(&step.step_name)
2795                    == Some(crate::store::registry::StoreBackendKind::Postgresql)
2796                {
2797                    needed.insert(step.step_name.clone());
2798                }
2799            }
2800        }
2801        needed
2802    };
2803
2804    let (success, _events) = if backend == "stub" {
2805        let result = execute_stub(&execution_units, false, false);
2806        // §Fase 33.b Layer 1 — close the steps_executed:0 hollow-wire bug.
2807        //
2808        // execute_stub prints step results to stdout and updates its
2809        // local stub_ctx but does NOT touch the ReportBuilder. The CLI
2810        // path at the bottom of this file handles the gap by manually
2811        // populating the report after execute_stub returns; the server
2812        // path (this function) historically did not, so every dynamic-
2813        // route SSE dispatch over the stub backend served a hollow
2814        // body: `step:""`, `token:""`, `steps_executed:0`.
2815        //
2816        // Mirror the CLI path here: enumerate the execution_units +
2817        // record one StepReport per step. `result: "(stub)"` matches
2818        // the CLI's placeholder — adopters running on stub see the
2819        // step name + a sentinel result, NOT an empty event. Real
2820        // backend streaming (Fase 33.d) replaces "(stub)" with the
2821        // actual backend chunk text.
2822        for unit in &execution_units {
2823            report.begin_unit(&unit.flow_name, &unit.persona_name);
2824            for step in &unit.steps {
2825                report.record_step(crate::output::StepReport {
2826                    name: step.step_name.clone(),
2827                    step_type: step.step_type.clone(),
2828                    result: "(stub)".to_string(),
2829                    duration_ms: 0,
2830                    input_tokens: 0,
2831                    output_tokens: 0,
2832                    anchor_breaches: 0,
2833                    chain_activations: 0,
2834                    was_retried: false,
2835                });
2836            }
2837            let mut stub_hooks = crate::hooks::HookManager::new();
2838            stub_hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
2839            stub_hooks.on_unit_end();
2840            report.end_unit(&stub_hooks);
2841        }
2842        result
2843    } else {
2844        // §Fase 37.x.j.10 (POST-CLOSE HOTFIX 2026-05-21) — SINGLE
2845        // outer `block_on_store` wraps BOTH the eager pin acquisition
2846        // AND the flow execution. This is the load-bearing structural
2847        // property: pin acquire + every SQL dispatch + implicit pin
2848        // drop ALL live on the SAME temporary tokio runtime. Reactor
2849        // handles inside each `PoolConnection<Postgres>` stay valid
2850        // throughout the flow. Pre-hotfix the eager-acquire loop used
2851        // its OWN block_on_store per store (separate runtime), and
2852        // the dispatch's nested block_on_store inside the sync
2853        // `execute_sql_store_step` was YET ANOTHER runtime — pinned
2854        // conn awaited from a foreign runtime hung indefinitely.
2855        block_on_store(async {
2856            let mut pinned_conns: std::collections::HashMap<
2857                String,
2858                sqlx::pool::PoolConnection<sqlx::Postgres>,
2859            > = std::collections::HashMap::new();
2860
2861            // — 1. Eager pin acquisition on THIS runtime.
2862            //
2863            // Note: `async` (no `move`) so we borrow `report`,
2864            // `registry`, `store_registry`, `execution_units`,
2865            // `needed_pg_stores`, etc. by reference. The async block's
2866            // lifetime is bounded by `block_on_store` which is
2867            // bounded by the enclosing function — so the borrows
2868            // outlive the future safely.
2869            for store_name in &needed_pg_stores {
2870                match store_registry.resolve(store_name) {
2871                    Ok(crate::store::registry::StoreHandle::Postgres(backend_pool)) => {
2872                        match backend_pool.acquire_pin().await {
2873                            Ok(conn) => {
2874                                crate::store::pin_observability::emit_pin_acquire(
2875                                    store_name,
2876                                    flow_name,
2877                                    "",
2878                                    "eager",
2879                                    None,
2880                                );
2881                                pinned_conns.insert(store_name.clone(), conn);
2882                            }
2883                            Err(e) => {
2884                                tracing::warn!(
2885                                    target: "axon::store::pin",
2886                                    store_name = %store_name,
2887                                    error = %e,
2888                                    d_letter = "37.x.j.D1",
2889                                    "failed to acquire flow pin; falling \
2890                                     back to per-step pool acquisition \
2891                                     (legacy path) for this store. Adopter \
2892                                     under transaction-mode pooler may \
2893                                     observe the unnamed-prepared-statement \
2894                                     race for this op."
2895                                );
2896                            }
2897                        }
2898                    }
2899                    Ok(_) => {}
2900                    Err(e) => {
2901                        tracing::warn!(
2902                            target: "axon::store::pin",
2903                            store_name = %store_name,
2904                            error = %e,
2905                            d_letter = "37.x.j.D1",
2906                            "failed to resolve axonstore for flow pin; \
2907                             falling back to per-step pool acquisition."
2908                        );
2909                    }
2910                }
2911            }
2912
2913            // — 2. Run the flow on THIS SAME runtime.
2914            execute_real_async(
2915                &execution_units,
2916                backend,
2917                source_file,
2918                false,
2919                false,
2920                false,
2921                crate::output::OutputFormat::Json,
2922                &mut report,
2923                &registry,
2924                &store_registry,
2925                &mut pinned_conns,
2926                api_key_override,
2927            ).await
2928            // — 3. `pinned_conns` drops here → every PoolConnection
2929            //   drops on THIS runtime → `after_release(DEALLOCATE ALL)`
2930            //   hook runs (Fase 38.x.a D2) → conns return to pool
2931            //   clean. The whole pin lifecycle stayed on one runtime.
2932        }).map_err(|e| format!("Backend error: {:?}", e))?
2933    };
2934
2935    let hooks = crate::hooks::HookManager::new();
2936    let r = report.build(success, &hooks);
2937
2938    // §Fase 39.c.z — derive blame from the report BEFORE the
2939    // partial-move loop below. The producer walks the units +
2940    // steps by reference; the loop afterward consumes them. We
2941    // must extract any structured observability from `r` first.
2942    let blame_attribution =
2943        crate::wire_envelope_producers::derive_blame_from_report(&r);
2944
2945    let mut steps_executed = 0;
2946    let mut tokens_input = 0;
2947    let mut tokens_output = 0;
2948    let mut step_results = Vec::new();
2949    let mut step_names = Vec::new();
2950    let mut anchor_breaches = 0;
2951
2952    for u in r.units {
2953        for s in u.steps {
2954            steps_executed += 1;
2955            tokens_input += s.input_tokens;
2956            tokens_output += s.output_tokens;
2957            step_results.push(s.result);
2958            step_names.push(s.name);
2959            anchor_breaches += s.anchor_breaches as usize;
2960        }
2961    }
2962
2963    // Generate per-step token chunks (simulated streaming granularity)
2964    let per_step_chunks: Vec<Vec<String>> = step_results.iter().map(|result| {
2965        if result.is_empty() {
2966            vec![]
2967        } else {
2968            // Chunk by word boundaries (~token-level granularity)
2969            result.split_whitespace()
2970                .collect::<Vec<&str>>()
2971                .chunks(3) // ~3 words per chunk
2972                .map(|chunk| chunk.join(" "))
2973                .collect()
2974        }
2975    }).collect();
2976
2977    // §Fase 39.c.y — derive semantic provenance events from the IR
2978    // walk. Each store-touching step + each shield/ots/mandate/compute
2979    // apply emits a `kind:identifier` slug into the chain. The slug
2980    // taxonomy is closed (see ServerRunnerMetrics.provenance_events
2981    // doc + wire_envelope_producers module). This is the FOUNDATION
2982    // of Pillar II audit lineage on the wire envelope.
2983    let provenance_walk: Vec<(String, String)> = execution_units
2984        .iter()
2985        .flat_map(|u| {
2986            u.steps
2987                .iter()
2988                .map(|s| (s.step_type.clone(), s.step_name.clone()))
2989        })
2990        .collect();
2991    let provenance_events =
2992        crate::wire_envelope_producers::collect_provenance_events_from(
2993            &provenance_walk,
2994        );
2995
2996    // §Fase 39.c.z — blame_attribution was derived BEFORE the
2997    // partial-move loop above (the report's units/steps are
2998    // consumed into step_names/step_results by that loop). The
2999    // priority order is: anchor breach > shield rejection > store
3000    // breach > backend soft-fail > type mismatch. The first
3001    // surfaced wins per `merge_blame`'s stable tie-break.
3002
3003    // §Fase 55.b/c — capture the epistemic envelopes via the SINGLE shared
3004    // derivation (the streaming path funnels through the same function).
3005    let epistemic_envelopes = derive_epistemic_envelopes_for_flow(ir, flow_name);
3006
3007    Ok(ServerRunnerMetrics {
3008        success,
3009        steps_executed,
3010        tokens_input,
3011        tokens_output,
3012        anchor_breaches,
3013        step_names,
3014        step_results,
3015        per_step_chunks,
3016        provenance_events,
3017        blame_attribution,
3018        epistemic_envelopes,
3019    })
3020}
3021
3022// ── Public entry point ───────────────────────────────────────────────────────
3023
3024pub fn run_run(
3025    file: &str,
3026    backend: &str,
3027    trace: bool,
3028    tool_mode: &str,
3029    stream: bool,
3030    output: &str,
3031    export_plan: bool,
3032) -> i32 {
3033    let output_fmt = match OutputFormat::from_str(output) {
3034        Some(f) => f,
3035        None => {
3036            eprintln!("✗ Invalid output format '{}'. Use 'text' or 'json'.", output);
3037            return 2;
3038        }
3039    };
3040    let json = output_fmt.is_json();
3041    let use_color = if json { false } else { io::stdout().is_terminal() };
3042    let path = Path::new(file);
3043    let filename = path
3044        .file_name()
3045        .map(|n| n.to_string_lossy().into_owned())
3046        .unwrap_or_else(|| file.to_string());
3047
3048    // ── 1. Read source ───────────────────────────────────────────
3049    let source = match std::fs::read_to_string(path) {
3050        Ok(s) => s,
3051        Err(_) => {
3052            eprintln!(
3053                "{}",
3054                c(&format!("✗ File not found: {file}"), "\x1b[1;31m", use_color)
3055            );
3056            return 2;
3057        }
3058    };
3059
3060    // ── 2. Lex ───────────────────────────────────────────────────
3061    let tokens = match Lexer::new(&source, file).tokenize() {
3062        Ok(t) => t,
3063        Err(LexerError { message, line, column }) => {
3064            let loc = if column > 0 {
3065                format!(":{line}:{column}")
3066            } else {
3067                format!(":{line}")
3068            };
3069            eprintln!(
3070                "{}  {message}",
3071                c(&format!("✗ {filename}{loc}"), "\x1b[1;31m", use_color)
3072            );
3073            return 1;
3074        }
3075    };
3076
3077    // ── 3. Parse ─────────────────────────────────────────────────
3078    let mut parser = Parser::new(tokens);
3079    let program = match parser.parse() {
3080        Ok(p) => p,
3081        Err(ParseError { message, line, column, .. }) => {
3082            let loc = if column > 0 {
3083                format!(":{line}:{column}")
3084            } else {
3085                format!(":{line}")
3086            };
3087            eprintln!(
3088                "{}  {message}",
3089                c(&format!("✗ {filename}{loc}"), "\x1b[1;31m", use_color)
3090            );
3091            return 1;
3092        }
3093    };
3094
3095    // ── 4. Type check ────────────────────────────────────────────
3096    let type_errors = TypeChecker::new(&program).check();
3097    if !type_errors.is_empty() {
3098        eprintln!(
3099            "{}  {} type error(s)",
3100            c(&format!("✗ {filename}"), "\x1b[1;31m", use_color),
3101            type_errors.len()
3102        );
3103        for te in &type_errors {
3104            eprintln!("  error [line {}]: {}", te.line, te.message);
3105        }
3106        return 1;
3107    }
3108
3109    // ── 5. Generate IR ───────────────────────────────────────────
3110    let ir_program = IRGenerator::new().generate(&program);
3111
3112    // ── 6. Build execution plan ──────────────────────────────────
3113    let units = build_execution_plan(&ir_program, backend);
3114
3115    if units.is_empty() {
3116        eprintln!(
3117            "{}",
3118            c("⚠ No run statements found — nothing to execute.", "\x1b[1;33m", use_color)
3119        );
3120        return 0;
3121    }
3122
3123    // ── 7. Execute ───────────────────────────────────────────────
3124    let mode_label = if tool_mode == "real" {
3125        if stream { "real+stream" } else { "real" }
3126    } else {
3127        "stub"
3128    };
3129
3130    if !json {
3131        println!(
3132            "{}",
3133            c(
3134                &format!(
3135                    "═══ AXON Run: {filename} ({} unit{}, backend={backend}, mode={tool_mode}) ═══",
3136                    units.len(),
3137                    if units.len() == 1 { "" } else { "s" }
3138                ),
3139                "\x1b[1;36m",
3140                use_color,
3141            )
3142        );
3143    }
3144
3145    let mut report = ReportBuilder::new(file, backend, mode_label);
3146
3147    // Build tool registry from IR + builtins
3148    let mut registry = ToolRegistry::new();
3149    registry.register_from_ir(&ir_program.tools);
3150
3151    // §Fase 35.e — build the axonstore registry (D2 closed-catalog
3152    // gate). An unknown `backend:` fails fast, before execution.
3153    let store_registry = match StoreRegistry::build(&ir_program.axonstore_specs) {
3154        Ok(r) => r,
3155        Err(e) => {
3156            eprintln!(
3157                "{}  {e}",
3158                c(&format!("✗ {filename}"), "\x1b[1;31m", use_color)
3159            );
3160            return 1;
3161        }
3162    };
3163
3164    if !json && !registry.program_names().is_empty() {
3165        println!(
3166            "  {}",
3167            c(
3168                &format!(
3169                    "Tools: {} registered ({} builtin + {} program)",
3170                    registry.len(),
3171                    registry.builtin_names().len(),
3172                    registry.program_names().len(),
3173                ),
3174                "\x1b[2m",
3175                use_color,
3176            )
3177        );
3178    }
3179
3180    // ── Export plan and exit (no execution) ──────────────────────
3181    if export_plan {
3182        let plan = build_plan_export(&units, file, backend, &registry);
3183        println!("{}", PlanBuilder::to_json(&plan));
3184        return 0;
3185    }
3186
3187    // §Fase 37.x.j (D1) — CLI path: no flow-scoped pinning (the CLI
3188    // runs one flow per process invocation; the legacy per-step
3189    // `StoreConn::Pool` fallback is acceptable for one-shot runs and
3190    // keeps CLI smoke tests byte-identical to pre-37.x.j).
3191    let mut cli_pinned_conns: std::collections::HashMap<
3192        String,
3193        sqlx::pool::PoolConnection<sqlx::Postgres>,
3194    > = std::collections::HashMap::new();
3195    let (success, events) = if tool_mode == "real" {
3196        match execute_real(&units, backend, file, use_color, trace, stream, output_fmt, &mut report, &registry, &store_registry, &mut cli_pinned_conns, None) {
3197            Ok((s, e)) => (s, e),
3198            Err(err) => {
3199                eprintln!(
3200                    "{}",
3201                    c(&format!("✗ Backend error: {err}"), "\x1b[1;31m", use_color)
3202                );
3203                return 2;
3204            }
3205        }
3206    } else {
3207        let (s, e) = execute_stub(&units, use_color, trace);
3208        // For stub mode, build minimal unit reports
3209        for unit in &units {
3210            report.begin_unit(&unit.flow_name, &unit.persona_name);
3211            for step in &unit.steps {
3212                report.record_step(StepReport {
3213                    name: step.step_name.clone(),
3214                    step_type: step.step_type.clone(),
3215                    result: "(stub)".into(),
3216                    duration_ms: 0,
3217                    input_tokens: 0,
3218                    output_tokens: 0,
3219                    anchor_breaches: 0,
3220                    chain_activations: 0,
3221                    was_retried: false,
3222                });
3223            }
3224            // Stub mode has no HookManager — use a temporary one for the unit
3225            let mut stub_hooks = crate::hooks::HookManager::new();
3226            stub_hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
3227            stub_hooks.on_unit_end();
3228            report.end_unit(&stub_hooks);
3229        }
3230        (s, e)
3231    };
3232
3233    // ── 8. JSON output or text summary ─────────────────────────
3234    if json {
3235        // Build report with a dummy HookManager for stub mode
3236        // (real mode already populated hooks inside execute_real)
3237        let stub_hooks = crate::hooks::HookManager::new();
3238        let execution_report = report.build(success, &stub_hooks);
3239        println!("{}", ReportBuilder::to_json(&execution_report));
3240    } else {
3241        let total_steps: usize = units.iter().map(|u| u.steps.len()).sum();
3242        println!(
3243            "\n{}",
3244            c(
3245                &format!(
3246                    "═══ {} unit{}, {} step{} — {mode_label} execution complete ═══",
3247                    units.len(),
3248                    if units.len() == 1 { "" } else { "s" },
3249                    total_steps,
3250                    if total_steps == 1 { "" } else { "s" },
3251                ),
3252                "\x1b[1;32m",
3253                use_color,
3254            )
3255        );
3256    }
3257
3258    // ── 9. Save trace ────────────────────────────────────────────
3259    if trace && !events.is_empty() {
3260        let trace_path = Path::new(file).with_extension("trace.json");
3261        let trace_json = serde_json::json!({
3262            "_meta": {
3263                "source": file,
3264                "backend": backend,
3265                "tool_mode": tool_mode,
3266                "axon_version": AXON_VERSION,
3267                "mode": "stub",
3268            },
3269            "events": events,
3270        });
3271        match serde_json::to_string_pretty(&trace_json) {
3272            Ok(json_str) => match std::fs::write(&trace_path, json_str) {
3273                Ok(_) => {
3274                    if !json {
3275                        println!(
3276                            "{}",
3277                            c(
3278                                &format!("📋 Trace saved → {}", trace_path.display()),
3279                                "\x1b[1;35m",
3280                                use_color,
3281                            )
3282                        );
3283                    }
3284                }
3285                Err(e) => eprintln!("⚠ Could not save trace: {e}"),
3286            },
3287            Err(e) => eprintln!("⚠ Could not serialize trace: {e}"),
3288        }
3289    }
3290
3291    if success { 0 } else { 1 }
3292}
3293
// ── Tests — §Fase 58.e structured tool bodies, then §Fase 35.e sync-runner axonstore wiring ──
3295
#[cfg(test)]
mod fase58e_tests {
    //! Unit tests for the tool-argument helpers `coerce_tool_arg_value` and
    //! `build_structured_tool_body`.

    use super::*;
    use serde_json::{json, Value};

    /// Asserts that coercing `raw` under the declared parameter type
    /// `declared` yields exactly `expected`.
    #[track_caller]
    fn assert_coerces(raw: &str, declared: Option<&str>, expected: Value) {
        assert_eq!(coerce_tool_arg_value(raw, declared), expected);
    }

    /// Turns borrowed `(key, value)` pairs into the owned pairs the body
    /// builder is called with.
    fn owned_pairs(items: &[(&str, &str)]) -> Vec<(String, String)> {
        items
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds the structured body for `args` / `types` and parses it back
    /// into a JSON value (panicking if the body is not valid JSON).
    fn parsed_body(args: &[(&str, &str)], types: &[(&str, &str)]) -> Value {
        let args = owned_pairs(args);
        let types = owned_pairs(types);
        serde_json::from_str(&build_structured_tool_body(&args, &types)).unwrap()
    }

    #[test]
    fn coerce_respects_declared_int_float_bool() {
        assert_coerces("5", Some("Int"), json!(5));
        assert_coerces("3.14", Some("Float"), json!(3.14));
        assert_coerces("true", Some("Bool"), json!(true));
        assert_coerces("false", Some("Bool"), json!(false));
    }

    #[test]
    fn coerce_keeps_string_param_verbatim_even_if_all_digits() {
        // Robustness invariant: a `String` param is NEVER numified.
        assert_coerces("12345", Some("String"), json!("12345"));
        assert_coerces("Acme Corp", Some("String"), json!("Acme Corp"));
    }

    #[test]
    fn coerce_optional_and_generic_types_use_base() {
        assert_coerces("7", Some("Int?"), json!(7));
        // `List<String>` → base `List` → not a scalar → string.
        assert_coerces("x", Some("List<String>"), json!("x"));
    }

    #[test]
    fn coerce_unparseable_scalar_falls_back_to_string_not_dropped() {
        // A value that does not parse as its declared Int/Bool stays a
        // string instead of being dropped. The §58.d type-checker already
        // flags a literal mismatch at compile time.
        assert_coerces("not-a-number", Some("Int"), json!("not-a-number"));
        assert_coerces("maybe", Some("Bool"), json!("maybe"));
    }

    #[test]
    fn coerce_unknown_or_schemaless_param_is_string() {
        assert_coerces("5", None, json!("5"));
        assert_coerces("5", Some("SearchResults"), json!("5"));
    }

    #[test]
    fn build_body_assembles_typed_structured_object() {
        let body = parsed_body(
            &[
                ("query", "Acme Corp"),
                ("max_results", "5"),
                ("safesearch", "true"),
            ],
            &[
                ("query", "String"),
                ("max_results", "Int"),
                ("safesearch", "Bool"),
            ],
        );
        assert_eq!(body["query"], json!("Acme Corp"));
        assert_eq!(body["max_results"], json!(5));
        assert_eq!(body["safesearch"], json!(true));
        // NOT the flat `{"input": …}` legacy shape.
        assert!(body.get("input").is_none());
    }

    #[test]
    fn build_body_escapes_special_characters_via_serde() {
        // A quote and a newline must survive the build → parse round trip.
        let body = parsed_body(&[("query", "a\"b\nc")], &[("query", "String")]);
        assert_eq!(body["query"], json!("a\"b\nc"));
    }

    #[test]
    fn build_body_empty_args_is_empty_object() {
        let empty_body = build_structured_tool_body(&[], &[]);
        assert_eq!(empty_body, "{}");
    }
}
3399
#[cfg(test)]
mod fase35e_tests {
    use super::*;

    /// Builds a minimal `IRAxonStore` declaration on the `postgresql`
    /// backend with the given `name` and `connection`; every remaining
    /// field is left empty, zero or `None`.
    fn pg_store(name: &str, connection: &str) -> IRAxonStore {
        IRAxonStore {
            node_type: "axonstore",
            name: name.to_owned(),
            backend: String::from("postgresql"),
            connection: connection.to_owned(),
            source_line: 0,
            source_column: 0,
            confidence_floor: None,
            column_schema: None,
            isolation: String::new(),
            on_breach: String::new(),
            capability: String::new(),
        }
    }

    /// CLI path: `execute_real` runs with no ambient runtime, so
    /// `block_on_store` is called from a plain thread.
    #[test]
    fn block_on_store_runs_a_future_from_a_plain_thread() {
        assert_eq!(block_on_store(async { 20 + 15 }), 35);
    }

    /// Server path: `execute_real` runs on a Tokio worker thread.
    /// `block_on_store` must NOT panic with "runtime within a runtime" —
    /// it spawns a fresh OS thread that owns its own runtime.
    #[tokio::test]
    async fn block_on_store_runs_a_future_from_within_a_runtime() {
        assert_eq!(block_on_store(async { 7 * 6 }), 42);
    }

    /// Routing reaches the SQL path and fails honestly: a postgresql
    /// store whose `env:` variable is unset yields a typed `StoreError`.
    /// This is D2's "never a silent KV fallback", exercised end-to-end
    /// through the sync runner's helper.
    #[test]
    fn sql_store_step_surfaces_missing_env_var_never_a_kv_fallback() {
        let declaration = pg_store("logs", "env:AXON_NONEXISTENT_VAR_FASE35E");
        let stores = StoreRegistry::build(&[declaration]).unwrap();
        let flow_ctx = ExecContext::new("F", "P", 0);
        let mut pins = std::collections::HashMap::new();

        let outcome = execute_sql_store_step(
            &stores,
            &mut pins,
            "retrieve",
            "logs",
            "logs:id = 1",
            None,
            &flow_ctx,
        );

        assert!(matches!(outcome, Err(StoreError::MissingEnvVar { .. })));
    }

    /// §35.g Pillar I — a store that declares `confidence_floor` rejects
    /// an un-elevated persist (no `_confidence` binding) with a typed
    /// epistemic error, before any row is written.
    #[test]
    fn sql_persist_below_confidence_floor_is_blocked() {
        let mut declaration = pg_store("ledger", "postgresql://u:p@localhost:5432/db");
        declaration.confidence_floor = Some(0.8);
        let stores = StoreRegistry::build(&[declaration]).unwrap();

        let mut flow_ctx = ExecContext::new("F", "P", 0);
        // A user binding is present, but there is no `_confidence`.
        flow_ctx.set("amount", "100");
        let mut pins = std::collections::HashMap::new();

        let outcome = execute_sql_store_step(
            &stores,
            &mut pins,
            "persist",
            "ledger",
            "ledger",
            None,
            &flow_ctx,
        );

        assert!(matches!(outcome, Err(StoreError::Epistemic(_))));
    }

    /// A persist into a postgresql store writes the flow's user bindings
    /// as a row. The malformed DSN makes the connect fail with a typed
    /// `PoolInit` error, which shows the persist reached the SQL path
    /// with the bindings-as-row data assembled rather than the KV path.
    #[test]
    fn sql_store_step_persist_builds_a_row_from_user_bindings() {
        let declaration = pg_store("events", "not a dsn");
        let stores = StoreRegistry::build(&[declaration]).unwrap();

        let mut flow_ctx = ExecContext::new("F", "P", 0);
        flow_ctx.set("event_kind", "login");
        let mut pins = std::collections::HashMap::new();

        let outcome = execute_sql_store_step(
            &stores,
            &mut pins,
            "persist",
            "events",
            "events",
            None,
            &flow_ctx,
        );

        assert!(matches!(outcome, Err(StoreError::PoolInit { .. })));
    }

    /// §Fase 35.o — a `persist` carrying a `{ col: value }` block writes
    /// EXACTLY those columns (value expressions interpolated against the
    /// flow context) and ignores every other binding the flow holds.
    /// The malformed DSN fails at connect (typed `PoolInit`), showing the
    /// field-scoped row was assembled and reached the SQL path. The
    /// pre-35.o behaviour would have dumped `message`/`channel_kind`/…
    /// into the INSERT.
    #[test]
    fn sql_persist_scopes_the_row_to_the_declared_field_block() {
        let declaration = pg_store("chat_history", "not a dsn");
        let stores = StoreRegistry::build(&[declaration]).unwrap();

        let mut flow_ctx = ExecContext::new("F", "P", 0);
        flow_ctx.set("message", "hello");
        flow_ctx.set("channel_kind", "whatsapp");
        flow_ctx.set("tenant_id", "acme");

        // Declared field block: column name paired with value expression.
        let fields: Vec<(String, String)> = [
            ("sender", "user"),
            ("content", "${message}"),
            ("tenant_id", "${tenant_id}"),
        ]
        .iter()
        .map(|&(column, expr)| (column.to_string(), expr.to_string()))
        .collect();
        let mut pins = std::collections::HashMap::new();

        let outcome = execute_sql_store_step(
            &stores,
            &mut pins,
            "persist",
            "chat_history",
            "chat_history",
            Some(&fields),
            &flow_ctx,
        );

        assert!(matches!(outcome, Err(StoreError::PoolInit { .. })));
    }

    /// §Fase 35.p — a `mutate` carrying a `{ col: value }` block builds
    /// the UPDATE SET from EXACTLY those columns (value expressions
    /// interpolated) and ignores every other binding the flow holds.
    /// The malformed DSN fails at connect (typed `PoolInit`), showing the
    /// field-scoped SET row was assembled and reached the SQL path. The
    /// pre-35.p behaviour would have SET `tenant_id` (a flow param, not
    /// a column).
    #[test]
    fn sql_mutate_scopes_the_set_to_the_declared_field_block() {
        let declaration = pg_store("accounts", "not a dsn");
        let stores = StoreRegistry::build(&[declaration]).unwrap();

        let mut flow_ctx = ExecContext::new("F", "P", 0);
        // `tenant_id` is a flow param, NOT a column of the store.
        flow_ctx.set("tenant_id", "acme");
        flow_ctx.set("new_balance", "500");

        // Declared field block: column name paired with value expression.
        let fields: Vec<(String, String)> = [
            ("balance", "${new_balance}"),
            ("status", "active"),
        ]
        .iter()
        .map(|&(column, expr)| (column.to_string(), expr.to_string()))
        .collect();
        let mut pins = std::collections::HashMap::new();

        let outcome = execute_sql_store_step(
            &stores,
            &mut pins,
            "mutate",
            "accounts",
            "accounts:id = 1",
            Some(&fields),
            &flow_ctx,
        );

        assert!(matches!(outcome, Err(StoreError::PoolInit { .. })));
    }
}