1#![allow(deprecated)]
24
25use std::io::{self, IsTerminal};
26use std::path::Path;
27
28use crate::anchor_checker;
29use crate::backend;
30use crate::conversation::{ConversationHistory, ContextWindow};
31use crate::exec_context::ExecContext;
32use crate::hooks::HookManager;
33use crate::ir_generator::IRGenerator;
34use crate::ir_nodes::*;
35use crate::lexer::{Lexer, LexerError};
36use crate::output::{OutputFormat, ReportBuilder, StepReport};
37use crate::parallel;
38use crate::plan_export::{self, PlanBuilder, PlanUnit, PlanStep, PlanTools, PlanToolEntry, PlanDependencies, UnresolvedRef};
39use crate::parser::{ParseError, Parser};
40use crate::session_store::SessionStore;
41use crate::step_deps;
42use crate::store::epistemic;
43use crate::store::filter::SqlValue;
44use crate::store::row_stream;
45use crate::store::postgres_backend::{PostgresStoreBackend, StoreError};
46use crate::store::registry::{StoreBackendKind, StoreRegistry};
47use crate::tool_registry::ToolRegistry;
48use crate::tool_validator::{self, EffectTracker};
49use crate::type_checker::TypeChecker;
50
51pub const AXON_VERSION: &str = env!("CARGO_PKG_VERSION");
58
59fn c(text: &str, code: &str, use_color: bool) -> String {
62 if use_color {
63 format!("{code}{text}\x1b[0m")
64 } else {
65 text.to_string()
66 }
67}
68
69fn truncate_output(s: &str, max_len: usize) -> String {
73 let single_line = s.replace('\n', " ");
74 if single_line.len() <= max_len {
75 single_line
76 } else {
77 format!("{}...", &single_line[..max_len])
78 }
79}
80
81#[derive(Debug, serde::Serialize)]
85struct ExecutionUnit {
86 flow_name: String,
87 persona_name: String,
88 context_name: String,
89 system_prompt: String,
90 steps: Vec<CompiledStep>,
91 anchor_instructions: Vec<String>,
92 effort: String,
93 #[serde(skip)]
94 resolved_anchors: Vec<IRAnchor>,
95 #[serde(skip)]
101 param_bindings: Vec<(String, String)>,
102}
103
104#[derive(Debug, serde::Serialize)]
106struct CompiledStep {
107 step_name: String,
108 step_type: String,
109 system_prompt: String,
110 user_prompt: String,
111 #[serde(skip_serializing_if = "Option::is_none")]
113 tool_argument: Option<String>,
114 #[serde(skip_serializing_if = "Option::is_none")]
116 memory_expression: Option<String>,
117 #[serde(skip_serializing_if = "Option::is_none")]
121 lambda_apply_payload: Option<crate::lambda_runtime::LambdaApplyPayload>,
122 #[serde(skip_serializing_if = "Option::is_none")]
126 let_payload: Option<LetPayload>,
127 #[serde(skip_serializing_if = "Option::is_none")]
133 store_fields: Option<Vec<(String, String)>>,
134 #[serde(skip_serializing_if = "Vec::is_empty")]
142 tool_named_args: Vec<(String, String, String)>,
143 #[serde(skip_serializing_if = "Vec::is_empty")]
148 tool_param_types: Vec<(String, String)>,
149}
150
151#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
156pub struct LetPayload {
157 pub target: String,
158 pub value: String,
159 pub value_kind: String,
160}
161
162#[derive(Debug, serde::Serialize)]
164struct TraceEvent {
165 event: String,
166 unit: String,
167 step: String,
168 detail: String,
169}
170
171fn build_execution_plan(ir: &IRProgram, backend: &str) -> Vec<ExecutionUnit> {
174 let mut units = Vec::new();
175
176 for run in &ir.runs {
177 let system_prompt = build_system_prompt(run, backend);
178 let anchor_instructions = build_anchor_instructions(run);
179 let steps = build_compiled_steps(run, ir);
180
181 units.push(ExecutionUnit {
182 flow_name: run.flow_name.clone(),
183 persona_name: run.persona_name.clone(),
184 context_name: run.context_name.clone(),
185 system_prompt,
186 steps,
187 anchor_instructions,
188 effort: run.effort.clone(),
189 resolved_anchors: run.resolved_anchors.clone(),
190 param_bindings: Vec::new(),
193 });
194 }
195
196 units
197}
198
199fn build_system_prompt(run: &IRRun, backend: &str) -> String {
200 let mut parts: Vec<String> = Vec::new();
201
202 if let Some(ref persona) = run.resolved_persona {
204 parts.push(format!("# Persona: {}", persona.name));
205 if !persona.domain.is_empty() {
206 parts.push(format!("Domain expertise: {}", persona.domain.join(", ")));
207 }
208 if !persona.tone.is_empty() {
209 parts.push(format!("Communication tone: {}", persona.tone));
210 }
211 if !persona.language.is_empty() {
212 parts.push(format!("Language: {}", persona.language));
213 }
214 if let Some(ct) = persona.confidence_threshold {
215 parts.push(format!("Confidence threshold: {ct:.2}"));
216 }
217 if persona.cite_sources == Some(true) {
218 parts.push("Always cite sources.".to_string());
219 }
220 if !persona.refuse_if.is_empty() {
221 parts.push(format!("Refuse if: {}", persona.refuse_if.join(", ")));
222 }
223 }
224
225 if let Some(ref ctx) = run.resolved_context {
227 parts.push(format!("\n# Context: {}", ctx.name));
228 if !ctx.depth.is_empty() {
229 parts.push(format!("Analysis depth: {}", ctx.depth));
230 }
231 if !ctx.memory_scope.is_empty() {
232 parts.push(format!("Memory scope: {}", ctx.memory_scope));
233 }
234 if let Some(t) = ctx.temperature {
235 parts.push(format!("Temperature: {t:.1}"));
236 }
237 if let Some(mt) = ctx.max_tokens {
238 parts.push(format!("Max tokens: {mt}"));
239 }
240 }
241
242 if !run.resolved_anchors.is_empty() {
244 parts.push("\n# Constraints (Anchors)".to_string());
245 for anchor in &run.resolved_anchors {
246 let mut constraint = format!("- {}: {}", anchor.name, anchor.require);
247 if let Some(cf) = anchor.confidence_floor {
248 constraint.push_str(&format!(" (confidence ≥ {cf:.2})"));
249 }
250 if !anchor.on_violation.is_empty() {
251 constraint.push_str(&format!(" [on_violation: {}]", anchor.on_violation));
252 }
253 parts.push(constraint);
254 }
255 }
256
257 parts.push(format!("\n[Backend: {backend} | AXON {AXON_VERSION}]"));
259
260 parts.join("\n")
261}
262
263fn build_anchor_instructions(run: &IRRun) -> Vec<String> {
264 run.resolved_anchors
265 .iter()
266 .map(|a| {
267 let mut s = format!("{}: {}", a.name, a.require);
268 if let Some(cf) = a.confidence_floor {
269 s.push_str(&format!(" (≥{cf:.2})"));
270 }
271 s
272 })
273 .collect()
274}
275
276fn build_compiled_steps(run: &IRRun, ir: &IRProgram) -> Vec<CompiledStep> {
277 let flow = match &run.resolved_flow {
278 Some(f) => f,
279 None => return Vec::new(),
280 };
281
282 let mut steps = Vec::new();
283 for node in &flow.steps {
284 let (step_name, step_type, action) = extract_step_info(node);
285 let system_prompt = format!(
286 "You are executing step '{}' of flow '{}'.",
287 step_name, flow.name
288 );
289 let user_prompt = if action.is_empty() {
290 format!("Execute step: {step_name}")
291 } else {
292 action
293 };
294
295 let tool_argument = match node {
297 IRFlowNode::UseTool(s) => Some(s.argument.clone()),
298 _ => None,
299 };
300
301 let (tool_named_args, tool_param_types) = match node {
306 IRFlowNode::UseTool(s) => {
307 let named: Vec<(String, String, String)> = s
308 .named_args
309 .iter()
310 .map(|a| (a.name.clone(), a.value.clone(), a.value_kind.clone()))
311 .collect();
312 let types: Vec<(String, String)> = ir
313 .tools
314 .iter()
315 .find(|t| t.name == s.tool_name)
316 .map(|t| {
317 t.parameters
318 .iter()
319 .map(|p| (p.name.clone(), p.type_name.clone()))
320 .collect()
321 })
322 .unwrap_or_default();
323 (named, types)
324 }
325 _ => (Vec::new(), Vec::new()),
326 };
327
328 let memory_expression = match node {
330 IRFlowNode::Remember(s) => Some(s.expression.clone()),
331 IRFlowNode::Recall(s) => Some(s.query.clone()),
332 IRFlowNode::Persist(s) => Some(s.store_name.clone()),
333 IRFlowNode::Retrieve(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
334 IRFlowNode::Mutate(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
335 IRFlowNode::Purge(s) => Some(format!("{}:{}", s.store_name, s.where_expr)),
336 _ => None,
337 };
338
339 let lambda_apply_payload = match node {
345 IRFlowNode::LambdaDataApply(s) => {
346 let snap = ir
347 .lambda_data_specs
348 .iter()
349 .find(|spec| spec.name == s.lambda_data_name)
350 .map(|spec| crate::lambda_runtime::SpecSnapshot {
351 name: spec.name.clone(),
352 ontology: spec.ontology.clone(),
353 certainty: spec.certainty,
354 temporal_frame_start: spec.temporal_frame_start.clone(),
355 temporal_frame_end: spec.temporal_frame_end.clone(),
356 provenance: spec.provenance.clone(),
357 derivation: spec.derivation.clone(),
358 })
359 .unwrap_or_default();
360 Some(crate::lambda_runtime::LambdaApplyPayload {
361 lambda_data_name: s.lambda_data_name.clone(),
362 target: s.target.clone(),
363 output_type: s.output_type.clone(),
364 spec_snapshot: snap,
365 })
366 }
367 _ => None,
368 };
369
370 let let_payload = match node {
374 IRFlowNode::Let(s) => Some(LetPayload {
375 target: s.target.clone(),
376 value: s.value.clone(),
377 value_kind: s.value_kind.clone(),
378 }),
379 _ => None,
380 };
381
382 let store_fields = match node {
388 IRFlowNode::Persist(s) if !s.fields.is_empty() => {
389 Some(s.fields.clone())
390 }
391 IRFlowNode::Mutate(s) if !s.fields.is_empty() => {
392 Some(s.fields.clone())
393 }
394 _ => None,
395 };
396
397 steps.push(CompiledStep {
398 step_name,
399 step_type,
400 system_prompt,
401 user_prompt,
402 tool_argument,
403 memory_expression,
404 lambda_apply_payload,
405 let_payload,
406 store_fields,
407 tool_named_args,
408 tool_param_types,
409 });
410 }
411
412 steps
413}
414
415pub(crate) fn build_structured_tool_body(
421 interpolated_args: &[(String, String)],
422 param_types: &[(String, String)],
423) -> String {
424 let mut map = serde_json::Map::new();
425 for (name, value) in interpolated_args {
426 let declared = param_types
427 .iter()
428 .find(|(p, _)| p == name)
429 .map(|(_, t)| t.as_str());
430 map.insert(name.clone(), coerce_tool_arg_value(value, declared));
431 }
432 serde_json::Value::Object(map).to_string()
433}
434
435pub(crate) fn coerce_tool_arg_value(value: &str, declared_type: Option<&str>) -> serde_json::Value {
443 let base = declared_type.map(|t| t.trim_end_matches('?').split('<').next().unwrap_or(t));
444 match base {
445 Some("Int") => value
446 .parse::<i64>()
447 .map(|i| serde_json::Value::Number(i.into()))
448 .unwrap_or_else(|_| serde_json::Value::String(value.to_string())),
449 Some("Float") => value
450 .parse::<f64>()
451 .ok()
452 .and_then(serde_json::Number::from_f64)
453 .map(serde_json::Value::Number)
454 .unwrap_or_else(|| serde_json::Value::String(value.to_string())),
455 Some("Bool") => match value {
456 "true" => serde_json::Value::Bool(true),
457 "false" => serde_json::Value::Bool(false),
458 _ => serde_json::Value::String(value.to_string()),
459 },
460 _ => serde_json::Value::String(value.to_string()),
461 }
462}
463
464fn extract_step_info(node: &IRFlowNode) -> (String, String, String) {
465 match node {
466 IRFlowNode::Step(s) => (s.name.clone(), "step".to_string(), s.ask.clone()),
467 IRFlowNode::Probe(s) => (s.target.clone(), "probe".to_string(), format!("Probe: {}", s.target)),
468 IRFlowNode::Reason(s) => (s.target.clone(), "reason".to_string(), format!("Reason about: {}", s.target)),
469 IRFlowNode::Validate(s) => (s.target.clone(), "validate".to_string(), format!("Validate: {}", s.target)),
470 IRFlowNode::Refine(s) => (s.target.clone(), "refine".to_string(), format!("Refine: {}", s.target)),
471 IRFlowNode::Weave(s) => ("weave".to_string(), "weave".to_string(), format!("Weave {} sources into {}", s.sources.len(), s.target)),
472 IRFlowNode::UseTool(s) => (s.tool_name.clone(), "use_tool".to_string(), format!("Use tool: {}", s.tool_name)),
473 IRFlowNode::Remember(s) => (s.memory_target.clone(), "remember".to_string(), format!("Remember: {}", s.expression)),
474 IRFlowNode::Recall(s) => (s.memory_source.clone(), "recall".to_string(), format!("Recall: {}", s.query)),
475 IRFlowNode::Conditional(s) => (s.condition.clone(), "conditional".to_string(), format!("If: {}", s.condition)),
476 IRFlowNode::ForIn(s) => (s.variable.clone(), "for_in".to_string(), format!("For {} in {}", s.variable, s.iterable)),
477 IRFlowNode::Let(s) => (s.target.clone(), "let".to_string(), format!("Let {} = {}", s.target, s.value)),
478 IRFlowNode::Return(s) => ("return".to_string(), "return".to_string(), format!("Return: {}", s.value_expr)),
479 IRFlowNode::Par(_) => ("parallel".to_string(), "parallel".to_string(), "Parallel block".to_string()),
480 IRFlowNode::Hibernate(_) => ("hibernate".to_string(), "hibernate".to_string(), "Hibernate".to_string()),
481 IRFlowNode::Deliberate(_) => ("deliberate".to_string(), "deliberate".to_string(), "Deliberate block".to_string()),
482 IRFlowNode::Consensus(_) => ("consensus".to_string(), "consensus".to_string(), "Consensus block".to_string()),
483 IRFlowNode::Forge(_) => ("forge".to_string(), "forge".to_string(), "Forge block".to_string()),
484 IRFlowNode::Focus(s) => (s.expression.clone(), "focus".to_string(), format!("Focus: {}", s.expression)),
485 IRFlowNode::Associate(s) => (s.left.clone(), "associate".to_string(), format!("Associate: {} ↔ {}", s.left, s.right)),
486 IRFlowNode::Aggregate(s) => (s.target.clone(), "aggregate".to_string(), format!("Aggregate: {}", s.target)),
487 IRFlowNode::Explore(s) => (s.target.clone(), "explore".to_string(), format!("Explore: {}", s.target)),
488 IRFlowNode::Ingest(s) => (s.source.clone(), "ingest".to_string(), format!("Ingest: {}", s.source)),
489 IRFlowNode::ShieldApply(s) => (s.shield_name.clone(), "shield_apply".to_string(), format!("Apply shield: {}", s.shield_name)),
490 IRFlowNode::Stream(_) => ("stream".to_string(), "stream".to_string(), "Stream block".to_string()),
491 IRFlowNode::Navigate(s) => (s.pix_ref.clone(), "navigate".to_string(), format!("Navigate: {}", s.pix_ref)),
492 IRFlowNode::Drill(s) => (s.pix_ref.clone(), "drill".to_string(), format!("Drill: {} → {}", s.pix_ref, s.subtree_path)),
493 IRFlowNode::Trail(s) => (s.navigate_ref.clone(), "trail".to_string(), format!("Trail: {}", s.navigate_ref)),
494 IRFlowNode::Corroborate(s) => (s.navigate_ref.clone(), "corroborate".to_string(), format!("Corroborate: {}", s.navigate_ref)),
495 IRFlowNode::OtsApply(s) => (s.ots_name.clone(), "ots_apply".to_string(), format!("Apply OTS: {}", s.ots_name)),
496 IRFlowNode::MandateApply(s) => (s.mandate_name.clone(), "mandate_apply".to_string(), format!("Apply mandate: {}", s.mandate_name)),
497 IRFlowNode::ComputeApply(s) => (s.compute_name.clone(), "compute_apply".to_string(), format!("Apply compute: {}", s.compute_name)),
498 IRFlowNode::Listen(s) => (s.channel.clone(), "listen".to_string(), format!("Listen: {}", s.channel)),
499 IRFlowNode::DaemonStep(s) => (s.daemon_ref.clone(), "daemon".to_string(), format!("Daemon: {}", s.daemon_ref)),
500 IRFlowNode::Persist(s) => (s.store_name.clone(), "persist".to_string(), format!("Persist to: {}", s.store_name)),
501 IRFlowNode::Retrieve(s) => (s.store_name.clone(), "retrieve".to_string(), format!("Retrieve from: {}", s.store_name)),
502 IRFlowNode::Mutate(s) => (s.store_name.clone(), "mutate".to_string(), format!("Mutate: {}", s.store_name)),
503 IRFlowNode::Purge(s) => (s.store_name.clone(), "purge".to_string(), format!("Purge: {}", s.store_name)),
504 IRFlowNode::Transact(_) => ("transact".to_string(), "transact".to_string(), "Transact block".to_string()),
505 IRFlowNode::LambdaDataApply(s) => (s.lambda_data_name.clone(), "lambda_data_apply".to_string(), format!("Apply ΛD: {}", s.lambda_data_name)),
506 IRFlowNode::Emit(s) => (s.channel_ref.clone(), "emit".to_string(), format!("Emit on {}: {}", s.channel_ref, s.value_ref)),
508 IRFlowNode::Publish(s) => (s.channel_ref.clone(), "publish".to_string(), format!("Publish {} within {}", s.channel_ref, s.shield_ref)),
509 IRFlowNode::Discover(s) => (s.capability_ref.clone(), "discover".to_string(), format!("Discover {} as {}", s.capability_ref, s.alias)),
510 IRFlowNode::Break(_) => ("break".to_string(), "break".to_string(), "Break out of for-in loop".to_string()),
513 IRFlowNode::Continue(_) => ("continue".to_string(), "continue".to_string(), "Continue to next for-in iteration".to_string()),
514 }
515}
516
517fn execute_stub(
520 units: &[ExecutionUnit],
521 use_color: bool,
522 trace: bool,
523) -> (bool, Vec<TraceEvent>) {
524 let mut events: Vec<TraceEvent> = Vec::new();
525
526 for (i, unit) in units.iter().enumerate() {
527 println!(
528 "\n{}",
529 c(
530 &format!("▶ Execution Unit {}/{}: {} as {}", i + 1, units.len(), unit.flow_name, unit.persona_name),
531 "\x1b[1;36m",
532 use_color,
533 )
534 );
535
536 if trace {
537 events.push(TraceEvent {
538 event: "unit_start".to_string(),
539 unit: unit.flow_name.clone(),
540 step: String::new(),
541 detail: format!("persona={}, context={}", unit.persona_name, unit.context_name),
542 });
543 }
544
545 println!(
547 " {} {}",
548 c("System:", "\x1b[1;33m", use_color),
549 truncate(&unit.system_prompt, 120)
550 );
551
552 if !unit.anchor_instructions.is_empty() {
553 println!(
554 " {} {}",
555 c("Anchors:", "\x1b[1;33m", use_color),
556 unit.anchor_instructions.join(" | ")
557 );
558 }
559
560 if !unit.effort.is_empty() {
561 println!(
562 " {} {}",
563 c("Effort:", "\x1b[1;33m", use_color),
564 unit.effort
565 );
566 }
567
568 let mut stub_ctx = crate::exec_context::ExecContext::new(
570 &unit.flow_name,
571 &unit.persona_name,
572 i,
573 );
574 for (name, value) in &unit.param_bindings {
577 stub_ctx.set(name, value);
578 }
579 for (j, step) in unit.steps.iter().enumerate() {
580 stub_ctx.set_step(&step.step_name, &step.step_type, j);
581 println!(
582 " {} {}.{} [{}] {}",
583 c("→", "\x1b[32m", use_color),
584 j + 1,
585 c(&step.step_name, "\x1b[1m", use_color),
586 step.step_type,
587 &step.user_prompt
588 );
589
590 if step.step_type == "lambda_data_apply" {
597 if let Some(payload) = &step.lambda_apply_payload {
598 let target_value = if payload.target.is_empty() {
599 serde_json::Value::Null
600 } else {
601 let raw = stub_ctx
606 .get(&payload.target)
607 .map(|s| s.to_string())
608 .unwrap_or_else(|| payload.target.clone());
609 serde_json::Value::String(raw)
610 };
611 match crate::lambda_runtime::build_psi(
612 &payload.spec_snapshot,
613 target_value,
614 ) {
615 Ok(psi) => {
616 let psi_json = serde_json::to_string(&psi).unwrap_or_default();
617 if !payload.output_type.is_empty() {
618 stub_ctx.set(&payload.output_type, &psi_json);
619 }
620 stub_ctx.set_result(&step.step_name, &psi_json);
621 if trace {
622 events.push(TraceEvent {
623 event: "lambda_data_apply".to_string(),
624 unit: unit.flow_name.clone(),
625 step: step.step_name.clone(),
626 detail: psi_json,
627 });
628 }
629 continue;
630 }
631 Err(err) => {
632 eprintln!(
633 " {} lambda apply error: {}",
634 c("✗", "\x1b[31m", use_color),
635 err
636 );
637 if trace {
638 events.push(TraceEvent {
639 event: "lambda_data_apply_error".to_string(),
640 unit: unit.flow_name.clone(),
641 step: step.step_name.clone(),
642 detail: err.to_string(),
643 });
644 }
645 return (false, events);
646 }
647 }
648 }
649 }
650
651 if step.step_type == "let_binding" {
663 if let Some(payload) = &step.let_payload {
664 let resolved = if payload.value_kind == "reference"
665 && !payload.value.is_empty()
666 {
667 stub_ctx
668 .get(&payload.value)
669 .map(str::to_string)
670 .unwrap_or_else(|| payload.value.clone())
671 } else {
672 payload.value.clone()
673 };
674 if !payload.target.is_empty() {
675 stub_ctx.set(&payload.target, &resolved);
676 }
677 stub_ctx.set_result(&step.step_name, &resolved);
678 if trace {
679 events.push(TraceEvent {
680 event: "let_binding".to_string(),
681 unit: unit.flow_name.clone(),
682 step: step.step_name.clone(),
683 detail: format!(
684 "{}={} (kind={})",
685 payload.target, resolved, payload.value_kind,
686 ),
687 });
688 }
689 continue;
690 }
691 }
692
693 match step.step_type.as_str() {
708 "remember" => {
709 let target = &step.step_name;
710 if !target.is_empty() {
711 stub_ctx.set(target, "<remembered>");
712 }
713 if trace {
714 events.push(TraceEvent {
715 event: "remember".to_string(),
716 unit: unit.flow_name.clone(),
717 step: step.step_name.clone(),
718 detail: step
719 .memory_expression
720 .clone()
721 .unwrap_or_default(),
722 });
723 }
724 continue;
725 }
726 "recall" => {
727 let source = &step.step_name;
728 if !source.is_empty() {
729 stub_ctx.set(source, "<recalled>");
730 }
731 if trace {
732 events.push(TraceEvent {
733 event: "recall".to_string(),
734 unit: unit.flow_name.clone(),
735 step: step.step_name.clone(),
736 detail: step
737 .memory_expression
738 .clone()
739 .unwrap_or_default(),
740 });
741 }
742 continue;
743 }
744 "return" => {
745 stub_ctx.set("__return_value__", &step.user_prompt);
753 if trace {
754 events.push(TraceEvent {
755 event: "return".to_string(),
756 unit: unit.flow_name.clone(),
757 step: step.step_name.clone(),
758 detail: step.user_prompt.clone(),
759 });
760 }
761 continue;
762 }
763 "hibernate" => {
764 stub_ctx.set("__hibernation_token__", "<stub-token>");
767 if trace {
768 events.push(TraceEvent {
769 event: "hibernate".to_string(),
770 unit: unit.flow_name.clone(),
771 step: step.step_name.clone(),
772 detail: format!("flow={}", unit.flow_name),
773 });
774 }
775 continue;
776 }
777 "drill" => {
778 let key = format!("drill:{}", step.step_name);
781 stub_ctx.set(&key, "<stub-drill-result>");
782 if trace {
783 events.push(TraceEvent {
784 event: "drill".to_string(),
785 unit: unit.flow_name.clone(),
786 step: step.step_name.clone(),
787 detail: step.user_prompt.clone(),
788 });
789 }
790 continue;
791 }
792 "trail" => {
793 let key = format!("trail:{}", step.step_name);
794 stub_ctx.set(&key, "<stub-trail-result>");
795 if trace {
796 events.push(TraceEvent {
797 event: "trail".to_string(),
798 unit: unit.flow_name.clone(),
799 step: step.step_name.clone(),
800 detail: step.user_prompt.clone(),
801 });
802 }
803 continue;
804 }
805 "conditional" | "for_in" | "parallel" => {
806 if trace {
809 events.push(TraceEvent {
810 event: step.step_type.clone(),
811 unit: unit.flow_name.clone(),
812 step: step.step_name.clone(),
813 detail: step.user_prompt.clone(),
814 });
815 }
816 continue;
817 }
818 "break" | "continue" => {
819 if trace {
823 events.push(TraceEvent {
824 event: step.step_type.clone(),
825 unit: unit.flow_name.clone(),
826 step: step.step_name.clone(),
827 detail: step.user_prompt.clone(),
828 });
829 }
830 continue;
831 }
832 _ => {}
833 }
834
835 if trace {
836 events.push(TraceEvent {
837 event: "step_stub".to_string(),
838 unit: unit.flow_name.clone(),
839 step: step.step_name.clone(),
840 detail: format!("[{}] {}", step.step_type, step.user_prompt),
841 });
842 }
843 }
844
845 if trace {
846 events.push(TraceEvent {
847 event: "unit_complete".to_string(),
848 unit: unit.flow_name.clone(),
849 step: String::new(),
850 detail: format!("{} steps (stub)", unit.steps.len()),
851 });
852 }
853
854 println!(
855 " {} {} steps completed (stub mode)",
856 c("✓", "\x1b[32m", use_color),
857 unit.steps.len()
858 );
859 }
860
861 (true, events)
862}
863
864const MAX_ANCHOR_RETRIES: u32 = 2;
867
868fn block_on_store<F>(fut: F) -> F::Output
881where
882 F: std::future::Future + Send,
883 F::Output: Send,
884{
885 std::thread::scope(|scope| {
886 scope
887 .spawn(|| {
888 tokio::runtime::Builder::new_current_thread()
889 .enable_all()
890 .build()
891 .expect("Fase 35.e: failed to build the store-op Tokio runtime")
892 .block_on(fut)
893 })
894 .join()
895 .expect("Fase 35.e: the store-op thread panicked")
896 })
897}
898
899async fn execute_sql_store_step_async(
941 store_registry: &StoreRegistry,
942 pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
956 step_type: &str,
957 store_name: &str,
958 memory_expr: &str,
959 store_fields: Option<&[(String, String)]>,
960 ctx: &ExecContext,
961) -> Result<String, StoreError> {
962 let spec = store_registry.spec(store_name);
965 let _connection = spec.map(|s| s.connection.clone()).unwrap_or_default();
966 let confidence_floor = spec.and_then(|s| s.confidence_floor);
967
968 let where_expr = memory_expr
980 .splitn(2, ':')
981 .nth(1)
982 .unwrap_or("")
983 .to_string();
984 let where_bindings: std::collections::HashMap<String, String> =
988 ctx.vars().clone();
989
990 let data: Vec<(String, SqlValue)> = match store_fields {
999 Some(fields) => fields
1000 .iter()
1001 .map(|(col, expr)| {
1002 (col.clone(), SqlValue::Text(ctx.interpolate(expr)))
1003 })
1004 .collect(),
1005 None => ctx
1006 .user_bindings()
1007 .into_iter()
1008 .map(|(k, v)| (k, SqlValue::Text(v)))
1009 .collect(),
1010 };
1011
1012 let store_name = store_name.to_string();
1013 let step_type = step_type.to_string();
1014 let store_name_for_reinsert = store_name.clone();
1015
1016 let mut pin: Option<sqlx::pool::PoolConnection<sqlx::Postgres>> =
1025 pinned_conns.remove(&store_name);
1026
1027 let backend = match store_registry.resolve(&store_name) {
1033 Ok(crate::store::registry::StoreHandle::Postgres(b)) => b,
1034 Ok(_) => {
1035 if let Some(p) = pin {
1037 pinned_conns.insert(store_name_for_reinsert, p);
1038 }
1039 return Err(StoreError::Connect {
1040 source: format!(
1041 "axonstore `{store_name}` expected to resolve to \
1042 a postgresql backend but the registry returned \
1043 `in_memory`. Routing bug — the SQL gate in \
1044 `execute_real` should have skipped this step."
1045 ),
1046 });
1047 }
1048 Err(e) => {
1049 if let Some(p) = pin {
1050 pinned_conns.insert(store_name_for_reinsert, p);
1051 }
1052 return Err(e);
1053 }
1054 };
1055
1056 let result: Result<String, StoreError> = async {
1061 match step_type.as_str() {
1062 "retrieve" => {
1063 let cancel = crate::cancel_token::CancellationFlag::new();
1069 let mut store_conn = match &mut pin {
1077 Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1078 None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1079 };
1080 let stream_outcome = row_stream::stream_retrieve(
1081 &backend,
1082 &mut store_conn,
1083 &store_name,
1084 &where_expr,
1085 row_stream::DEFAULT_RETRIEVE_POLICY,
1086 row_stream::DEFAULT_MAX_ROWS,
1087 &cancel,
1088 &where_bindings,
1089 )
1090 .await?;
1091 let metadata = row_stream::stream_metadata(
1092 row_stream::DEFAULT_RETRIEVE_POLICY,
1093 &stream_outcome,
1094 );
1095 let outcome = epistemic::enforce_retrieve_floor(
1096 epistemic::mark_retrieved(stream_outcome.rows),
1097 confidence_floor,
1098 );
1099 let mut envelope =
1100 epistemic::retrieve_envelope(&outcome, confidence_floor);
1101 envelope["stream"] = metadata;
1102 Ok(serde_json::to_string(&envelope)
1103 .unwrap_or_else(|_| "{}".to_string()))
1104 }
1105 "purge" => {
1106 let mut store_conn = match &mut pin {
1108 Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1109 None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1110 };
1111 let n = backend
1112 .purge(&mut store_conn, &store_name, &where_expr, &where_bindings)
1113 .await?;
1114 Ok(format!("{n} row(s) purged"))
1115 }
1116 "persist" => {
1117 epistemic::enforce_persist_floor(
1120 &data,
1121 confidence_floor,
1122 &store_name,
1123 )?;
1124 let mut store_conn = match &mut pin {
1126 Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1127 None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1128 };
1129 let n = backend.insert(&mut store_conn, &store_name, &data).await?;
1130 Ok(format!("{n} row(s) persisted"))
1131 }
1132 "mutate" => {
1133 let mut store_conn = match &mut pin {
1135 Some(p) => crate::store::store_conn::StoreConn::Pinned(p),
1136 None => crate::store::store_conn::StoreConn::Pool(backend.pool()),
1137 };
1138 let n = backend
1139 .mutate(&mut store_conn, &store_name, &where_expr, &data, &where_bindings)
1140 .await?;
1141 Ok(format!("{n} row(s) mutated"))
1142 }
1143 other => Err(StoreError::Query {
1145 op: "store",
1146 source: format!("unsupported store step type `{other}`"),
1147 }),
1148 }
1149 }.await;
1150
1151 if let Some(p) = pin {
1158 pinned_conns.insert(store_name_for_reinsert, p);
1159 }
1160
1161 result
1162}
1163
1164fn execute_sql_store_step(
1177 store_registry: &StoreRegistry,
1178 pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
1179 step_type: &str,
1180 store_name: &str,
1181 memory_expr: &str,
1182 store_fields: Option<&[(String, String)]>,
1183 ctx: &ExecContext,
1184) -> Result<String, StoreError> {
1185 block_on_store(execute_sql_store_step_async(
1186 store_registry,
1187 pinned_conns,
1188 step_type,
1189 store_name,
1190 memory_expr,
1191 store_fields,
1192 ctx,
1193 ))
1194}
1195
1196async fn execute_real_async(
1213 units: &[ExecutionUnit],
1214 backend_name: &str,
1215 source_file: &str,
1216 use_color: bool,
1217 trace: bool,
1218 stream: bool,
1219 output_fmt: OutputFormat,
1220 report: &mut ReportBuilder,
1221 registry: &ToolRegistry,
1222 store_registry: &StoreRegistry,
1223 pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
1227 api_key_override: Option<&str>,
1228) -> Result<(bool, Vec<TraceEvent>), backend::BackendError> {
1229 let api_key = match api_key_override {
1230 Some(key) => key.to_string(),
1231 None => backend::get_api_key(backend_name)?,
1232 };
1233 let mut events: Vec<TraceEvent> = Vec::new();
1234 let mut total_input_tokens: u64 = 0;
1235 let mut total_output_tokens: u64 = 0;
1236 let mut session = SessionStore::new(source_file);
1237 let mut hooks = HookManager::new();
1238 let mut effects = EffectTracker::new();
1239 let json = output_fmt.is_json();
1240
1241 for (i, unit) in units.iter().enumerate() {
1242 if !json {
1243 println!(
1244 "\n{}",
1245 c(
1246 &format!(
1247 "▶ Execution Unit {}/{}: {} as {}",
1248 i + 1,
1249 units.len(),
1250 unit.flow_name,
1251 unit.persona_name
1252 ),
1253 "\x1b[1;36m",
1254 use_color,
1255 )
1256 );
1257 }
1258
1259 if trace {
1260 events.push(TraceEvent {
1261 event: "unit_start".to_string(),
1262 unit: unit.flow_name.clone(),
1263 step: String::new(),
1264 detail: format!(
1265 "persona={}, context={}",
1266 unit.persona_name, unit.context_name
1267 ),
1268 });
1269 }
1270
1271 let mut ctx = ExecContext::new(&unit.flow_name, &unit.persona_name, i);
1272 for (name, value) in &unit.param_bindings {
1276 ctx.set(name, value);
1277 }
1278 let mut conversation = ConversationHistory::new();
1279 let mut context_window = ContextWindow::new();
1280 hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
1281 report.begin_unit(&unit.flow_name, &unit.persona_name);
1282
1283 let step_name_set: std::collections::HashSet<&str> =
1289 unit.steps.iter().map(|s| s.step_name.as_str()).collect();
1290 let step_infos: Vec<step_deps::StepInfo> = unit.steps.iter().map(|s| {
1291 step_deps::StepInfo {
1292 name: s.step_name.clone(),
1293 step_type: s.step_type.clone(),
1294 user_prompt: s.user_prompt.clone(),
1295 argument: step_deps::use_tool_analysis_argument(
1296 s.tool_argument.as_deref()
1297 .or(s.memory_expression.as_deref())
1298 .unwrap_or(""),
1299 &s.tool_named_args,
1300 &step_name_set,
1301 ),
1302 }
1303 }).collect();
1304
1305 let dep_graph = step_deps::analyze(&step_infos);
1306 let schedule = parallel::build_schedule(&dep_graph);
1307
1308 if trace {
1309 if !json && dep_graph.max_depth > 0 {
1310 println!(
1311 " {} {}",
1312 c("⊞", "\x1b[2;36m", use_color),
1313 c(
1314 &format!(
1315 "Dependencies: depth={}, {} parallel group{}{}",
1316 dep_graph.max_depth,
1317 dep_graph.parallel_groups.len(),
1318 if dep_graph.parallel_groups.len() == 1 { "" } else { "s" },
1319 if dep_graph.unresolved_refs.is_empty() {
1320 String::new()
1321 } else {
1322 format!(", {} unresolved ref(s)", dep_graph.unresolved_refs.len())
1323 },
1324 ),
1325 "\x1b[2;36m",
1326 use_color,
1327 ),
1328 );
1329 }
1330
1331 events.push(TraceEvent {
1332 event: "step_deps".to_string(),
1333 unit: unit.flow_name.clone(),
1334 step: String::new(),
1335 detail: format!(
1336 "depth={}, parallel_groups={}, unresolved={}, steps: {}",
1337 dep_graph.max_depth,
1338 dep_graph.parallel_groups.len(),
1339 dep_graph.unresolved_refs.len(),
1340 dep_graph.steps.iter()
1341 .map(|s| {
1342 if s.depends_on.is_empty() {
1343 s.name.clone()
1344 } else {
1345 format!("{}→[{}]", s.name, s.depends_on.join(","))
1346 }
1347 })
1348 .collect::<Vec<_>>()
1349 .join(", "),
1350 ),
1351 });
1352
1353 if !json && schedule.has_parallelism() {
1354 println!(
1355 " {} {}",
1356 c("⫘", "\x1b[2;35m", use_color),
1357 c(
1358 &format!("Schedule: {}", schedule.summary()),
1359 "\x1b[2;35m",
1360 use_color,
1361 ),
1362 );
1363 }
1364
1365 events.push(TraceEvent {
1366 event: "schedule".to_string(),
1367 unit: unit.flow_name.clone(),
1368 step: String::new(),
1369 detail: format!(
1370 "waves={}, parallel_waves={}, max_parallelism={}, schedule: {}",
1371 schedule.waves.len(),
1372 schedule.parallel_waves,
1373 schedule.max_parallelism,
1374 schedule.summary(),
1375 ),
1376 });
1377 }
1378
1379 let step_map: std::collections::HashMap<&str, (usize, &CompiledStep)> = unit
1381 .steps
1382 .iter()
1383 .enumerate()
1384 .map(|(j, s)| (s.step_name.as_str(), (j, s)))
1385 .collect();
1386
1387 for (wave_idx, wave) in schedule.waves.iter().enumerate() {
1389 let is_parallel_wave = wave.is_parallel && wave.steps.len() > 1;
1390
1391 if is_parallel_wave && !json {
1392 println!(
1393 " {} {}",
1394 c("⫘", "\x1b[35m", use_color),
1395 c(
1396 &format!(
1397 "Wave {}/{}: [{}] (parallel, {} steps)",
1398 wave_idx + 1,
1399 schedule.waves.len(),
1400 wave.steps.join(" | "),
1401 wave.steps.len(),
1402 ),
1403 "\x1b[2;35m",
1404 use_color,
1405 ),
1406 );
1407 }
1408
1409 if trace {
1410 events.push(TraceEvent {
1411 event: "wave_start".to_string(),
1412 unit: unit.flow_name.clone(),
1413 step: String::new(),
1414 detail: format!(
1415 "wave={}/{}, steps=[{}], parallel={}",
1416 wave_idx + 1,
1417 schedule.waves.len(),
1418 wave.steps.join(", "),
1419 is_parallel_wave,
1420 ),
1421 });
1422 }
1423
1424 if is_parallel_wave {
1425 let ctx_snapshot = ctx.clone();
1428 let conversation_snapshot = conversation.clone();
1429
1430 let wave_results = parallel::execute_wave(wave, |step_name| {
1431 let (_j, step) = match step_map.get(step_name) {
1433 Some(v) => *v,
1434 None => return parallel::WaveStepResult {
1435 step_name: step_name.to_string(),
1436 output: "step not found".to_string(),
1437 success: false,
1438 },
1439 };
1440
1441 if step.step_type == "use_tool" {
1443 let arg = if !step.tool_named_args.is_empty() {
1447 let interpolated: Vec<(String, String)> = step
1448 .tool_named_args
1449 .iter()
1450 .map(|(n, v, kind)| {
1451 (n.clone(), ctx_snapshot.resolve_named_arg(v, kind))
1454 })
1455 .collect();
1456 build_structured_tool_body(&interpolated, &step.tool_param_types)
1457 } else {
1458 ctx_snapshot.interpolate(step.tool_argument.as_deref().unwrap_or(""))
1459 };
1460 if let Some(result) = registry.dispatch(&step.step_name, &arg) {
1461 return parallel::WaveStepResult {
1462 step_name: step_name.to_string(),
1463 output: result.output,
1464 success: result.success,
1465 };
1466 }
1467 }
1468
1469 let full_system = format!("{}\n\n{}", unit.system_prompt, step.system_prompt);
1471 let interpolated_prompt = ctx_snapshot.interpolate(&step.user_prompt);
1472 let mut thread_conversation = conversation_snapshot.clone();
1473 let mut thread_events: Vec<TraceEvent> = Vec::new();
1474 let mut thread_input_tokens: u64 = 0;
1475 let mut thread_output_tokens: u64 = 0;
1476
1477 let result = execute_step_with_retry(
1478 backend_name,
1479 &api_key,
1480 &full_system,
1481 &interpolated_prompt,
1482 &step.step_name,
1483 &unit.flow_name,
1484 &unit.resolved_anchors,
1485 use_color,
1486 false, false, json,
1489 &mut thread_conversation,
1490 &mut thread_events,
1491 &mut thread_input_tokens,
1492 &mut thread_output_tokens,
1493 );
1494
1495 parallel::WaveStepResult {
1496 step_name: step_name.to_string(),
1497 output: result,
1498 success: true,
1499 }
1500 });
1501
1502 for wr in &wave_results {
1504 let (j, step) = step_map[wr.step_name.as_str()];
1505
1506 ctx.set_step(&step.step_name, &step.step_type, j);
1507 ctx.set_result(&step.step_name, &wr.output);
1508 hooks.on_step_start(&step.step_name, &step.step_type);
1509 hooks.on_step_end(0, 0, 0, 0, false);
1510
1511 if !json {
1512 let status = if wr.success { "ok" } else { "error" };
1513 println!(
1514 " {} {}.{} [{}] → {} [parallel]",
1515 c("⫘", "\x1b[35m", use_color),
1516 j + 1,
1517 c(&step.step_name, "\x1b[1m", use_color),
1518 step.step_type,
1519 c(
1520 &format!("{status}: {}", truncate_output(&wr.output, 80)),
1521 if wr.success { "\x1b[32m" } else { "\x1b[31m" },
1522 use_color,
1523 ),
1524 );
1525 }
1526
1527 report.record_step(StepReport {
1528 name: step.step_name.clone(),
1529 step_type: step.step_type.clone(),
1530 result: wr.output.clone(),
1531 duration_ms: 0,
1532 input_tokens: 0,
1533 output_tokens: 0,
1534 anchor_breaches: 0,
1535 chain_activations: 0,
1536 was_retried: false,
1537 });
1538
1539 if trace {
1540 events.push(TraceEvent {
1541 event: "step_parallel".to_string(),
1542 unit: unit.flow_name.clone(),
1543 step: step.step_name.clone(),
1544 detail: format!(
1545 "wave={}, success={}, output={}",
1546 wave_idx + 1,
1547 wr.success,
1548 truncate_output(&wr.output, 120),
1549 ),
1550 });
1551 }
1552 }
1553
1554 for wr in &wave_results {
1556 conversation.add_user(&format!("[Step {}]", wr.step_name));
1557 conversation.add_assistant(&wr.output);
1558 }
1559 } else {
1560 for step_name in &wave.steps {
1562 let (j, step) = step_map[step_name.as_str()];
1563
1564 ctx.set_step(&step.step_name, &step.step_type, j);
1565 hooks.on_step_start(&step.step_name, &step.step_type);
1566
1567 if !json {
1568 println!(
1569 " {} {}.{} [{}]",
1570 c("→", "\x1b[33m", use_color),
1571 j + 1,
1572 c(&step.step_name, "\x1b[1m", use_color),
1573 step.step_type,
1574 );
1575 }
1576
1577 if step.step_type == "use_tool" {
1579 let arg = if !step.tool_named_args.is_empty() {
1583 let interpolated: Vec<(String, String)> = step
1584 .tool_named_args
1585 .iter()
1586 .map(|(n, v, kind)| (n.clone(), ctx.resolve_named_arg(v, kind)))
1589 .collect();
1590 build_structured_tool_body(&interpolated, &step.tool_param_types)
1591 } else {
1592 ctx.interpolate(step.tool_argument.as_deref().unwrap_or(""))
1593 };
1594 if let Some(result) = registry.dispatch(&step.step_name, &arg) {
1595 let status = if result.success { "ok" } else { "error" };
1596 if !json {
1597 println!(
1598 " {} {} → {} [native]",
1599 c("🔧", "\x1b[36m", use_color),
1600 c(&result.tool_name, "\x1b[1m", use_color),
1601 c(&format!("{status}: {}", result.output), if result.success { "\x1b[32m" } else { "\x1b[31m" }, use_color),
1602 );
1603 }
1604
1605 if let Some(entry) = registry.get(&step.step_name) {
1607 if !entry.output_schema.is_empty() {
1608 let vr = tool_validator::validate_output(
1609 &step.step_name, &result.output, &entry.output_schema,
1610 );
1611 if !vr.passed && !json {
1612 println!(
1613 " {} {}",
1614 c("⚠", "\x1b[33m", use_color),
1615 c(
1616 &format!("Validation: {} — {}", vr.schema, vr.message),
1617 "\x1b[33m",
1618 use_color,
1619 ),
1620 );
1621 }
1622 if trace {
1623 events.push(TraceEvent {
1624 event: format!("tool_validate_{}", if vr.passed { "pass" } else { "fail" }),
1625 unit: unit.flow_name.clone(),
1626 step: step.step_name.clone(),
1627 detail: format!("schema={}, {}", vr.schema, vr.message),
1628 });
1629 }
1630 }
1631 if !entry.effect_row.is_empty() {
1632 effects.record(
1633 &step.step_name, &step.step_name, &unit.flow_name, &entry.effect_row,
1634 );
1635 }
1636 }
1637
1638 ctx.set_result(&step.step_name, &result.output);
1639 hooks.on_step_end(0, 0, 0, 0, false);
1640 report.record_step(StepReport {
1641 name: step.step_name.clone(),
1642 step_type: step.step_type.clone(),
1643 result: result.output.clone(),
1644 duration_ms: 0,
1645 input_tokens: 0,
1646 output_tokens: 0,
1647 anchor_breaches: 0,
1648 chain_activations: 0,
1649 was_retried: false,
1650 });
1651 if trace {
1652 events.push(TraceEvent {
1653 event: "tool_native".to_string(),
1654 unit: unit.flow_name.clone(),
1655 step: step.step_name.clone(),
1656 detail: format!(
1657 "tool={}, success={}, output={}",
1658 result.tool_name, result.success, result.output
1659 ),
1660 });
1661 }
1662 continue; }
1664 }
1666
1667 if matches!(step.step_type.as_str(), "remember" | "recall" | "persist" | "retrieve" | "mutate" | "purge") {
1669 let raw_expr = step.memory_expression.as_deref().unwrap_or("");
1670 let expr = ctx.interpolate(raw_expr);
1671
1672 if matches!(step.step_type.as_str(), "persist" | "retrieve" | "mutate" | "purge")
1679 && store_registry.backend_kind(&step.step_name)
1680 == Some(StoreBackendKind::Postgresql)
1681 {
1682 let (result_text, ok) = match execute_sql_store_step_async(
1693 store_registry,
1694 pinned_conns,
1695 &step.step_type,
1696 &step.step_name,
1697 raw_expr,
1698 step.store_fields.as_deref(),
1699 &ctx,
1700 ).await {
1701 Ok(summary) => (summary, true),
1702 Err(e) => (format!("store error: {e}"), false),
1703 };
1704 ctx.set_result(&step.step_name, &result_text);
1705 let detail = format!("{} → {}", step.step_name, result_text);
1706 if !json {
1707 let color = if ok { "\x1b[35m" } else { "\x1b[31m" };
1708 println!(
1709 " {} {} [{}]",
1710 c(if ok { "💾" } else { "✗" }, color, use_color),
1711 c(&detail, color, use_color),
1712 step.step_type,
1713 );
1714 }
1715 hooks.on_step_end(0, 0, 0, 0, false);
1716 report.record_step(StepReport {
1717 name: step.step_name.clone(),
1718 step_type: step.step_type.clone(),
1719 result: detail.clone(),
1720 duration_ms: 0,
1721 input_tokens: 0,
1722 output_tokens: 0,
1723 anchor_breaches: 0,
1724 chain_activations: 0,
1725 was_retried: false,
1726 });
1727 if trace {
1728 events.push(TraceEvent {
1729 event: format!("axonstore_sql_{}", step.step_type),
1730 unit: unit.flow_name.clone(),
1731 step: step.step_name.clone(),
1732 detail,
1733 });
1734 }
1735 continue; }
1737
1738 let (action, detail) = match step.step_type.as_str() {
1739 "remember" => {
1740 session.remember(&step.step_name, &expr, &step.step_name);
1741 ctx.set_result(&step.step_name, &expr);
1742 ("remember", format!("{} = {}", step.step_name, expr))
1743 }
1744 "recall" => {
1745 let val = session.recall(&step.step_name)
1746 .map(|e| e.value.clone())
1747 .unwrap_or_else(|| "(not found)".to_string());
1748 ctx.set_result(&step.step_name, &val);
1749 ("recall", format!("{} → {}", step.step_name, val))
1750 }
1751 "persist" => {
1752 session.persist(&step.step_name, &expr, &step.step_name);
1753 ctx.set_result(&step.step_name, &expr);
1754 ("persist", format!("{} → store", step.step_name))
1755 }
1756 "retrieve" => {
1757 let val = session.retrieve(&step.step_name)
1758 .map(|e| e.value.clone())
1759 .unwrap_or_else(|| {
1760 let results = session.retrieve_query(&expr);
1761 if results.is_empty() {
1762 "(not found)".to_string()
1763 } else {
1764 format!("{} entries", results.len())
1765 }
1766 });
1767 ctx.set_result(&step.step_name, &val);
1768 ("retrieve", format!("{} → {}", step.step_name, val))
1769 }
1770 "mutate" => {
1771 let parts: Vec<&str> = expr.splitn(2, ':').collect();
1772 let ok = if parts.len() == 2 {
1773 session.mutate(parts[0], parts[1], &step.step_name)
1774 } else {
1775 false
1776 };
1777 let msg = if ok { "updated" } else { "not found" };
1778 ctx.set_result(&step.step_name, msg);
1779 ("mutate", format!("{} → {}", step.step_name, msg))
1780 }
1781 "purge" => {
1782 let ok = session.purge(&step.step_name);
1783 let msg = if ok { "removed" } else { "not found" };
1784 ctx.set_result(&step.step_name, msg);
1785 ("purge", format!("{} → {}", step.step_name, msg))
1786 }
1787 _ => unreachable!(),
1788 };
1789
1790 if !json {
1791 println!(
1792 " {} {} [{}]",
1793 c("💾", "\x1b[35m", use_color),
1794 c(&detail, "\x1b[35m", use_color),
1795 action,
1796 );
1797 }
1798 hooks.on_step_end(0, 0, 0, 0, false);
1799 report.record_step(StepReport {
1800 name: step.step_name.clone(),
1801 step_type: step.step_type.clone(),
1802 result: detail.clone(),
1803 duration_ms: 0,
1804 input_tokens: 0,
1805 output_tokens: 0,
1806 anchor_breaches: 0,
1807 chain_activations: 0,
1808 was_retried: false,
1809 });
1810 if trace {
1811 events.push(TraceEvent {
1812 event: format!("session_{action}"),
1813 unit: unit.flow_name.clone(),
1814 step: step.step_name.clone(),
1815 detail,
1816 });
1817 }
1818 continue; }
1820
1821 let full_system = format!("{}\n\n{}", unit.system_prompt, step.system_prompt);
1823 let interpolated_prompt = ctx.interpolate(&step.user_prompt);
1824
1825 let dropped = context_window.enforce(&mut conversation);
1827 if dropped > 0 {
1828 if !json {
1829 println!(
1830 " {} {}",
1831 c("⊘", "\x1b[33m", use_color),
1832 c(
1833 &format!(
1834 "Context window: dropped {} messages ({} total chars remaining, ~{}k tokens)",
1835 dropped,
1836 conversation.total_chars(),
1837 ContextWindow::estimate_tokens(conversation.total_chars()) / 1000,
1838 ),
1839 "\x1b[2;33m",
1840 use_color,
1841 ),
1842 );
1843 }
1844 if trace {
1845 events.push(TraceEvent {
1846 event: "context_truncated".to_string(),
1847 unit: unit.flow_name.clone(),
1848 step: step.step_name.clone(),
1849 detail: format!(
1850 "dropped={}, remaining_chars={}, remaining_turns={}",
1851 dropped,
1852 conversation.total_chars(),
1853 conversation.turn_count(),
1854 ),
1855 });
1856 }
1857 }
1858
1859 let step_input_before = total_input_tokens;
1860 let step_output_before = total_output_tokens;
1861 let step_result = execute_step_with_retry(
1862 backend_name,
1863 &api_key,
1864 &full_system,
1865 &interpolated_prompt,
1866 &step.step_name,
1867 &unit.flow_name,
1868 &unit.resolved_anchors,
1869 use_color,
1870 trace,
1871 stream,
1872 json,
1873 &mut conversation,
1874 &mut events,
1875 &mut total_input_tokens,
1876 &mut total_output_tokens,
1877 );
1878 let step_in = total_input_tokens - step_input_before;
1879 let step_out = total_output_tokens - step_output_before;
1880 ctx.set_result(&step.step_name, &step_result);
1881 hooks.on_step_end(step_in, step_out, 0, 0, false);
1882 report.record_step(StepReport {
1883 name: step.step_name.clone(),
1884 step_type: step.step_type.clone(),
1885 result: step_result,
1886 duration_ms: 0,
1887 input_tokens: step_in,
1888 output_tokens: step_out,
1889 anchor_breaches: 0,
1890 chain_activations: 0,
1891 was_retried: false,
1892 });
1893
1894 } } } hooks.on_unit_end();
1899 report.end_unit(&hooks);
1900
1901 if trace {
1902 events.push(TraceEvent {
1903 event: "unit_complete".to_string(),
1904 unit: unit.flow_name.clone(),
1905 step: String::new(),
1906 detail: format!(
1907 "{} steps, {} conversation turns, {} chars context{}",
1908 unit.steps.len(),
1909 conversation.turn_count(),
1910 conversation.total_chars(),
1911 if context_window.was_truncated() {
1912 format!(
1913 ", {} messages truncated across {} events",
1914 context_window.total_dropped,
1915 context_window.truncation_count,
1916 )
1917 } else {
1918 String::new()
1919 },
1920 ),
1921 });
1922 }
1923
1924 if !json {
1925 println!(
1926 " {} {} steps completed",
1927 c("✓", "\x1b[32m", use_color),
1928 unit.steps.len()
1929 );
1930 }
1931 }
1932
1933 if let Err(e) = session.flush() {
1935 if !json {
1936 eprintln!(" {} {}", c("⚠", "\x1b[33m", use_color), e);
1937 }
1938 } else if session.store_count() > 0 && !json {
1939 println!(
1940 " {}",
1941 c(
1942 &format!("💾 Session: {} memory, {} persistent ({})",
1943 session.memory_count(), session.store_count(),
1944 session.store_path().display()),
1945 "\x1b[2m",
1946 use_color,
1947 )
1948 );
1949 }
1950
1951 if !json && (total_input_tokens > 0 || total_output_tokens > 0) {
1953 println!(
1954 "\n {}",
1955 c(
1956 &format!(
1957 "Token usage: {} input + {} output = {} total",
1958 total_input_tokens,
1959 total_output_tokens,
1960 total_input_tokens + total_output_tokens
1961 ),
1962 "\x1b[2m",
1963 use_color,
1964 )
1965 );
1966 }
1967
1968 if hooks.total_steps() > 0 {
1970 if !json {
1971 println!(
1972 " {}",
1973 c(
1974 &format!(
1975 "Execution: {} steps across {} units in {}ms (avg {}ms/step){}",
1976 hooks.total_steps(),
1977 hooks.unit_metrics().len(),
1978 hooks.total_duration_ms(),
1979 hooks.avg_step_duration_ms(),
1980 if hooks.retried_steps() > 0 {
1981 format!(", {} retried", hooks.retried_steps())
1982 } else {
1983 String::new()
1984 },
1985 ),
1986 "\x1b[2m",
1987 use_color,
1988 )
1989 );
1990 }
1991
1992 if trace {
1993 for um in hooks.unit_metrics() {
1995 events.push(TraceEvent {
1996 event: "hook_unit_metrics".to_string(),
1997 unit: um.unit_name.clone(),
1998 step: String::new(),
1999 detail: format!(
2000 "duration={}ms, steps={}, tokens_in={}, tokens_out={}, breaches={}, chains={}",
2001 um.duration_ms, um.total_steps,
2002 um.total_input_tokens, um.total_output_tokens,
2003 um.total_anchor_breaches, um.total_chain_activations,
2004 ),
2005 });
2006 }
2007 }
2008 }
2009
2010 if effects.total_executions() > 0 {
2012 if !json {
2013 println!(
2014 " {}",
2015 c(
2016 &format!("Effects: {}", effects.summary()),
2017 "\x1b[2m",
2018 use_color,
2019 )
2020 );
2021 }
2022 if trace {
2023 events.push(TraceEvent {
2024 event: "effect_summary".to_string(),
2025 unit: String::new(),
2026 step: String::new(),
2027 detail: effects.summary(),
2028 });
2029 }
2030 }
2031
2032 Ok((true, events))
2033}
2034
2035fn execute_real(
2053 units: &[ExecutionUnit],
2054 backend_name: &str,
2055 source_file: &str,
2056 use_color: bool,
2057 trace: bool,
2058 stream: bool,
2059 output_fmt: OutputFormat,
2060 report: &mut ReportBuilder,
2061 registry: &ToolRegistry,
2062 store_registry: &StoreRegistry,
2063 pinned_conns: &mut std::collections::HashMap<String, sqlx::pool::PoolConnection<sqlx::Postgres>>,
2064 api_key_override: Option<&str>,
2065) -> Result<(bool, Vec<TraceEvent>), backend::BackendError> {
2066 block_on_store(execute_real_async(
2067 units,
2068 backend_name,
2069 source_file,
2070 use_color,
2071 trace,
2072 stream,
2073 output_fmt,
2074 report,
2075 registry,
2076 store_registry,
2077 pinned_conns,
2078 api_key_override,
2079 ))
2080}
2081
2082#[allow(clippy::too_many_arguments)]
2088fn execute_step_with_retry(
2089 backend_name: &str,
2090 api_key: &str,
2091 system_prompt: &str,
2092 original_user_prompt: &str,
2093 step_name: &str,
2094 flow_name: &str,
2095 anchors: &[IRAnchor],
2096 use_color: bool,
2097 trace: bool,
2098 stream: bool,
2099 json: bool,
2100 conversation: &mut ConversationHistory,
2101 events: &mut Vec<TraceEvent>,
2102 total_input_tokens: &mut u64,
2103 total_output_tokens: &mut u64,
2104) -> String {
2105 let mut user_prompt = original_user_prompt.to_string();
2106 let mut attempt: u32 = 0;
2107 let mut last_response_text = String::new();
2108 let effective_stream = stream && !json; loop {
2111 let history = conversation.messages();
2112 let result = if effective_stream {
2113 use std::io::Write;
2115 print!(" ");
2116 let _ = std::io::stdout().flush();
2117 let resp = backend::call_multi_stream(
2118 backend_name, api_key, system_prompt, history, &user_prompt, None,
2119 |chunk| {
2120 print!("{chunk}");
2121 let _ = std::io::stdout().flush();
2122 },
2123 );
2124 println!(); resp
2126 } else {
2127 backend::call_multi(backend_name, api_key, system_prompt, history, &user_prompt, None)
2128 };
2129
2130 match result {
2131 Ok(resp) => {
2132 *total_input_tokens += resp.input_tokens;
2133 *total_output_tokens += resp.output_tokens;
2134 last_response_text = resp.text.clone();
2135
2136 if !json {
2138 let preview = if effective_stream {
2139 String::new() } else if resp.text.len() > 500 {
2141 format!("{}…", &resp.text[..500])
2142 } else {
2143 resp.text.clone()
2144 };
2145
2146 println!(
2147 " {} {}",
2148 c("✓", "\x1b[32m", use_color),
2149 c(
2150 &format!(
2151 "[{}] {} tokens in, {} out",
2152 resp.stop_reason, resp.input_tokens, resp.output_tokens
2153 ),
2154 "\x1b[2m",
2155 use_color,
2156 )
2157 );
2158
2159 if !effective_stream {
2160 for line in preview.lines() {
2161 println!(" {line}");
2162 }
2163 }
2164 }
2165
2166 if trace {
2167 events.push(TraceEvent {
2168 event: "step_complete".to_string(),
2169 unit: flow_name.to_string(),
2170 step: step_name.to_string(),
2171 detail: format!(
2172 "model={}, input_tokens={}, output_tokens={}, stop={}, attempt={}",
2173 resp.model, resp.input_tokens, resp.output_tokens, resp.stop_reason, attempt + 1
2174 ),
2175 });
2176 }
2177
2178 if anchors.is_empty() {
2180 conversation.add_user(original_user_prompt);
2181 conversation.add_assistant(&last_response_text);
2182 return last_response_text;
2183 }
2184
2185 let results = anchor_checker::check_all(anchors, &resp.text);
2186 let mut error_breaches: Vec<String> = Vec::new();
2187
2188 for result in &results {
2189 if result.passed {
2190 if !json {
2191 println!(
2192 " {} {}",
2193 c("⚓", "\x1b[32m", use_color),
2194 c(&format!("{}: pass ({:.0}%)", result.anchor_name, result.confidence * 100.0), "\x1b[32m", use_color),
2195 );
2196 }
2197 if trace {
2198 events.push(TraceEvent {
2199 event: "anchor_pass".to_string(),
2200 unit: flow_name.to_string(),
2201 step: step_name.to_string(),
2202 detail: format!("{} (confidence={:.2})", result.anchor_name, result.confidence),
2203 });
2204 }
2205 } else {
2206 if !json {
2207 let severity_color = if result.severity == "error" { "\x1b[31m" } else { "\x1b[33m" };
2208 println!(
2209 " {} {} [{}] ({:.0}%)",
2210 c("⚓", severity_color, use_color),
2211 c(&format!("{}: BREACH", result.anchor_name), &format!("\x1b[1m{severity_color}"), use_color),
2212 result.severity,
2213 result.confidence * 100.0,
2214 );
2215 for v in &result.violations {
2216 println!(
2217 " {} {}",
2218 c("↳", severity_color, use_color),
2219 v,
2220 );
2221 }
2222 }
2223 if trace {
2224 events.push(TraceEvent {
2225 event: "anchor_breach".to_string(),
2226 unit: flow_name.to_string(),
2227 step: step_name.to_string(),
2228 detail: format!(
2229 "{} [{}] (confidence={:.2}): {}",
2230 result.anchor_name,
2231 result.severity,
2232 result.confidence,
2233 result.violations.join("; ")
2234 ),
2235 });
2236 }
2237
2238 if result.severity == "error" {
2240 for v in &result.violations {
2241 error_breaches.push(format!("{}: {}", result.anchor_name, v));
2242 }
2243 }
2244 }
2245 }
2246
2247 let chain_results = anchor_checker::check_chained(&results, anchors, &resp.text);
2249 for (rule, chain_result) in &chain_results {
2250 if chain_result.passed {
2251 if !json {
2252 println!(
2253 " {} {}",
2254 c("⛓", "\x1b[36m", use_color),
2255 c(
2256 &format!(
2257 "{} → {}: pass ({:.0}%) [{}]",
2258 rule.trigger, chain_result.anchor_name,
2259 chain_result.confidence * 100.0, rule.reason,
2260 ),
2261 "\x1b[36m",
2262 use_color,
2263 ),
2264 );
2265 }
2266 } else {
2267 if !json {
2268 println!(
2269 " {} {}",
2270 c("⛓", "\x1b[31m", use_color),
2271 c(
2272 &format!(
2273 "{} → {}: BREACH ({:.0}%) [{}]",
2274 rule.trigger, chain_result.anchor_name,
2275 chain_result.confidence * 100.0, rule.reason,
2276 ),
2277 "\x1b[1;31m",
2278 use_color,
2279 ),
2280 );
2281 for v in &chain_result.violations {
2282 println!(
2283 " {} {}",
2284 c("↳", "\x1b[31m", use_color),
2285 v,
2286 );
2287 }
2288 }
2289 if chain_result.severity == "error" {
2291 for v in &chain_result.violations {
2292 error_breaches.push(format!(
2293 "{} (chained from {}): {}",
2294 chain_result.anchor_name, rule.trigger, v
2295 ));
2296 }
2297 }
2298 }
2299 if trace {
2300 events.push(TraceEvent {
2301 event: "anchor_chain".to_string(),
2302 unit: flow_name.to_string(),
2303 step: step_name.to_string(),
2304 detail: format!(
2305 "{} → {}: {} (confidence={:.2}, reason={})",
2306 rule.trigger,
2307 chain_result.anchor_name,
2308 if chain_result.passed { "pass" } else { "BREACH" },
2309 chain_result.confidence,
2310 rule.reason,
2311 ),
2312 });
2313 }
2314 }
2315
2316 if error_breaches.is_empty() || attempt >= MAX_ANCHOR_RETRIES {
2318 if !error_breaches.is_empty() {
2319 if !json {
2320 println!(
2321 " {} {}",
2322 c("⚠", "\x1b[33m", use_color),
2323 c(
2324 &format!(
2325 "Anchor breaches remain after {} retries — continuing",
2326 MAX_ANCHOR_RETRIES
2327 ),
2328 "\x1b[33m",
2329 use_color,
2330 ),
2331 );
2332 }
2333 if trace {
2334 events.push(TraceEvent {
2335 event: "retry_exhausted".to_string(),
2336 unit: flow_name.to_string(),
2337 step: step_name.to_string(),
2338 detail: format!(
2339 "{} error breaches after {} retries",
2340 error_breaches.len(),
2341 MAX_ANCHOR_RETRIES
2342 ),
2343 });
2344 }
2345 }
2346 conversation.add_user(original_user_prompt);
2347 conversation.add_assistant(&last_response_text);
2348 return last_response_text;
2349 }
2350
2351 attempt += 1;
2353 let feedback = error_breaches
2354 .iter()
2355 .enumerate()
2356 .map(|(i, v)| format!("{}. {}", i + 1, v))
2357 .collect::<Vec<_>>()
2358 .join("\n");
2359
2360 user_prompt = format!(
2361 "{}\n\n\
2362 IMPORTANT: Your previous response violated the following constraints:\n\
2363 {}\n\n\
2364 Please regenerate your response, strictly avoiding the violations listed above.",
2365 original_user_prompt,
2366 feedback
2367 );
2368
2369 if !json {
2370 println!(
2371 " {} {}",
2372 c("↻", "\x1b[35m", use_color),
2373 c(
2374 &format!(
2375 "Retry {}/{} — {} anchor breach(es) detected",
2376 attempt,
2377 MAX_ANCHOR_RETRIES,
2378 error_breaches.len()
2379 ),
2380 "\x1b[1;35m",
2381 use_color,
2382 ),
2383 );
2384 }
2385
2386 if trace {
2387 events.push(TraceEvent {
2388 event: "retry_attempt".to_string(),
2389 unit: flow_name.to_string(),
2390 step: step_name.to_string(),
2391 detail: format!(
2392 "attempt={}/{}, breaches: {}",
2393 attempt,
2394 MAX_ANCHOR_RETRIES,
2395 error_breaches.join("; ")
2396 ),
2397 });
2398 }
2399
2400 }
2402 Err(err) => {
2403 if !json {
2404 eprintln!(
2405 " {} step '{}' failed: {}",
2406 c("✗", "\x1b[31m", use_color),
2407 step_name,
2408 err
2409 );
2410 }
2411
2412 if trace {
2413 events.push(TraceEvent {
2414 event: "step_error".to_string(),
2415 unit: flow_name.to_string(),
2416 step: step_name.to_string(),
2417 detail: format!("{err}"),
2418 });
2419 }
2420
2421 return String::new(); }
2423 }
2424 }
2425}
2426
2427fn truncate(s: &str, max: usize) -> String {
2428 let first_line = s.lines().next().unwrap_or(s);
2429 if first_line.len() > max {
2430 format!("{}…", &first_line[..max])
2431 } else {
2432 first_line.to_string()
2433 }
2434}
2435
2436fn build_plan_export(
2438 units: &[ExecutionUnit],
2439 source_file: &str,
2440 backend: &str,
2441 registry: &ToolRegistry,
2442) -> plan_export::PlanExport {
2443 let mut plan_units = Vec::new();
2444 let mut all_deps = PlanDependencies {
2445 max_depth: 0,
2446 parallel_groups: Vec::new(),
2447 unresolved_refs: Vec::new(),
2448 };
2449
2450 for unit in units {
2451 let step_name_set: std::collections::HashSet<&str> =
2455 unit.steps.iter().map(|s| s.step_name.as_str()).collect();
2456 let step_infos: Vec<step_deps::StepInfo> = unit.steps.iter().map(|s| {
2457 step_deps::StepInfo {
2458 name: s.step_name.clone(),
2459 step_type: s.step_type.clone(),
2460 user_prompt: s.user_prompt.clone(),
2461 argument: step_deps::use_tool_analysis_argument(
2462 s.tool_argument.as_deref()
2463 .or(s.memory_expression.as_deref())
2464 .unwrap_or(""),
2465 &s.tool_named_args,
2466 &step_name_set,
2467 ),
2468 }
2469 }).collect();
2470
2471 let dep_graph = step_deps::analyze(&step_infos);
2472
2473 let plan_steps: Vec<PlanStep> = unit.steps.iter().zip(dep_graph.steps.iter()).map(|(s, d)| {
2475 PlanStep {
2476 name: s.step_name.clone(),
2477 step_type: s.step_type.clone(),
2478 prompt_preview: truncate(&s.user_prompt, 200),
2479 tool_argument: s.tool_argument.clone(),
2480 memory_expression: s.memory_expression.clone(),
2481 depends_on: d.depends_on.clone(),
2482 is_root: d.is_root,
2483 }
2484 }).collect();
2485
2486 plan_units.push(PlanUnit {
2487 flow_name: unit.flow_name.clone(),
2488 persona_name: unit.persona_name.clone(),
2489 context_name: unit.context_name.clone(),
2490 effort: unit.effort.clone(),
2491 anchor_count: unit.resolved_anchors.len(),
2492 anchors: unit.anchor_instructions.clone(),
2493 steps: plan_steps,
2494 });
2495
2496 if dep_graph.max_depth > all_deps.max_depth {
2498 all_deps.max_depth = dep_graph.max_depth;
2499 }
2500 all_deps.parallel_groups.extend(dep_graph.parallel_groups);
2501 all_deps.unresolved_refs.extend(
2502 dep_graph.unresolved_refs.into_iter().map(|(step, var)| {
2503 UnresolvedRef { step, variable: var }
2504 }),
2505 );
2506 }
2507
2508 let tools = PlanTools {
2510 total: registry.len(),
2511 builtin: registry.builtin_names().into_iter().map(|s| s.to_string()).collect(),
2512 program: registry.program_names().into_iter().map(|s| s.to_string()).collect(),
2513 registered: registry.tool_names().into_iter().map(|name| {
2514 let entry = registry.get(name).unwrap();
2515 PlanToolEntry {
2516 name: entry.name.clone(),
2517 provider: entry.provider.clone(),
2518 source: format!("{:?}", entry.source).to_lowercase(),
2519 output_schema: entry.output_schema.clone(),
2520 effect_row: entry.effect_row.clone(),
2521 }
2522 }).collect(),
2523 };
2524
2525 PlanBuilder::build(source_file, backend, &plan_units, tools, all_deps)
2526}
2527
2528pub struct ServerRunnerMetrics {
2531 pub success: bool,
2532 pub steps_executed: usize,
2533 pub tokens_input: u64,
2534 pub tokens_output: u64,
2535 pub anchor_breaches: usize,
2536 pub step_names: Vec<String>,
2537 pub step_results: Vec<String>,
2538 pub per_step_chunks: Vec<Vec<String>>,
2540 pub provenance_events: Vec<String>,
2557 pub blame_attribution: Option<crate::wire_envelope::BlameContext>,
2570 pub epistemic_envelopes: Vec<crate::epistemic_capture::EpistemicEnvelope>,
2577}
2578
2579pub fn derive_epistemic_envelopes_for_flow(
2589 ir: &crate::ir_nodes::IRProgram,
2590 flow_name: &str,
2591) -> Vec<crate::epistemic_capture::EpistemicEnvelope> {
2592 ir.flows
2593 .iter()
2594 .find(|f| f.name == flow_name)
2595 .map(|f| crate::epistemic_capture::collect_for_flow(f, &ir.tools, 1.0))
2596 .unwrap_or_default()
2597}
2598
2599pub fn execute_server_flow(
2600 ir: &crate::ir_nodes::IRProgram,
2601 flow_name: &str,
2602 backend: &str,
2603 source_file: &str,
2604 api_key_override: Option<&str>,
2605 request_body: Option<&serde_json::Value>,
2610 request_path: &std::collections::HashMap<String, String>,
2615 request_query: &std::collections::HashMap<String, String>,
2620 tool_base_url: Option<&str>,
2626) -> Result<ServerRunnerMetrics, String> {
2627 let mut target_run = None;
2628 for run in &ir.runs {
2629 if run.flow_name == flow_name {
2630 target_run = Some(run);
2631 break;
2632 }
2633 }
2634
2635 let mut execution_units = Vec::new();
2636
2637 if let Some(run) = target_run {
2638 execution_units.push(ExecutionUnit {
2639 flow_name: run.flow_name.clone(),
2640 persona_name: run.persona_name.clone(),
2641 context_name: run.context_name.clone(),
2642 system_prompt: build_system_prompt(run, backend),
2643 steps: build_compiled_steps(run, ir),
2644 anchor_instructions: build_anchor_instructions(run),
2645 effort: run.effort.clone(),
2646 resolved_anchors: run.resolved_anchors.clone(),
2647 param_bindings: run
2653 .resolved_flow
2654 .as_ref()
2655 .map(|f| crate::request_binding::bind_request(
2656 f,
2657 request_path,
2658 request_query,
2659 request_body,
2660 ))
2661 .unwrap_or_default(),
2662 });
2663 } else {
2664 let target_flow: &crate::ir_nodes::IRFlow = ir
2665 .flows
2666 .iter()
2667 .find(|f| f.name == flow_name)
2668 .ok_or_else(|| format!("flow '{}' not found in compiled IR", flow_name))?;
2669
2670 let default_persona = ir.personas.first().cloned().unwrap_or_else(|| crate::ir_nodes::IRPersona {
2671 node_type: "Persona",
2672 source_line: 0,
2673 source_column: 0,
2674 name: "Default".to_string(),
2675 domain: vec![],
2676 tone: "".to_string(),
2677 confidence_threshold: None,
2678 cite_sources: None,
2679 refuse_if: vec![],
2680 language: "".to_string(),
2681 description: "".to_string(),
2682 });
2683 let default_context = ir.contexts.first().cloned().unwrap_or_else(|| crate::ir_nodes::IRContext {
2684 node_type: "Context",
2685 source_line: 0,
2686 source_column: 0,
2687 name: "Default".to_string(),
2688 memory_scope: "".to_string(),
2689 language: "".to_string(),
2690 depth: "".to_string(),
2691 max_tokens: None,
2692 temperature: None,
2693 cite_sources: None,
2694 });
2695
2696 let run = crate::ir_nodes::IRRun {
2697 node_type: "Run",
2698 source_line: 0,
2699 source_column: 0,
2700 flow_name: flow_name.to_string(),
2701 arguments: vec![],
2702 persona_name: default_persona.name.clone(),
2703 context_name: default_context.name.clone(),
2704 anchor_names: vec![],
2705 on_failure: "".to_string(),
2706 on_failure_params: vec![],
2707 output_to: "".to_string(),
2708 effort: "low".to_string(),
2709 resolved_flow: Some(target_flow.clone()),
2710 resolved_persona: Some(default_persona),
2711 resolved_context: Some(default_context),
2712 resolved_anchors: ir.anchors.clone(),
2713 };
2714
2715 execution_units.push(ExecutionUnit {
2716 flow_name: run.flow_name.clone(),
2717 persona_name: run.persona_name.clone(),
2718 context_name: run.context_name.clone(),
2719 system_prompt: build_system_prompt(&run, backend),
2720 steps: build_compiled_steps(&run, ir),
2721 anchor_instructions: build_anchor_instructions(&run),
2722 effort: run.effort.clone(),
2723 resolved_anchors: run.resolved_anchors.clone(),
2724 param_bindings: crate::request_binding::bind_request(
2729 target_flow,
2730 request_path,
2731 request_query,
2732 request_body,
2733 ),
2734 });
2735 }
2736
2737 let mut report = crate::output::ReportBuilder::new(source_file, backend, "json");
2738 let mut registry = crate::tool_registry::ToolRegistry::new();
2739 registry.register_from_ir(&ir.tools);
2748 if let Some(base) = tool_base_url {
2752 registry.resolve_relative_endpoints(base);
2753 }
2754
2755 let store_registry = StoreRegistry::build(&ir.axonstore_specs)
2759 .map_err(|e| format!("axonstore registry: {e}"))?;
2760
2761 let needed_pg_stores: std::collections::HashSet<String> = {
2788 let mut needed = std::collections::HashSet::new();
2789 for unit in &execution_units {
2790 for step in &unit.steps {
2791 if matches!(
2792 step.step_type.as_str(),
2793 "persist" | "retrieve" | "mutate" | "purge"
2794 ) && store_registry.backend_kind(&step.step_name)
2795 == Some(crate::store::registry::StoreBackendKind::Postgresql)
2796 {
2797 needed.insert(step.step_name.clone());
2798 }
2799 }
2800 }
2801 needed
2802 };
2803
2804 let (success, _events) = if backend == "stub" {
2805 let result = execute_stub(&execution_units, false, false);
2806 for unit in &execution_units {
2823 report.begin_unit(&unit.flow_name, &unit.persona_name);
2824 for step in &unit.steps {
2825 report.record_step(crate::output::StepReport {
2826 name: step.step_name.clone(),
2827 step_type: step.step_type.clone(),
2828 result: "(stub)".to_string(),
2829 duration_ms: 0,
2830 input_tokens: 0,
2831 output_tokens: 0,
2832 anchor_breaches: 0,
2833 chain_activations: 0,
2834 was_retried: false,
2835 });
2836 }
2837 let mut stub_hooks = crate::hooks::HookManager::new();
2838 stub_hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
2839 stub_hooks.on_unit_end();
2840 report.end_unit(&stub_hooks);
2841 }
2842 result
2843 } else {
2844 block_on_store(async {
2856 let mut pinned_conns: std::collections::HashMap<
2857 String,
2858 sqlx::pool::PoolConnection<sqlx::Postgres>,
2859 > = std::collections::HashMap::new();
2860
2861 for store_name in &needed_pg_stores {
2870 match store_registry.resolve(store_name) {
2871 Ok(crate::store::registry::StoreHandle::Postgres(backend_pool)) => {
2872 match backend_pool.acquire_pin().await {
2873 Ok(conn) => {
2874 crate::store::pin_observability::emit_pin_acquire(
2875 store_name,
2876 flow_name,
2877 "",
2878 "eager",
2879 None,
2880 );
2881 pinned_conns.insert(store_name.clone(), conn);
2882 }
2883 Err(e) => {
2884 tracing::warn!(
2885 target: "axon::store::pin",
2886 store_name = %store_name,
2887 error = %e,
2888 d_letter = "37.x.j.D1",
2889 "failed to acquire flow pin; falling \
2890 back to per-step pool acquisition \
2891 (legacy path) for this store. Adopter \
2892 under transaction-mode pooler may \
2893 observe the unnamed-prepared-statement \
2894 race for this op."
2895 );
2896 }
2897 }
2898 }
2899 Ok(_) => {}
2900 Err(e) => {
2901 tracing::warn!(
2902 target: "axon::store::pin",
2903 store_name = %store_name,
2904 error = %e,
2905 d_letter = "37.x.j.D1",
2906 "failed to resolve axonstore for flow pin; \
2907 falling back to per-step pool acquisition."
2908 );
2909 }
2910 }
2911 }
2912
2913 execute_real_async(
2915 &execution_units,
2916 backend,
2917 source_file,
2918 false,
2919 false,
2920 false,
2921 crate::output::OutputFormat::Json,
2922 &mut report,
2923 ®istry,
2924 &store_registry,
2925 &mut pinned_conns,
2926 api_key_override,
2927 ).await
2928 }).map_err(|e| format!("Backend error: {:?}", e))?
2933 };
2934
2935 let hooks = crate::hooks::HookManager::new();
2936 let r = report.build(success, &hooks);
2937
2938 let blame_attribution =
2943 crate::wire_envelope_producers::derive_blame_from_report(&r);
2944
2945 let mut steps_executed = 0;
2946 let mut tokens_input = 0;
2947 let mut tokens_output = 0;
2948 let mut step_results = Vec::new();
2949 let mut step_names = Vec::new();
2950 let mut anchor_breaches = 0;
2951
2952 for u in r.units {
2953 for s in u.steps {
2954 steps_executed += 1;
2955 tokens_input += s.input_tokens;
2956 tokens_output += s.output_tokens;
2957 step_results.push(s.result);
2958 step_names.push(s.name);
2959 anchor_breaches += s.anchor_breaches as usize;
2960 }
2961 }
2962
2963 let per_step_chunks: Vec<Vec<String>> = step_results.iter().map(|result| {
2965 if result.is_empty() {
2966 vec![]
2967 } else {
2968 result.split_whitespace()
2970 .collect::<Vec<&str>>()
2971 .chunks(3) .map(|chunk| chunk.join(" "))
2973 .collect()
2974 }
2975 }).collect();
2976
2977 let provenance_walk: Vec<(String, String)> = execution_units
2984 .iter()
2985 .flat_map(|u| {
2986 u.steps
2987 .iter()
2988 .map(|s| (s.step_type.clone(), s.step_name.clone()))
2989 })
2990 .collect();
2991 let provenance_events =
2992 crate::wire_envelope_producers::collect_provenance_events_from(
2993 &provenance_walk,
2994 );
2995
2996 let epistemic_envelopes = derive_epistemic_envelopes_for_flow(ir, flow_name);
3006
3007 Ok(ServerRunnerMetrics {
3008 success,
3009 steps_executed,
3010 tokens_input,
3011 tokens_output,
3012 anchor_breaches,
3013 step_names,
3014 step_results,
3015 per_step_chunks,
3016 provenance_events,
3017 blame_attribution,
3018 epistemic_envelopes,
3019 })
3020}
3021
3022pub fn run_run(
3025 file: &str,
3026 backend: &str,
3027 trace: bool,
3028 tool_mode: &str,
3029 stream: bool,
3030 output: &str,
3031 export_plan: bool,
3032) -> i32 {
3033 let output_fmt = match OutputFormat::from_str(output) {
3034 Some(f) => f,
3035 None => {
3036 eprintln!("✗ Invalid output format '{}'. Use 'text' or 'json'.", output);
3037 return 2;
3038 }
3039 };
3040 let json = output_fmt.is_json();
3041 let use_color = if json { false } else { io::stdout().is_terminal() };
3042 let path = Path::new(file);
3043 let filename = path
3044 .file_name()
3045 .map(|n| n.to_string_lossy().into_owned())
3046 .unwrap_or_else(|| file.to_string());
3047
3048 let source = match std::fs::read_to_string(path) {
3050 Ok(s) => s,
3051 Err(_) => {
3052 eprintln!(
3053 "{}",
3054 c(&format!("✗ File not found: {file}"), "\x1b[1;31m", use_color)
3055 );
3056 return 2;
3057 }
3058 };
3059
3060 let tokens = match Lexer::new(&source, file).tokenize() {
3062 Ok(t) => t,
3063 Err(LexerError { message, line, column }) => {
3064 let loc = if column > 0 {
3065 format!(":{line}:{column}")
3066 } else {
3067 format!(":{line}")
3068 };
3069 eprintln!(
3070 "{} {message}",
3071 c(&format!("✗ {filename}{loc}"), "\x1b[1;31m", use_color)
3072 );
3073 return 1;
3074 }
3075 };
3076
3077 let mut parser = Parser::new(tokens);
3079 let program = match parser.parse() {
3080 Ok(p) => p,
3081 Err(ParseError { message, line, column, .. }) => {
3082 let loc = if column > 0 {
3083 format!(":{line}:{column}")
3084 } else {
3085 format!(":{line}")
3086 };
3087 eprintln!(
3088 "{} {message}",
3089 c(&format!("✗ {filename}{loc}"), "\x1b[1;31m", use_color)
3090 );
3091 return 1;
3092 }
3093 };
3094
3095 let type_errors = TypeChecker::new(&program).check();
3097 if !type_errors.is_empty() {
3098 eprintln!(
3099 "{} {} type error(s)",
3100 c(&format!("✗ {filename}"), "\x1b[1;31m", use_color),
3101 type_errors.len()
3102 );
3103 for te in &type_errors {
3104 eprintln!(" error [line {}]: {}", te.line, te.message);
3105 }
3106 return 1;
3107 }
3108
3109 let ir_program = IRGenerator::new().generate(&program);
3111
3112 let units = build_execution_plan(&ir_program, backend);
3114
3115 if units.is_empty() {
3116 eprintln!(
3117 "{}",
3118 c("⚠ No run statements found — nothing to execute.", "\x1b[1;33m", use_color)
3119 );
3120 return 0;
3121 }
3122
3123 let mode_label = if tool_mode == "real" {
3125 if stream { "real+stream" } else { "real" }
3126 } else {
3127 "stub"
3128 };
3129
3130 if !json {
3131 println!(
3132 "{}",
3133 c(
3134 &format!(
3135 "═══ AXON Run: {filename} ({} unit{}, backend={backend}, mode={tool_mode}) ═══",
3136 units.len(),
3137 if units.len() == 1 { "" } else { "s" }
3138 ),
3139 "\x1b[1;36m",
3140 use_color,
3141 )
3142 );
3143 }
3144
3145 let mut report = ReportBuilder::new(file, backend, mode_label);
3146
3147 let mut registry = ToolRegistry::new();
3149 registry.register_from_ir(&ir_program.tools);
3150
3151 let store_registry = match StoreRegistry::build(&ir_program.axonstore_specs) {
3154 Ok(r) => r,
3155 Err(e) => {
3156 eprintln!(
3157 "{} {e}",
3158 c(&format!("✗ {filename}"), "\x1b[1;31m", use_color)
3159 );
3160 return 1;
3161 }
3162 };
3163
3164 if !json && !registry.program_names().is_empty() {
3165 println!(
3166 " {}",
3167 c(
3168 &format!(
3169 "Tools: {} registered ({} builtin + {} program)",
3170 registry.len(),
3171 registry.builtin_names().len(),
3172 registry.program_names().len(),
3173 ),
3174 "\x1b[2m",
3175 use_color,
3176 )
3177 );
3178 }
3179
3180 if export_plan {
3182 let plan = build_plan_export(&units, file, backend, ®istry);
3183 println!("{}", PlanBuilder::to_json(&plan));
3184 return 0;
3185 }
3186
3187 let mut cli_pinned_conns: std::collections::HashMap<
3192 String,
3193 sqlx::pool::PoolConnection<sqlx::Postgres>,
3194 > = std::collections::HashMap::new();
3195 let (success, events) = if tool_mode == "real" {
3196 match execute_real(&units, backend, file, use_color, trace, stream, output_fmt, &mut report, ®istry, &store_registry, &mut cli_pinned_conns, None) {
3197 Ok((s, e)) => (s, e),
3198 Err(err) => {
3199 eprintln!(
3200 "{}",
3201 c(&format!("✗ Backend error: {err}"), "\x1b[1;31m", use_color)
3202 );
3203 return 2;
3204 }
3205 }
3206 } else {
3207 let (s, e) = execute_stub(&units, use_color, trace);
3208 for unit in &units {
3210 report.begin_unit(&unit.flow_name, &unit.persona_name);
3211 for step in &unit.steps {
3212 report.record_step(StepReport {
3213 name: step.step_name.clone(),
3214 step_type: step.step_type.clone(),
3215 result: "(stub)".into(),
3216 duration_ms: 0,
3217 input_tokens: 0,
3218 output_tokens: 0,
3219 anchor_breaches: 0,
3220 chain_activations: 0,
3221 was_retried: false,
3222 });
3223 }
3224 let mut stub_hooks = crate::hooks::HookManager::new();
3226 stub_hooks.on_unit_start(&unit.flow_name, &unit.persona_name);
3227 stub_hooks.on_unit_end();
3228 report.end_unit(&stub_hooks);
3229 }
3230 (s, e)
3231 };
3232
3233 if json {
3235 let stub_hooks = crate::hooks::HookManager::new();
3238 let execution_report = report.build(success, &stub_hooks);
3239 println!("{}", ReportBuilder::to_json(&execution_report));
3240 } else {
3241 let total_steps: usize = units.iter().map(|u| u.steps.len()).sum();
3242 println!(
3243 "\n{}",
3244 c(
3245 &format!(
3246 "═══ {} unit{}, {} step{} — {mode_label} execution complete ═══",
3247 units.len(),
3248 if units.len() == 1 { "" } else { "s" },
3249 total_steps,
3250 if total_steps == 1 { "" } else { "s" },
3251 ),
3252 "\x1b[1;32m",
3253 use_color,
3254 )
3255 );
3256 }
3257
3258 if trace && !events.is_empty() {
3260 let trace_path = Path::new(file).with_extension("trace.json");
3261 let trace_json = serde_json::json!({
3262 "_meta": {
3263 "source": file,
3264 "backend": backend,
3265 "tool_mode": tool_mode,
3266 "axon_version": AXON_VERSION,
3267 "mode": "stub",
3268 },
3269 "events": events,
3270 });
3271 match serde_json::to_string_pretty(&trace_json) {
3272 Ok(json_str) => match std::fs::write(&trace_path, json_str) {
3273 Ok(_) => {
3274 if !json {
3275 println!(
3276 "{}",
3277 c(
3278 &format!("📋 Trace saved → {}", trace_path.display()),
3279 "\x1b[1;35m",
3280 use_color,
3281 )
3282 );
3283 }
3284 }
3285 Err(e) => eprintln!("⚠ Could not save trace: {e}"),
3286 },
3287 Err(e) => eprintln!("⚠ Could not serialize trace: {e}"),
3288 }
3289 }
3290
3291 if success { 0 } else { 1 }
3292}
3293
3294#[cfg(test)]
3297mod fase58e_tests {
3298 use super::*;
3299
3300 #[test]
3301 fn coerce_respects_declared_int_float_bool() {
3302 assert_eq!(coerce_tool_arg_value("5", Some("Int")), serde_json::json!(5));
3303 assert_eq!(
3304 coerce_tool_arg_value("3.14", Some("Float")),
3305 serde_json::json!(3.14)
3306 );
3307 assert_eq!(
3308 coerce_tool_arg_value("true", Some("Bool")),
3309 serde_json::json!(true)
3310 );
3311 assert_eq!(
3312 coerce_tool_arg_value("false", Some("Bool")),
3313 serde_json::json!(false)
3314 );
3315 }
3316
3317 #[test]
3318 fn coerce_keeps_string_param_verbatim_even_if_all_digits() {
3319 assert_eq!(
3321 coerce_tool_arg_value("12345", Some("String")),
3322 serde_json::json!("12345")
3323 );
3324 assert_eq!(
3325 coerce_tool_arg_value("Acme Corp", Some("String")),
3326 serde_json::json!("Acme Corp")
3327 );
3328 }
3329
3330 #[test]
3331 fn coerce_optional_and_generic_types_use_base() {
3332 assert_eq!(coerce_tool_arg_value("7", Some("Int?")), serde_json::json!(7));
3333 assert_eq!(
3335 coerce_tool_arg_value("x", Some("List<String>")),
3336 serde_json::json!("x")
3337 );
3338 }
3339
3340 #[test]
3341 fn coerce_unparseable_scalar_falls_back_to_string_not_dropped() {
3342 assert_eq!(
3346 coerce_tool_arg_value("not-a-number", Some("Int")),
3347 serde_json::json!("not-a-number")
3348 );
3349 assert_eq!(
3350 coerce_tool_arg_value("maybe", Some("Bool")),
3351 serde_json::json!("maybe")
3352 );
3353 }
3354
3355 #[test]
3356 fn coerce_unknown_or_schemaless_param_is_string() {
3357 assert_eq!(coerce_tool_arg_value("5", None), serde_json::json!("5"));
3358 assert_eq!(
3359 coerce_tool_arg_value("5", Some("SearchResults")),
3360 serde_json::json!("5")
3361 );
3362 }
3363
3364 #[test]
3365 fn build_body_assembles_typed_structured_object() {
3366 let args = vec![
3367 ("query".to_string(), "Acme Corp".to_string()),
3368 ("max_results".to_string(), "5".to_string()),
3369 ("safesearch".to_string(), "true".to_string()),
3370 ];
3371 let types = vec![
3372 ("query".to_string(), "String".to_string()),
3373 ("max_results".to_string(), "Int".to_string()),
3374 ("safesearch".to_string(), "Bool".to_string()),
3375 ];
3376 let v: serde_json::Value =
3377 serde_json::from_str(&build_structured_tool_body(&args, &types)).unwrap();
3378 assert_eq!(v["query"], serde_json::json!("Acme Corp"));
3379 assert_eq!(v["max_results"], serde_json::json!(5));
3380 assert_eq!(v["safesearch"], serde_json::json!(true));
3381 assert!(v.get("input").is_none());
3383 }
3384
3385 #[test]
3386 fn build_body_escapes_special_characters_via_serde() {
3387 let args = vec![("query".to_string(), "a\"b\nc".to_string())];
3388 let types = vec![("query".to_string(), "String".to_string())];
3389 let v: serde_json::Value =
3390 serde_json::from_str(&build_structured_tool_body(&args, &types)).unwrap();
3391 assert_eq!(v["query"], serde_json::json!("a\"b\nc"));
3392 }
3393
3394 #[test]
3395 fn build_body_empty_args_is_empty_object() {
3396 assert_eq!(build_structured_tool_body(&[], &[]), "{}");
3397 }
3398}
3399
3400#[cfg(test)]
3401mod fase35e_tests {
3402 use super::*;
3403
3404 fn pg_store(name: &str, connection: &str) -> IRAxonStore {
3405 IRAxonStore {
3406 node_type: "axonstore",
3407 source_line: 0,
3408 source_column: 0,
3409 name: name.to_string(),
3410 backend: "postgresql".to_string(),
3411 connection: connection.to_string(),
3412 confidence_floor: None,
3413 isolation: String::new(),
3414 on_breach: String::new(),
3415 capability: String::new(),
3416 column_schema: None,
3417 }
3418 }
3419
3420 #[test]
3421 fn block_on_store_runs_a_future_from_a_plain_thread() {
3422 let n = block_on_store(async { 20 + 15 });
3424 assert_eq!(n, 35);
3425 }
3426
3427 #[tokio::test]
3428 async fn block_on_store_runs_a_future_from_within_a_runtime() {
3429 let n = block_on_store(async { 7 * 6 });
3434 assert_eq!(n, 42);
3435 }
3436
3437 #[test]
3438 fn sql_store_step_surfaces_missing_env_var_never_a_kv_fallback() {
3439 let registry = StoreRegistry::build(&[pg_store(
3444 "logs",
3445 "env:AXON_NONEXISTENT_VAR_FASE35E",
3446 )])
3447 .unwrap();
3448 let ctx = ExecContext::new("F", "P", 0);
3449 let mut pin_map = std::collections::HashMap::new();
3450 let result = execute_sql_store_step(
3451 ®istry,
3452 &mut pin_map,
3453 "retrieve",
3454 "logs",
3455 "logs:id = 1",
3456 None,
3457 &ctx,
3458 );
3459 assert!(matches!(result, Err(StoreError::MissingEnvVar { .. })));
3460 }
3461
3462 #[test]
3463 fn sql_persist_below_confidence_floor_is_blocked() {
3464 let mut store = pg_store("ledger", "postgresql://u:p@localhost:5432/db");
3468 store.confidence_floor = Some(0.8);
3469 let registry = StoreRegistry::build(&[store]).unwrap();
3470 let mut ctx = ExecContext::new("F", "P", 0);
3471 ctx.set("amount", "100"); let mut pin_map = std::collections::HashMap::new();
3473 let result =
3474 execute_sql_store_step(®istry, &mut pin_map, "persist", "ledger", "ledger", None, &ctx);
3475 assert!(matches!(result, Err(StoreError::Epistemic(_))));
3476 }
3477
3478 #[test]
3479 fn sql_store_step_persist_builds_a_row_from_user_bindings() {
3480 let registry =
3486 StoreRegistry::build(&[pg_store("events", "not a dsn")]).unwrap();
3487 let mut ctx = ExecContext::new("F", "P", 0);
3488 ctx.set("event_kind", "login");
3489 let mut pin_map = std::collections::HashMap::new();
3490 let result =
3491 execute_sql_store_step(®istry, &mut pin_map, "persist", "events", "events", None, &ctx);
3492 assert!(matches!(result, Err(StoreError::PoolInit { .. })));
3493 }
3494
3495 #[test]
3496 fn sql_persist_scopes_the_row_to_the_declared_field_block() {
3497 let registry =
3505 StoreRegistry::build(&[pg_store("chat_history", "not a dsn")]).unwrap();
3506 let mut ctx = ExecContext::new("F", "P", 0);
3507 ctx.set("message", "hello");
3508 ctx.set("channel_kind", "whatsapp");
3509 ctx.set("tenant_id", "acme");
3510 let fields = vec![
3511 ("sender".to_string(), "user".to_string()),
3512 ("content".to_string(), "${message}".to_string()),
3513 ("tenant_id".to_string(), "${tenant_id}".to_string()),
3514 ];
3515 let mut pin_map = std::collections::HashMap::new();
3516 let result = execute_sql_store_step(
3517 ®istry,
3518 &mut pin_map,
3519 "persist",
3520 "chat_history",
3521 "chat_history",
3522 Some(&fields),
3523 &ctx,
3524 );
3525 assert!(matches!(result, Err(StoreError::PoolInit { .. })));
3526 }
3527
3528 #[test]
3529 fn sql_mutate_scopes_the_set_to_the_declared_field_block() {
3530 let registry =
3538 StoreRegistry::build(&[pg_store("accounts", "not a dsn")]).unwrap();
3539 let mut ctx = ExecContext::new("F", "P", 0);
3540 ctx.set("tenant_id", "acme"); ctx.set("new_balance", "500");
3542 let fields = vec![
3543 ("balance".to_string(), "${new_balance}".to_string()),
3544 ("status".to_string(), "active".to_string()),
3545 ];
3546 let mut pin_map = std::collections::HashMap::new();
3547 let result = execute_sql_store_step(
3548 ®istry,
3549 &mut pin_map,
3550 "mutate",
3551 "accounts",
3552 "accounts:id = 1",
3553 Some(&fields),
3554 &ctx,
3555 );
3556 assert!(matches!(result, Err(StoreError::PoolInit { .. })));
3557 }
3558}