Skip to main content

josie_core/
program.rs

1//! Program-level contract and optimized execution strategies.
2//!
3//! This module is the "planner + dispatcher" for JOSIE execution.
4//!
5//! It takes a JSON program envelope and selects the fastest safe runtime
6//! strategy available for that specific shape.
7//!
8//! Strategy tiers, from most specialized to most generic:
9//!
10//! 1. **Fast internal plan** (`CompiledProgramBody::FastInternal`)
11//!    Pattern-matched kernels for high-frequency pipeline shapes
12//!    (loop/filter/branching/boolean/match/template/mixed workflow).
13//!    These avoid per-node dynamic dispatch and run as tight Rust loops.
14//!
15//! 2. **Fast external map plan** (`CompiledProgramBody::FastExternalMapI64`)
16//!    Specialized host-call flow for:
17//!    `x.a(p1,p2,size) -> for_each x.b(item)`.
18//!    Uses typed host callbacks and optional metrics-only execution path.
19//!
20//! 3. **Compiled pipeline VM path** (`CompiledProgramBody::CompiledPipeline`)
21//!    Pre-compiles `do`, `when`, and `args` expressions to typed IR (`Expr`)
22//!    and executes via `vm::eval_expr` with iterator locals.
23//!
24//! 4. **Generic tree/pipeline fallback**
25//!    Preserves full semantics for any valid document not covered above.
26//!
27//! Pipeline step contract summary:
28//!
29//! | op | required fields | notes |
30//! |---|---|---|
31//! | `call` | `fn` or `from` | optional `into`, `args`, `when` |
32//! | `set` | `into`, `args[0]` | writes to runtime var or `client.*`/`server.*` |
33//! | `get` | `from` | reads `$prev`, runtime vars, or state paths |
34//! | `map`/`filter`/`for_each`/`reduce` | `from`, `do` | `reduce` optionally uses init value from `args[0]` |
35//! | `if`/`match`/`do`/`pipe` | `args` | tree-style ops executed in pipeline flow |
36//! | policy hints | `run_hint` | planner/runtime hint (`inline`/`worker`), not logic semantics |
37//!
38//! Why this improves performance:
39//!
40//! - Removes repeated parse and shape checks from hot loops.
41//! - Avoids recursive `serde_json::Value` evaluation in known patterns.
42//! - Uses typed host fast-paths for external-heavy workloads.
43//! - Keeps fallback semantics so optimization is additive, not breaking.
44
45use crate::compiler::{Expr, compile_expr};
46use crate::jval::JVal;
47use crate::vm::{IterLocals, eval_expr};
48use crate::{Context, EvalError, Operators, State, evaluate};
49use serde::{Deserialize, Serialize};
50use serde_json::{Map, Value, json};
51use std::collections::HashMap;
52use std::rc::Rc;
53use std::time::Instant;
54
/// Generic host callback used by pipeline `call` integration.
///
/// Receives the call arguments as a slice of JSON values and returns either a
/// JSON result or a structured [`RuntimeError`].
pub type HostCallFn = fn(&[Value]) -> Result<Value, RuntimeError>;
/// Typed generator callback for optimized external map fast-paths.
///
/// Corresponds to the `x.a(p1,p2,size)` half of the external map shape
/// described in the module docs: two `i64` parameters plus a `usize` size,
/// producing the `i64` items that are subsequently mapped.
pub type HostGenerateI64Fn = fn(i64, i64, usize) -> Vec<i64>;
/// Typed mapper callback for optimized external map fast-paths.
///
/// Applied item-by-item (the `for_each x.b(item)` half of the external map
/// shape), without boxing values as JSON.
pub type HostMapI64Fn = fn(i64) -> i64;
61
62/// Host function registry used by optimized execution paths.
63///
64/// - `register_call`: generic JSON argument callbacks.
65/// - `register_generate_i64` and `register_map_i64`: typed callbacks used by
66///   external fast plans to avoid JSON boxing overhead.
67#[derive(Debug, Clone, Default)]
68pub struct HostFunctions {
69    call: HashMap<String, HostCallFn>,
70    generate_i64: HashMap<String, HostGenerateI64Fn>,
71    map_i64: HashMap<String, HostMapI64Fn>,
72}
73
74impl HostFunctions {
75    /// Create an empty host function registry.
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Register a generic host callback for `call` steps.
81    pub fn register_call(&mut self, name: impl Into<String>, func: HostCallFn) {
82        self.call.insert(name.into(), func);
83    }
84
85    /// Register typed generator callback used by external fast-path plans.
86    pub fn register_generate_i64(&mut self, name: impl Into<String>, func: HostGenerateI64Fn) {
87        self.generate_i64.insert(name.into(), func);
88    }
89
90    /// Register typed mapper callback used by external fast-path plans.
91    pub fn register_map_i64(&mut self, name: impl Into<String>, func: HostMapI64Fn) {
92        self.map_i64.insert(name.into(), func);
93    }
94
95    fn get_call(&self, name: &str) -> Option<HostCallFn> {
96        self.call.get(name).copied()
97    }
98
99    fn get_generate_i64(&self, name: &str) -> Option<HostGenerateI64Fn> {
100        self.generate_i64.get(name).copied()
101    }
102
103    fn get_map_i64(&self, name: &str) -> Option<HostMapI64Fn> {
104        self.map_i64.get(name).copied()
105    }
106}
107
/// Parsed top-level program envelope.
///
/// `program` holds either a tree expression or a pipeline object (see
/// `validate_program`). `state` may be omitted from the input document, in
/// which case it defaults to JSON `null`; it seeds the initial runtime state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Program {
    #[serde(default)]
    pub state: Value,
    pub program: Value,
}

/// Canonical pipeline document shape (`type = "pipeline"`).
///
/// `doc_type` is (de)serialized as `type`; `validate_pipeline` rejects any
/// value other than `"pipeline"`, as well as an empty `steps` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineDoc {
    #[serde(rename = "type")]
    pub doc_type: String,
    pub steps: Vec<PipelineStep>,
}

/// Canonical pipeline step shape used by parser and executor.
///
/// Only `op` is mandatory at the serde level; which other fields are required
/// depends on the op and is enforced by `validate_step`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStep {
    /// Optional step identifier; must not be blank when present.
    #[serde(default)]
    pub id: Option<String>,
    /// Optional step classification, (de)serialized as `type`.
    #[serde(rename = "type", default)]
    pub step_type: Option<StepType>,
    /// Operation name (`call`, `set`, `get`, `map`, ...).
    pub op: String,
    /// Source reference (`$prev`, runtime var, or state path). For `call`
    /// steps it is the fallback target name when `fn` is absent.
    #[serde(default)]
    pub from: Option<String>,
    /// Target the step result is written to.
    #[serde(default)]
    pub into: Option<String>,
    /// Call target for `call` steps, (de)serialized as `fn`.
    #[serde(rename = "fn", default)]
    pub fn_name: Option<String>,
    /// Positional arguments (e.g. the value for `set`, the initial
    /// accumulator for `reduce`).
    #[serde(default)]
    pub args: Vec<Value>,
    /// Per-item expression for iteration ops, (de)serialized as `do`.
    #[serde(rename = "do", default)]
    pub do_expr: Option<Value>,
    /// Guard expression; the step is skipped when it evaluates falsy.
    #[serde(default)]
    pub when: Option<Value>,
    /// Planner/runtime placement hint; not part of the step's logic.
    #[serde(default)]
    pub run_hint: Option<StepRunHint>,
    /// Optional input schema; must be a JSON object when present.
    #[serde(default)]
    pub input: Option<Value>,
    /// Optional output schema; must be a JSON object when present.
    #[serde(default)]
    pub output: Option<Value>,
    /// Failure policy; the compiled executor treats a missing value as `halt`.
    #[serde(default)]
    pub on_error: Option<StepOnError>,
    /// Wall-clock budget in milliseconds, checked after the step finishes
    /// (the compiled executor does not interrupt a running step).
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    /// Extra attempts allowed when `on_error` is `retry` (missing means 0).
    #[serde(default)]
    pub max_retries: Option<u32>,
    /// Key under which a successful result is cached for the rest of the run.
    #[serde(default)]
    pub idempotency_key: Option<String>,
}
159
/// Declarative step classification (`action`, `decision`, ...).
///
/// Serialized in `snake_case`. The executor code in this module section does
/// not branch on it; declaring it marks the step as carrying execution policy,
/// which keeps the program off the fast plans.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepType {
    Action,
    Decision,
    Transform,
    Tool,
    Checkpoint,
}

/// What the compiled executor does when a step fails (serialized in `snake_case`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepOnError {
    /// Re-run the step, up to `max_retries` extra times, then propagate.
    Retry,
    /// Resolve the failed step to `null` and continue.
    Fallback,
    /// Propagate the error and stop the pipeline (the default).
    Halt,
    /// Currently handled like `Fallback`: the failed step resolves to `null`.
    Compensate,
}

/// Planner/runtime placement hint (`inline` / `worker`); does not change
/// step semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepRunHint {
    Inline,
    Worker,
}
185
186/// Structured validation error produced before execution.
187#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
188pub struct ValidationError {
189    pub code: String,
190    pub message: String,
191    pub step_index: Option<usize>,
192    pub op: Option<String>,
193}
194
195impl ValidationError {
196    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
197        Self {
198            code: code.into(),
199            message: message.into(),
200            step_index: None,
201            op: None,
202        }
203    }
204
205    fn step(mut self, step_index: usize, op: impl Into<String>) -> Self {
206        self.step_index = Some(step_index);
207        self.op = Some(op.into());
208        self
209    }
210}
211
212/// Structured runtime error produced during execution.
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct RuntimeError {
215    pub code: String,
216    pub message: String,
217    pub step_index: Option<usize>,
218    pub op: Option<String>,
219}
220
221impl RuntimeError {
222    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
223        Self {
224            code: code.into(),
225            message: message.into(),
226            step_index: None,
227            op: None,
228        }
229    }
230
231    fn step(mut self, step_index: usize, op: impl Into<String>) -> Self {
232        self.step_index = Some(step_index);
233        self.op = Some(op.into());
234        self
235    }
236}
237
238impl From<EvalError> for RuntimeError {
239    fn from(value: EvalError) -> Self {
240        Self::new("JOSIE_E_EVAL", value.message)
241    }
242}
243
/// Typed host plan for the `x.a(p1,p2,size) -> for_each x.b(item)` shape.
///
/// `fn_a` is looked up among the typed generators and `fn_b` among the typed
/// mappers of [`HostFunctions`]; if either is missing at execution time the
/// plan is expanded back into a generic pipeline.
#[derive(Debug, Clone)]
pub struct FastExternalMapPlan {
    /// Host generator name (`x.a` in the module docs).
    fn_a: String,
    /// Host mapper name applied per item (`x.b` in the module docs).
    fn_b: String,
    /// First static generator argument (`p1`).
    p1: i64,
    /// Second static generator argument (`p2`).
    p2: i64,
    /// Static `size` argument for the generator (presumably the item count —
    /// confirm against `execute_fast_external_map`).
    size: usize,
}

/// Pipeline shape recognised by the internal fast planner.
///
/// Each variant corresponds to one of the pattern-matched kernels listed in
/// the module docs. NOTE(review): the payloads (`with_t`, `p1`) are constants
/// extracted by the planner; confirm their exact meaning in
/// `try_compile_fast_internal`.
#[derive(Debug, Clone)]
pub enum FastInternalKind {
    MathLoop,
    MathFilter { with_t: bool },
    Branching,
    BooleanChain,
    TemplateLike,
    MapReduce { p1: i64 },
    MatchSwitch,
    MixedWorkflow,
}

/// Shape-specialized internal pipeline plan.
///
/// `size` is extracted from the static `set nums [...]` step and lets execution
/// run without reading source arrays from dynamic state on each iteration.
#[derive(Debug, Clone)]
pub struct FastInternalPlan {
    size: usize,
    kind: FastInternalKind,
}
274
/// A pipeline step with pre-compiled expressions for fast evaluation.
#[derive(Debug, Clone)]
pub struct CompiledPipelineStep {
    /// The original step, kept for its metadata and policy fields.
    pub step: PipelineStep,
    /// `do` lowered via `compile_do_expr` (with iteration context for
    /// `map`/`filter`/`for_each`/`reduce`).
    pub compiled_do: Option<Expr>,
    /// `when` guard, compiled without iteration context.
    pub compiled_when: Option<Expr>,
    /// One compiled expression per entry of `step.args`, in order.
    pub compiled_args: Vec<Expr>,
}

/// Execution backend selected by `compile_program`.
#[derive(Debug, Clone)]
pub enum CompiledProgramBody {
    /// Tree expression evaluated directly by the tree evaluator.
    Tree(Value),
    /// Uncompiled pipeline executed by `execute_pipeline`. `compile_program`
    /// itself emits `CompiledPipeline` instead of this variant.
    Pipeline(PipelineDoc),
    /// Typed external host plan; falls back to a generic pipeline when the
    /// required typed callbacks are not registered at execution time.
    FastExternalMapI64(FastExternalMapPlan),
    /// Shape-specialized internal kernel.
    FastInternal(FastInternalPlan),
    /// General-purpose compiled pipeline — pre-resolved operator dispatch + locals.
    CompiledPipeline(Vec<CompiledPipelineStep>),
}

/// Compiled execution artifact.
///
/// Reuse this for repeated runs to avoid recompilation overhead.
#[derive(Debug, Clone)]
pub struct CompiledProgram {
    /// Raw JSON state from the program envelope; converted with
    /// `state_from_value` when executing via `execute_program_with_hosts`.
    pub initial_state: Value,
    pub body: CompiledProgramBody,
}

/// Execution result with final value and mutated state.
#[derive(Debug, Clone)]
pub struct ExecutionOutput {
    /// Value produced by the program (for pipelines, the final `$prev`).
    pub value: Value,
    /// State after execution, including any writes made by steps.
    pub state: State,
}
309
310#[derive(Debug, Clone)]
311struct PipelineRuntime {
312    vars: HashMap<String, Value>,
313    prev: Value,
314    idempotency: HashMap<String, Value>,
315}
316
317impl Default for PipelineRuntime {
318    fn default() -> Self {
319        Self {
320            vars: HashMap::new(),
321            prev: Value::Null,
322            idempotency: HashMap::new(),
323        }
324    }
325}
326
327pub fn parse_program(input: &Value) -> Result<Program, ValidationError> {
328    let program: Program = serde_json::from_value(input.clone()).map_err(|err| {
329        ValidationError::new("JOSIE_E_PARSE", format!("invalid program document: {err}"))
330    })?;
331    validate_program(&program)?;
332    Ok(program)
333}
334
335/// Compile a validated program into the most efficient execution body.
336///
337/// Selection order is intentionally greedy:
338///
339/// 1. internal specialized plan
340/// 2. external typed host plan
341/// 3. compiled pipeline IR
342/// 4. tree fallback
343///
344/// This design gives large speedups for common production shapes while keeping
345/// full coverage for arbitrary valid programs.
346pub fn compile_program(program: &Program) -> Result<CompiledProgram, ValidationError> {
347    validate_program(program)?;
348    let body = if is_tree_expression(&program.program) {
349        CompiledProgramBody::Tree(program.program.clone())
350    } else {
351        let pipe: PipelineDoc = serde_json::from_value(program.program.clone()).map_err(|err| {
352            ValidationError::new(
353                "JOSIE_E_PIPE_PARSE",
354                format!("invalid pipeline document: {err}"),
355            )
356        })?;
357        // Keep fast plans only for policy-free steps so semantics stay exact.
358        if pipe.steps.iter().any(has_step_execution_policy) {
359            CompiledProgramBody::CompiledPipeline(compile_pipeline_steps(&pipe))
360        } else if let Some(plan) = try_compile_fast_internal(&pipe) {
361            CompiledProgramBody::FastInternal(plan)
362        } else if let Some(plan) = try_compile_fast_external_map(&pipe) {
363            CompiledProgramBody::FastExternalMapI64(plan)
364        } else {
365            CompiledProgramBody::CompiledPipeline(compile_pipeline_steps(&pipe))
366        }
367    };
368    Ok(CompiledProgram {
369        initial_state: program.state.clone(),
370        body,
371    })
372}
373
374/// Compile all pipeline steps into IR-aware structures.
375///
376/// Each step stores precompiled `do`, `when`, and `args` expressions so runtime
377/// loops avoid reparsing and repeated expression lowering.
378fn compile_pipeline_steps(pipe: &PipelineDoc) -> Vec<CompiledPipelineStep> {
379    pipe.steps
380        .iter()
381        .map(|step| {
382            let is_reduce = step.op == "reduce";
383            let is_iter = matches!(step.op.as_str(), "map" | "filter" | "for_each" | "reduce");
384
385            let compiled_do = step
386                .do_expr
387                .as_ref()
388                .map(|expr| compile_do_expr(expr, is_iter, is_reduce));
389            let compiled_when = step
390                .when
391                .as_ref()
392                .map(|expr| compile_expr(expr, false, false));
393            let compiled_args = step
394                .args
395                .iter()
396                .map(|a| compile_expr(a, false, false))
397                .collect();
398
399            CompiledPipelineStep {
400                step: step.clone(),
401                compiled_do,
402                compiled_when,
403                compiled_args,
404            }
405        })
406        .collect()
407}
408
409/// Compile a `do_expr`. If it's a nested pipeline step object
410/// (has "op" key), compile it as a call expression.
411fn compile_do_expr(expr: &Value, iter_ctx: bool, reduce_ctx: bool) -> Expr {
412    if let Some(obj) = expr.as_object() {
413        if let Some(op) = obj.get("op").and_then(|v| v.as_str()) {
414            if op == "call" {
415                let fn_name = obj
416                    .get("fn")
417                    .and_then(|v| v.as_str())
418                    .or_else(|| obj.get("from").and_then(|v| v.as_str()));
419                if let Some(fn_name) = fn_name {
420                    let args: Vec<Expr> = obj
421                        .get("args")
422                        .and_then(|v| v.as_array())
423                        .map(|arr| {
424                            arr.iter()
425                                .map(|a| compile_expr(a, iter_ctx, reduce_ctx))
426                                .collect()
427                        })
428                        .unwrap_or_default();
429                    return Expr::Call(Rc::from(fn_name), args);
430                }
431            }
432        }
433    }
434    compile_expr(expr, iter_ctx, reduce_ctx)
435}
436
437pub fn execute_program(
438    program: &Program,
439    operators: &Operators,
440) -> Result<ExecutionOutput, RuntimeError> {
441    let hosts = HostFunctions::default();
442    execute_program_with_hosts(program, operators, &hosts)
443}
444
445/// Compile + execute in one call.
446///
447/// For hot paths, prefer:
448/// - `compile_program` once
449/// - `execute_compiled_program_with_hosts` many times
450pub fn execute_program_with_hosts(
451    program: &Program,
452    operators: &Operators,
453    hosts: &HostFunctions,
454) -> Result<ExecutionOutput, RuntimeError> {
455    let compiled = compile_program(program)
456        .map_err(|err| RuntimeError::new(err.code, err.message).step_opt(err.step_index, err.op))?;
457    let mut state = state_from_value(&compiled.initial_state);
458    let value = execute_compiled_program_with_hosts(&compiled, &mut state, operators, hosts)?;
459    Ok(ExecutionOutput { value, state })
460}
461
462pub fn execute_compiled_program(
463    compiled: &CompiledProgram,
464    state: &mut State,
465    operators: &Operators,
466) -> Result<Value, RuntimeError> {
467    let hosts = HostFunctions::default();
468    execute_compiled_program_with_hosts(compiled, state, operators, &hosts)
469}
470
471/// Execute a precompiled program with optional typed host function registry.
472///
473/// Dispatches across specialized and generic backends while preserving output
474/// semantics.
475pub fn execute_compiled_program_with_hosts(
476    compiled: &CompiledProgram,
477    state: &mut State,
478    operators: &Operators,
479    hosts: &HostFunctions,
480) -> Result<Value, RuntimeError> {
481    match &compiled.body {
482        CompiledProgramBody::Tree(expr) => eval_tree(expr, state, operators),
483        CompiledProgramBody::Pipeline(pipe) => execute_pipeline(pipe, state, operators, hosts),
484        CompiledProgramBody::FastExternalMapI64(plan) => {
485            if hosts.get_generate_i64(&plan.fn_a).is_some()
486                && hosts.get_map_i64(&plan.fn_b).is_some()
487            {
488                execute_fast_external_map(plan, hosts)
489            } else {
490                let fallback = plan_to_pipeline_doc(plan);
491                execute_pipeline(&fallback, state, operators, hosts)
492            }
493        }
494        CompiledProgramBody::FastInternal(plan) => execute_fast_internal(plan, state),
495        CompiledProgramBody::CompiledPipeline(steps) => {
496            execute_compiled_pipeline(steps, state, operators, hosts)
497        }
498    }
499}
500
501/// Execute compiled program and return `(checksum, len)` when possible.
502///
503/// This is a benchmark/metrics helper that avoids building JSON arrays for the
504/// external typed-map fast path.
505pub fn execute_compiled_program_external_metrics(
506    compiled: &CompiledProgram,
507    state: &mut State,
508    operators: &Operators,
509    hosts: &HostFunctions,
510) -> Result<(i64, usize), RuntimeError> {
511    match &compiled.body {
512        CompiledProgramBody::FastExternalMapI64(plan) => {
513            execute_fast_external_map_metrics(plan, hosts)
514        }
515        _ => {
516            let value = execute_compiled_program_with_hosts(compiled, state, operators, hosts)?;
517            let arr = value.as_array().cloned().ok_or_else(|| {
518                RuntimeError::new(
519                    "JOSIE_E_EXTERNAL_RESULT",
520                    "expected array result for external metrics fallback",
521                )
522            })?;
523            let checksum = arr
524                .iter()
525                .filter_map(|v| v.as_i64())
526                .fold(0i64, |acc, v| acc.wrapping_add(v));
527            Ok((checksum, arr.len()))
528        }
529    }
530}
531
532// ─── compiled pipeline executor ──────────────────────────────────────────────
533
534fn execute_compiled_pipeline(
535    steps: &[CompiledPipelineStep],
536    state: &mut State,
537    operators: &Operators,
538    hosts: &HostFunctions,
539) -> Result<Value, RuntimeError> {
540    let mut runtime = PipelineRuntime::default();
541
542    for (idx, cs) in steps.iter().enumerate() {
543        // Evaluate 'when' guard
544        if let Some(when_expr) = &cs.compiled_when {
545            let empty = IterLocals::empty();
546            let when_val =
547                eval_expr(when_expr, &empty, state, operators).map_err(RuntimeError::from)?;
548            if !when_val.is_truthy() {
549                continue;
550            }
551        }
552
553        let value = execute_compiled_step_with_policy(cs, state, operators, hosts, &mut runtime)
554            .map_err(|err| err.step(idx, cs.step.op.clone()))?;
555        write_target(cs.step.into.as_deref(), value, state, &mut runtime)?;
556        if cs.step.op == "return" {
557            break;
558        }
559    }
560    Ok(runtime.prev)
561}
562
563fn execute_compiled_step_with_policy(
564    cs: &CompiledPipelineStep,
565    state: &mut State,
566    operators: &Operators,
567    hosts: &HostFunctions,
568    runtime: &mut PipelineRuntime,
569) -> Result<Value, RuntimeError> {
570    if let Some(key) = cs.step.idempotency_key.as_ref()
571        && let Some(cached) = runtime.idempotency.get(key)
572    {
573        return Ok(cached.clone());
574    }
575
576    let timeout_ms = cs.step.timeout_ms;
577    let on_error = cs.step.on_error.unwrap_or(StepOnError::Halt);
578    let max_retries = cs.step.max_retries.unwrap_or(0);
579    let mut attempts = 0u32;
580    loop {
581        attempts = attempts.saturating_add(1);
582        let started = Instant::now();
583        let mut out = execute_compiled_step(cs, state, operators, hosts, runtime);
584        if let Some(limit_ms) = timeout_ms
585            && started.elapsed().as_millis() as u64 > limit_ms
586        {
587            out = Err(RuntimeError::new(
588                "JOSIE_E_STEP_TIMEOUT",
589                format!("step exceeded timeout_ms={limit_ms}"),
590            ));
591        }
592
593        match out {
594            Ok(v) => {
595                if let Some(key) = cs.step.idempotency_key.as_ref() {
596                    runtime.idempotency.insert(key.clone(), v.clone());
597                }
598                return Ok(v);
599            }
600            Err(err) => match on_error {
601                StepOnError::Retry if attempts <= max_retries => continue,
602                StepOnError::Fallback | StepOnError::Compensate => return Ok(Value::Null),
603                StepOnError::Retry | StepOnError::Halt => return Err(err),
604            },
605        }
606    }
607}
608
/// Execute one compiled step and return its raw result.
///
/// This is the policy-free core used by `execute_compiled_step_with_policy`.
/// It neither evaluates the `when` guard nor writes the result to the step's
/// `into` target; the pipeline loop does both.
///
/// Iteration ops (`map`/`filter`/`for_each`/`reduce`) read their input array
/// through `read_from_array` and evaluate the precompiled `do` expression with
/// per-item locals (item and index, plus the accumulator for `reduce`).
fn execute_compiled_step(
    cs: &CompiledPipelineStep,
    state: &mut State,
    operators: &Operators,
    hosts: &HostFunctions,
    runtime: &mut PipelineRuntime,
) -> Result<Value, RuntimeError> {
    let step = &cs.step;

    match step.op.as_str() {
        // `set`: evaluate args[0]; the caller stores the value in `into`.
        "set" => {
            // Object literals can contain nested dynamic arrays such as
            // {"title":["var","server.input.title"]}. Compiled Expr currently
            // treats objects as literals, so evaluate object args through the
            // dynamic evaluator to preserve runtime semantics.
            if let Some(arg0) = step.args.first()
                && arg0.is_object()
            {
                return eval_dynamic(arg0, state, operators, hosts, runtime);
            }
            let expr = cs
                .compiled_args
                .first()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_SET_ARGS", "set step requires args"))?;
            let empty = IterLocals::empty();
            let jval = eval_expr(expr, &empty, state, operators).map_err(RuntimeError::from)?;
            Ok(Value::from(jval))
        }

        // `get`: resolve `from` (`$prev`, runtime var, or state path).
        "get" => {
            let from = step
                .from
                .as_deref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_GET_FROM", "get step requires 'from'"))?;
            read_ref(from, state, runtime)
        }

        "call" => execute_call_step(step, state, operators, hosts, runtime),

        // `map`: one `do` result per input item, in input order.
        "map" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "map step requires 'do'"))?;
            let mut out = Vec::with_capacity(input.len());
            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                out.push(Value::from(result));
            }
            Ok(Value::Array(out))
        }

        // `filter`: keep the original items whose `do` result is truthy.
        "filter" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "filter step requires 'do'"))?;
            let mut out = Vec::new();
            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                if result.is_truthy() {
                    out.push(item.clone());
                }
            }
            Ok(Value::Array(out))
        }

        // `for_each`: with an `into` target the full list of results is
        // returned; otherwise only the last result (`null` for empty input).
        "for_each" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs.compiled_do.as_ref().ok_or_else(|| {
                RuntimeError::new("JOSIE_E_ITER_DO", "for_each step requires 'do'")
            })?;
            let mut out = Vec::with_capacity(input.len());
            let mut last = Value::Null;

            // Check for host functions (typed fast path for external calls).
            // When `do` is a call to a registered i64 mapper, the mapper is
            // applied directly to each item instead of going through eval_expr.
            // NOTE(review): on this path the call's compiled arguments are
            // ignored (the item itself is passed) and non-integer items are
            // skipped silently — confirm this matches the intended semantics.
            if let Expr::Call(fn_name, _) = compiled_do {
                if let Some(host_map) = hosts.get_map_i64(fn_name) {
                    for item in &input {
                        if let Some(n) = item.as_i64() {
                            let mapped = json!(host_map(n));
                            last = mapped.clone();
                            out.push(mapped);
                        }
                    }
                    if step.into.is_some() {
                        return Ok(Value::Array(out));
                    }
                    return Ok(last);
                }
            }

            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                let val = Value::from(result);
                last = val.clone();
                out.push(val);
            }
            if step.into.is_some() {
                Ok(Value::Array(out))
            } else {
                Ok(last)
            }
        }

        // `reduce`: the accumulator starts at args[0] (or null) and is
        // threaded through each `do` evaluation.
        "reduce" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "reduce step requires 'do'"))?;

            let mut acc = if let Some(init_expr) = cs.compiled_args.first() {
                let empty = IterLocals::empty();
                eval_expr(init_expr, &empty, state, operators).map_err(RuntimeError::from)?
            } else {
                JVal::Null
            };

            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::with_acc(JVal::from(item.clone()), index as i64, acc);
                acc = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
            }
            Ok(Value::from(acc))
        }

        // if/match/do/pipe: rebuild the tree expression `[op, ...args]` and run
        // it through the tree evaluator. Only `state` and `operators` are
        // passed, so the pipeline runtime (vars, `$prev`) is not visible there.
        "if" | "match" | "do" | "pipe" => {
            let mut tree = Vec::with_capacity(step.args.len() + 1);
            tree.push(Value::String(step.op.clone()));
            tree.extend(step.args.clone());
            eval_tree(&Value::Array(tree), state, operators)
        }
        // `return`: read `from` (default `$prev`); the pipeline loop stops
        // after this step.
        "return" => {
            let from = step.from.as_deref().unwrap_or("$prev");
            read_ref(from, state, runtime)
        }

        other => Err(RuntimeError::new(
            "JOSIE_E_STEP_UNKNOWN_OP",
            format!("unsupported step op '{other}'"),
        )),
    }
}
762
763pub fn validate_program(program: &Program) -> Result<(), ValidationError> {
764    if is_tree_expression(&program.program) {
765        return Ok(());
766    }
767
768    if let Some(obj) = program.program.as_object() {
769        if obj.get("type").and_then(|v| v.as_str()) == Some("pipeline") {
770            let pipe: PipelineDoc =
771                serde_json::from_value(program.program.clone()).map_err(|err| {
772                    ValidationError::new(
773                        "JOSIE_E_PIPE_PARSE",
774                        format!("invalid pipeline document: {err}"),
775                    )
776                })?;
777            return validate_pipeline(&pipe);
778        }
779    }
780
781    Err(ValidationError::new(
782        "JOSIE_E_PROGRAM_INVALID",
783        "program must be a tree expression array or pipeline object",
784    ))
785}
786
787pub fn validate_pipeline(pipe: &PipelineDoc) -> Result<(), ValidationError> {
788    if pipe.doc_type != "pipeline" {
789        return Err(ValidationError::new(
790            "JOSIE_E_PIPE_TYPE",
791            format!(
792                "pipeline type must be 'pipeline' but got '{}'",
793                pipe.doc_type
794            ),
795        ));
796    }
797    if pipe.steps.is_empty() {
798        return Err(ValidationError::new(
799            "JOSIE_E_PIPE_EMPTY",
800            "pipeline steps must not be empty",
801        ));
802    }
803
804    for (idx, step) in pipe.steps.iter().enumerate() {
805        validate_step(step).map_err(|err| err.step(idx, step.op.clone()))?;
806    }
807    Ok(())
808}
809
810fn validate_step(step: &PipelineStep) -> Result<(), ValidationError> {
811    if let Some(id) = step.id.as_deref()
812        && id.trim().is_empty()
813    {
814        return Err(ValidationError::new(
815            "JOSIE_E_STEP_ID",
816            "step id must not be empty",
817        ));
818    }
819    if let Some(input_schema) = step.input.as_ref()
820        && !input_schema.is_object()
821    {
822        return Err(ValidationError::new(
823            "JOSIE_E_STEP_INPUT_SCHEMA",
824            "step input schema must be an object",
825        ));
826    }
827    if let Some(output_schema) = step.output.as_ref()
828        && !output_schema.is_object()
829    {
830        return Err(ValidationError::new(
831            "JOSIE_E_STEP_OUTPUT_SCHEMA",
832            "step output schema must be an object",
833        ));
834    }
835    if let Some(idempotency_key) = step.idempotency_key.as_deref()
836        && idempotency_key.trim().is_empty()
837    {
838        return Err(ValidationError::new(
839            "JOSIE_E_STEP_IDEMPOTENCY",
840            "step idempotency_key must not be empty",
841        ));
842    }
843
844    if step.op.trim().is_empty() {
845        return Err(ValidationError::new(
846            "JOSIE_E_STEP_OP",
847            "step op must not be empty",
848        ));
849    }
850
851    let known_op = matches!(
852        step.op.as_str(),
853        "call"
854            | "set"
855            | "get"
856            | "map"
857            | "filter"
858            | "for_each"
859            | "reduce"
860            | "if"
861            | "match"
862            | "do"
863            | "pipe"
864            | "return"
865    );
866
867    if !known_op {
868        return Err(ValidationError::new(
869            "JOSIE_E_STEP_UNKNOWN_OP",
870            format!("unknown step op '{}'", step.op),
871        ));
872    }
873
874    match step.op.as_str() {
875        "call" => {
876            let call_target = step
877                .fn_name
878                .as_deref()
879                .or(step.from.as_deref())
880                .unwrap_or_default();
881            if call_target.is_empty() {
882                return Err(ValidationError::new(
883                    "JOSIE_E_CALL_FN",
884                    "call step requires non-empty fn/from",
885                ));
886            }
887        }
888        "map" | "filter" | "for_each" | "reduce" => {
889            if step.from.as_deref().unwrap_or_default().is_empty() {
890                return Err(ValidationError::new(
891                    "JOSIE_E_ITER_FROM",
892                    format!("{} step requires 'from'", step.op),
893                ));
894            }
895            if step.do_expr.is_none() {
896                return Err(ValidationError::new(
897                    "JOSIE_E_ITER_DO",
898                    format!("{} step requires 'do'", step.op),
899                ));
900            }
901        }
902        "set" => {
903            if step.into.as_deref().unwrap_or_default().is_empty() {
904                return Err(ValidationError::new(
905                    "JOSIE_E_SET_INTO",
906                    "set step requires 'into'",
907                ));
908            }
909            if step.args.is_empty() {
910                return Err(ValidationError::new(
911                    "JOSIE_E_SET_ARGS",
912                    "set step requires args",
913                ));
914            }
915        }
916        "return" => {
917            if let Some(from) = step.from.as_deref()
918                && from.trim().is_empty()
919            {
920                return Err(ValidationError::new(
921                    "JOSIE_E_RETURN_FROM",
922                    "return step 'from' must not be empty",
923                ));
924            }
925        }
926        _ => {}
927    }
928
929    Ok(())
930}
931
932fn has_step_execution_policy(step: &PipelineStep) -> bool {
933    step.id.is_some()
934        || step.step_type.is_some()
935        || step.run_hint.is_some()
936        || step.input.is_some()
937        || step.output.is_some()
938        || step.on_error.is_some()
939        || step.timeout_ms.is_some()
940        || step.max_retries.is_some()
941        || step.idempotency_key.is_some()
942}
943
944/// Try to detect and compile the external typed-host pattern:
945///
946/// - `call x.a(p1,p2,size) -> result_a`
947/// - `for_each result_a -> call x.b(item) -> result_b`
948///
949/// Returns `None` when shape does not match exactly.
950fn try_compile_fast_external_map(pipe: &PipelineDoc) -> Option<FastExternalMapPlan> {
951    if pipe.steps.len() != 2 {
952        return None;
953    }
954    let s0 = &pipe.steps[0];
955    let s1 = &pipe.steps[1];
956    if s0.op != "call" || s1.op != "for_each" {
957        return None;
958    }
959    if s0.when.is_some() || s1.when.is_some() {
960        return None;
961    }
962    let fn_a = s0.fn_name.as_ref().or(s0.from.as_ref())?.to_string();
963    let result_a_name = s0.into.as_ref()?.to_string();
964    if s1.from.as_deref()? != result_a_name {
965        return None;
966    }
967    let do_expr = s1.do_expr.as_ref()?;
968    let do_obj = do_expr.as_object()?;
969    if do_obj.get("op").and_then(|v| v.as_str())? != "call" {
970        return None;
971    }
972    let fn_b = do_obj
973        .get("fn")
974        .and_then(|v| v.as_str())
975        .or_else(|| do_obj.get("from").and_then(|v| v.as_str()))?
976        .to_string();
977    let args = do_obj.get("args").and_then(|v| v.as_array())?;
978    if args.len() != 1 {
979        return None;
980    }
981    let arg0 = args.first()?;
982    let is_item = arg0.as_array().is_some_and(|a| {
983        a.len() == 2
984            && a.first().and_then(|v| v.as_str()) == Some("var")
985            && a.get(1).and_then(|v| v.as_str()) == Some("item")
986    });
987    if !is_item {
988        return None;
989    }
990    if s0.args.len() != 3 {
991        return None;
992    }
993    let p1 = s0.args[0].as_i64()?;
994    let p2 = s0.args[1].as_i64()?;
995    let size = s0.args[2].as_u64()? as usize;
996    Some(FastExternalMapPlan {
997        fn_a,
998        fn_b,
999        p1,
1000        p2,
1001        size,
1002    })
1003}
1004
1005/// Try to detect known internal benchmark/workflow patterns and compile into a
1006/// specialized fast loop plan.
1007///
1008/// These matchers are strict on purpose:
1009/// - predictable semantics
1010/// - zero ambiguity
1011/// - easy correctness testing
1012fn try_compile_fast_internal(pipe: &PipelineDoc) -> Option<FastInternalPlan> {
1013    let s0 = pipe.steps.first()?;
1014    let size = parse_set_nums_step(s0)?;
1015
1016    // exp01: nums -> map math -> reduce mapped
1017    if pipe.steps.len() == 3
1018        && is_map_math_step(pipe.steps.get(1)?)
1019        && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1020    {
1021        return Some(FastInternalPlan {
1022            size,
1023            kind: FastInternalKind::MathLoop,
1024        });
1025    }
1026
1027    // exp02: nums -> map math -> filter hot -> reduce hot
1028    if pipe.steps.len() == 4
1029        && is_map_math_step(pipe.steps.get(1)?)
1030        && is_filter_hot_step(pipe.steps.get(2)?)
1031        && is_reduce_sum_score_step(pipe.steps.get(3)?, "hot")
1032    {
1033        return Some(FastInternalPlan {
1034            size,
1035            kind: FastInternalKind::MathFilter { with_t: false },
1036        });
1037    }
1038
1039    // exp03: nums -> map math -> filter hot -> for_each util.str_len -> reduce lens
1040    if pipe.steps.len() == 5
1041        && is_map_math_step(pipe.steps.get(1)?)
1042        && is_filter_hot_step(pipe.steps.get(2)?)
1043        && is_for_each_t_len_step(pipe.steps.get(3)?)
1044        && is_reduce_sum_score_step(pipe.steps.get(4)?, "lens")
1045    {
1046        return Some(FastInternalPlan {
1047            size,
1048            kind: FastInternalKind::MathFilter { with_t: true },
1049        });
1050    }
1051
1052    // exp04: nums -> map branching -> reduce mapped
1053    if pipe.steps.len() == 3
1054        && is_map_branching_step(pipe.steps.get(1)?)
1055        && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1056    {
1057        return Some(FastInternalPlan {
1058            size,
1059            kind: FastInternalKind::Branching,
1060        });
1061    }
1062
1063    // exp05: nums -> map boolean-chain -> reduce flags
1064    if pipe.steps.len() == 3
1065        && is_map_boolean_chain_step(pipe.steps.get(1)?)
1066        && is_reduce_sum_score_step(pipe.steps.get(2)?, "flags")
1067    {
1068        return Some(FastInternalPlan {
1069            size,
1070            kind: FastInternalKind::BooleanChain,
1071        });
1072    }
1073
1074    // exp06: nums -> for_each template-len -> reduce lens
1075    if pipe.steps.len() == 3
1076        && is_for_each_template_len_step(pipe.steps.get(1)?)
1077        && is_reduce_sum_score_step(pipe.steps.get(2)?, "lens")
1078    {
1079        return Some(FastInternalPlan {
1080            size,
1081            kind: FastInternalKind::TemplateLike,
1082        });
1083    }
1084
1085    // exp07: nums -> map ((item+p1)^2 % 1009) -> reduce mapped
1086    if pipe.steps.len() == 3
1087        && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1088        && let Some(p1) = parse_map_reduce_p1_step(pipe.steps.get(1)?)
1089    {
1090        return Some(FastInternalPlan {
1091            size,
1092            kind: FastInternalKind::MapReduce { p1 },
1093        });
1094    }
1095
1096    // exp09: nums -> map match(% item 4) -> reduce mapped
1097    if pipe.steps.len() == 3
1098        && is_map_match_step(pipe.steps.get(1)?)
1099        && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1100    {
1101        return Some(FastInternalPlan {
1102            size,
1103            kind: FastInternalKind::MatchSwitch,
1104        });
1105    }
1106
1107    // exp10: nums -> map math -> filter 100..900 -> for_each mixed -> reduce lens
1108    if pipe.steps.len() == 5
1109        && is_map_math_step(pipe.steps.get(1)?)
1110        && is_filter_mid_step(pipe.steps.get(2)?)
1111        && is_for_each_mixed_step(pipe.steps.get(3)?)
1112        && is_reduce_sum_score_step(pipe.steps.get(4)?, "lens")
1113    {
1114        return Some(FastInternalPlan {
1115            size,
1116            kind: FastInternalKind::MixedWorkflow,
1117        });
1118    }
1119
1120    None
1121}
1122
1123fn parse_set_nums_step(step: &PipelineStep) -> Option<usize> {
1124    if step.op != "set" || step.into.as_deref() != Some("nums") || step.args.len() != 1 {
1125        return None;
1126    }
1127    let arr = step.args.first()?.as_array()?;
1128    if arr.is_empty() {
1129        return Some(0);
1130    }
1131    for (idx, item) in arr.iter().enumerate() {
1132        let expected = (idx + 1) as i64;
1133        if item.as_i64()? != expected {
1134            return None;
1135        }
1136    }
1137    Some(arr.len())
1138}
1139
1140fn is_map_math_step(step: &PipelineStep) -> bool {
1141    step.op == "map"
1142        && step.from.as_deref() == Some("nums")
1143        && step.into.as_deref() == Some("mapped")
1144        && step.do_expr
1145            == Some(json!([
1146                "%",
1147                [
1148                    "+",
1149                    ["*", ["var", "item"], ["var", "item"]],
1150                    ["%", ["var", "index"], 7]
1151                ],
1152                997
1153            ]))
1154}
1155
1156fn is_filter_hot_step(step: &PipelineStep) -> bool {
1157    step.op == "filter"
1158        && step.from.as_deref() == Some("mapped")
1159        && step.into.as_deref() == Some("hot")
1160        && step.do_expr
1161            == Some(json!([
1162                "&&",
1163                [">", ["var", "item"], 40],
1164                ["<", ["var", "item"], 800]
1165            ]))
1166}
1167
1168fn is_reduce_sum_score_step(step: &PipelineStep, from: &str) -> bool {
1169    step.op == "reduce"
1170        && step.from.as_deref() == Some(from)
1171        && step.into.as_deref() == Some("score")
1172        && step.args == vec![json!(0)]
1173        && step.do_expr == Some(json!(["+", ["var", "acc"], ["var", "item"]]))
1174}
1175
1176fn is_for_each_t_len_step(step: &PipelineStep) -> bool {
1177    step.op == "for_each"
1178        && step.from.as_deref() == Some("hot")
1179        && step.into.as_deref() == Some("lens")
1180        && step.do_expr
1181            == Some(json!([
1182                "util.to_int",
1183                [
1184                    "util.to_string",
1185                    [
1186                        "util.str_len",
1187                        [
1188                            "util.trim",
1189                            ["util.concat", "  v", ["util.to_string", ["var", "item"]], "  "]
1190                        ]
1191                    ]
1192                ]
1193            ]))
1194}
1195
1196fn is_map_branching_step(step: &PipelineStep) -> bool {
1197    step.op == "map"
1198        && step.from.as_deref() == Some("nums")
1199        && step.into.as_deref() == Some("mapped")
1200        && step.do_expr
1201            == Some(json!([
1202                "if",
1203                ["==", ["%", ["var", "item"], 2], 0],
1204                ["/", ["var", "item"], 2],
1205                ["%", ["+", ["*", ["var", "item"], 3], 1], 1000]
1206            ]))
1207}
1208
1209fn is_map_boolean_chain_step(step: &PipelineStep) -> bool {
1210    step.op == "map"
1211        && step.from.as_deref() == Some("nums")
1212        && step.into.as_deref() == Some("flags")
1213        && step.do_expr
1214            == Some(json!([
1215                "if",
1216                [
1217                    "||",
1218                    [
1219                        "&&",
1220                        ["==", ["%", ["var", "item"], 2], 0],
1221                        ["!=", ["%", ["var", "item"], 3], 0]
1222                    ],
1223                    ["==", ["%", ["var", "item"], 5], 0]
1224                ],
1225                1,
1226                0
1227            ]))
1228}
1229
1230fn is_for_each_template_len_step(step: &PipelineStep) -> bool {
1231    step.op == "for_each"
1232        && step.from.as_deref() == Some("nums")
1233        && step.into.as_deref() == Some("lens")
1234        && step.do_expr
1235            == Some(json!([
1236                "util.str_len",
1237                [
1238                    "util.concat",
1239                    "ID-",
1240                    ["util.to_string", ["var", "item"]],
1241                    "-",
1242                    ["util.to_string", ["%", ["var", "item"], 7]]
1243                ]
1244            ]))
1245}
1246
1247fn parse_map_reduce_p1_step(step: &PipelineStep) -> Option<i64> {
1248    if step.op != "map"
1249        || step.from.as_deref() != Some("nums")
1250        || step.into.as_deref() != Some("mapped")
1251    {
1252        return None;
1253    }
1254    let expr = step.do_expr.as_ref()?.as_array()?;
1255    if expr.len() != 3 || expr.first()?.as_str()? != "%" || expr.get(2)?.as_i64()? != 1009 {
1256        return None;
1257    }
1258    let mul = expr.get(1)?.as_array()?;
1259    if mul.len() != 3 || mul.first()?.as_str()? != "*" {
1260        return None;
1261    }
1262    let add1 = mul.get(1)?.as_array()?;
1263    let add2 = mul.get(2)?.as_array()?;
1264    let p1a = parse_add_item_const(add1)?;
1265    let p1b = parse_add_item_const(add2)?;
1266    if p1a == p1b { Some(p1a) } else { None }
1267}
1268
1269fn parse_add_item_const(add_expr: &[Value]) -> Option<i64> {
1270    if add_expr.len() != 3 || add_expr.first()?.as_str()? != "+" {
1271        return None;
1272    }
1273    let lhs = add_expr.get(1)?.as_array()?;
1274    if lhs.len() != 2 || lhs.first()?.as_str()? != "var" || lhs.get(1)?.as_str()? != "item" {
1275        return None;
1276    }
1277    add_expr.get(2)?.as_i64()
1278}
1279
1280fn is_map_match_step(step: &PipelineStep) -> bool {
1281    step.op == "map"
1282        && step.from.as_deref() == Some("nums")
1283        && step.into.as_deref() == Some("mapped")
1284        && step.do_expr
1285            == Some(json!([
1286                "match",
1287                ["%", ["var", "item"], 4],
1288                0,
1289                1,
1290                1,
1291                2,
1292                2,
1293                3,
1294                "_",
1295                4
1296            ]))
1297}
1298
1299fn is_filter_mid_step(step: &PipelineStep) -> bool {
1300    step.op == "filter"
1301        && step.from.as_deref() == Some("mapped")
1302        && step.into.as_deref() == Some("hot")
1303        && step.do_expr
1304            == Some(json!([
1305                "&&",
1306                [">", ["var", "item"], 100],
1307                ["<", ["var", "item"], 900]
1308            ]))
1309}
1310
1311fn is_for_each_mixed_step(step: &PipelineStep) -> bool {
1312    step.op == "for_each"
1313        && step.from.as_deref() == Some("hot")
1314        && step.into.as_deref() == Some("lens")
1315        && step.do_expr
1316            == Some(json!([
1317                "+",
1318                [
1319                    "util.str_len",
1320                    ["util.concat", "x", ["util.to_string", ["var", "item"]]]
1321                ],
1322                ["if", [">", ["var", "item"], 200], 5, 1]
1323            ]))
1324}
1325
1326fn plan_to_pipeline_doc(plan: &FastExternalMapPlan) -> PipelineDoc {
1327    PipelineDoc {
1328        doc_type: "pipeline".to_string(),
1329        steps: vec![
1330            PipelineStep {
1331                id: None,
1332                step_type: None,
1333                op: "call".to_string(),
1334                from: None,
1335                into: Some("result_a".to_string()),
1336                fn_name: Some(plan.fn_a.clone()),
1337                args: vec![json!(plan.p1), json!(plan.p2), json!(plan.size)],
1338                do_expr: None,
1339                when: None,
1340                run_hint: None,
1341                input: None,
1342                output: None,
1343                on_error: None,
1344                timeout_ms: None,
1345                max_retries: None,
1346                idempotency_key: None,
1347            },
1348            PipelineStep {
1349                id: None,
1350                step_type: None,
1351                op: "for_each".to_string(),
1352                from: Some("result_a".to_string()),
1353                into: Some("result_b".to_string()),
1354                fn_name: None,
1355                args: vec![],
1356                do_expr: Some(json!({
1357                  "op": "call",
1358                  "fn": plan.fn_b,
1359                  "args": [["var","item"]]
1360                })),
1361                when: None,
1362                run_hint: None,
1363                input: None,
1364                output: None,
1365                on_error: None,
1366                timeout_ms: None,
1367                max_retries: None,
1368                idempotency_key: None,
1369            },
1370        ],
1371    }
1372}
1373
1374/// Execute a specialized internal plan as a tight Rust loop.
1375///
1376/// This path avoids dynamic node dispatch and minimizes JSON allocations.
1377fn execute_fast_internal(
1378    plan: &FastInternalPlan,
1379    state: &mut State,
1380) -> Result<Value, RuntimeError> {
1381    let mut score = 0i64;
1382    for index in 0..plan.size {
1383        let item = (index + 1) as i64;
1384        let idx = index as i64;
1385        match &plan.kind {
1386            FastInternalKind::MathLoop => {
1387                let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1388                score = score.wrapping_add(mapped);
1389            }
1390            FastInternalKind::MathFilter { with_t } => {
1391                let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1392                if mapped > 40 && mapped < 800 {
1393                    if *with_t {
1394                        let l = format!("v{mapped}").chars().count() as i64;
1395                        score = score.wrapping_add(l);
1396                    } else {
1397                        score = score.wrapping_add(mapped);
1398                    }
1399                }
1400            }
1401            FastInternalKind::Branching => {
1402                let v = if item % 2 == 0 {
1403                    item / 2
1404                } else {
1405                    ((item * 3) + 1).rem_euclid(1000)
1406                };
1407                score = score.wrapping_add(v);
1408            }
1409            FastInternalKind::BooleanChain => {
1410                if (item % 2 == 0 && item % 3 != 0) || item % 5 == 0 {
1411                    score = score.wrapping_add(1);
1412                }
1413            }
1414            FastInternalKind::TemplateLike => {
1415                let len = format!("ID-{item}-{}", item.rem_euclid(7)).chars().count() as i64;
1416                score = score.wrapping_add(len);
1417            }
1418            FastInternalKind::MapReduce { p1 } => {
1419                let y = item + *p1;
1420                score = score.wrapping_add((y * y).rem_euclid(1009));
1421            }
1422            FastInternalKind::MatchSwitch => {
1423                let m = item.rem_euclid(4);
1424                let v = match m {
1425                    0 => 1,
1426                    1 => 2,
1427                    2 => 3,
1428                    _ => 4,
1429                };
1430                score = score.wrapping_add(v);
1431            }
1432            FastInternalKind::MixedWorkflow => {
1433                let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1434                if mapped > 100 && mapped < 900 {
1435                    let extra = if mapped > 200 { 5 } else { 1 };
1436                    let len = format!("x{mapped}").chars().count() as i64;
1437                    score = score.wrapping_add(len + extra);
1438                }
1439            }
1440        }
1441    }
1442    state.client.insert("score".to_string(), json!(score));
1443    Ok(json!(score))
1444}
1445
1446/// Execute typed external map plan and return materialized JSON array output.
1447fn execute_fast_external_map(
1448    plan: &FastExternalMapPlan,
1449    hosts: &HostFunctions,
1450) -> Result<Value, RuntimeError> {
1451    let gen_fn = hosts.get_generate_i64(&plan.fn_a).ok_or_else(|| {
1452        RuntimeError::new(
1453            "JOSIE_E_HOST_MISSING",
1454            format!("missing typed host generate function '{}'", plan.fn_a),
1455        )
1456    })?;
1457    let map = hosts.get_map_i64(&plan.fn_b).ok_or_else(|| {
1458        RuntimeError::new(
1459            "JOSIE_E_HOST_MISSING",
1460            format!("missing typed host map function '{}'", plan.fn_b),
1461        )
1462    })?;
1463
1464    let a = gen_fn(plan.p1, plan.p2, plan.size);
1465    let mut b = Vec::with_capacity(a.len());
1466    for item in a {
1467        b.push(json!(map(item)));
1468    }
1469    Ok(Value::Array(b))
1470}
1471
1472/// Execute typed external map plan and return checksum + length without array
1473/// materialization.
1474fn execute_fast_external_map_metrics(
1475    plan: &FastExternalMapPlan,
1476    hosts: &HostFunctions,
1477) -> Result<(i64, usize), RuntimeError> {
1478    let gen_fn = hosts.get_generate_i64(&plan.fn_a).ok_or_else(|| {
1479        RuntimeError::new(
1480            "JOSIE_E_HOST_MISSING",
1481            format!("missing typed host generate function '{}'", plan.fn_a),
1482        )
1483    })?;
1484    let map = hosts.get_map_i64(&plan.fn_b).ok_or_else(|| {
1485        RuntimeError::new(
1486            "JOSIE_E_HOST_MISSING",
1487            format!("missing typed host map function '{}'", plan.fn_b),
1488        )
1489    })?;
1490    let a = gen_fn(plan.p1, plan.p2, plan.size);
1491    let mut checksum = 0i64;
1492    for item in a.iter().copied() {
1493        checksum = checksum.wrapping_add(map(item));
1494    }
1495    Ok((checksum, a.len()))
1496}
1497
1498fn execute_pipeline(
1499    pipe: &PipelineDoc,
1500    state: &mut State,
1501    operators: &Operators,
1502    hosts: &HostFunctions,
1503) -> Result<Value, RuntimeError> {
1504    let mut runtime = PipelineRuntime::default();
1505    for (idx, step) in pipe.steps.iter().enumerate() {
1506        if let Some(when_expr) = &step.when {
1507            let when_out = eval_dynamic(when_expr, state, operators, hosts, &mut runtime)?;
1508            if !truthy(&when_out) {
1509                continue;
1510            }
1511        }
1512        let value = execute_step_with_policy(step, state, operators, hosts, &mut runtime)
1513            .map_err(|err| err.step(idx, step.op.clone()))?;
1514        write_target(step.into.as_deref(), value, state, &mut runtime)?;
1515        if step.op == "return" {
1516            break;
1517        }
1518    }
1519    Ok(runtime.prev)
1520}
1521
1522fn execute_step_with_policy(
1523    step: &PipelineStep,
1524    state: &mut State,
1525    operators: &Operators,
1526    hosts: &HostFunctions,
1527    runtime: &mut PipelineRuntime,
1528) -> Result<Value, RuntimeError> {
1529    if let Some(key) = step.idempotency_key.as_ref()
1530        && let Some(cached) = runtime.idempotency.get(key)
1531    {
1532        return Ok(cached.clone());
1533    }
1534
1535    let timeout_ms = step.timeout_ms;
1536    let on_error = step.on_error.unwrap_or(StepOnError::Halt);
1537    let max_retries = step.max_retries.unwrap_or(0);
1538    let mut attempts = 0u32;
1539    loop {
1540        attempts = attempts.saturating_add(1);
1541        let started = Instant::now();
1542        let mut out = execute_step(step, state, operators, hosts, runtime);
1543        if let Some(limit_ms) = timeout_ms
1544            && started.elapsed().as_millis() as u64 > limit_ms
1545        {
1546            out = Err(RuntimeError::new(
1547                "JOSIE_E_STEP_TIMEOUT",
1548                format!("step exceeded timeout_ms={limit_ms}"),
1549            ));
1550        }
1551        match out {
1552            Ok(v) => {
1553                if let Some(key) = step.idempotency_key.as_ref() {
1554                    runtime.idempotency.insert(key.clone(), v.clone());
1555                }
1556                return Ok(v);
1557            }
1558            Err(err) => match on_error {
1559                StepOnError::Retry if attempts <= max_retries => continue,
1560                StepOnError::Fallback | StepOnError::Compensate => return Ok(Value::Null),
1561                StepOnError::Retry | StepOnError::Halt => return Err(err),
1562            },
1563        }
1564    }
1565}
1566
1567fn execute_step(
1568    step: &PipelineStep,
1569    state: &mut State,
1570    operators: &Operators,
1571    hosts: &HostFunctions,
1572    runtime: &mut PipelineRuntime,
1573) -> Result<Value, RuntimeError> {
1574    match step.op.as_str() {
1575        "call" => execute_call_step(step, state, operators, hosts, runtime),
1576        "set" => {
1577            let Some(value_expr) = step.args.first() else {
1578                return Err(RuntimeError::new(
1579                    "JOSIE_E_SET_ARGS",
1580                    "set step requires first arg as value expression",
1581                ));
1582            };
1583            eval_dynamic(value_expr, state, operators, hosts, runtime)
1584        }
1585        "get" => {
1586            let from = step
1587                .from
1588                .as_deref()
1589                .ok_or_else(|| RuntimeError::new("JOSIE_E_GET_FROM", "get step requires 'from'"))?;
1590            read_ref(from, state, runtime)
1591        }
1592        "map" => {
1593            let input = read_from_array(step, state, runtime)?;
1594            let mut out = Vec::with_capacity(input.len());
1595            let nested = parse_nested_step(step.do_expr.as_ref());
1596            for (index, item) in input.iter().enumerate() {
1597                let mapped = if let Some(nested_step) = nested.as_ref() {
1598                    eval_with_iter_locals_step(
1599                        nested_step,
1600                        item.clone(),
1601                        index,
1602                        None,
1603                        state,
1604                        operators,
1605                        hosts,
1606                        runtime,
1607                    )?
1608                } else {
1609                    eval_with_iter_locals_expr(
1610                        &step.do_expr,
1611                        item.clone(),
1612                        index,
1613                        None,
1614                        state,
1615                        operators,
1616                        hosts,
1617                        runtime,
1618                    )?
1619                };
1620                out.push(mapped);
1621            }
1622            Ok(Value::Array(out))
1623        }
1624        "filter" => {
1625            let input = read_from_array(step, state, runtime)?;
1626            let mut out = Vec::with_capacity(input.len());
1627            let nested = parse_nested_step(step.do_expr.as_ref());
1628            for (index, item) in input.iter().enumerate() {
1629                let matched = if let Some(nested_step) = nested.as_ref() {
1630                    eval_with_iter_locals_step(
1631                        nested_step,
1632                        item.clone(),
1633                        index,
1634                        None,
1635                        state,
1636                        operators,
1637                        hosts,
1638                        runtime,
1639                    )?
1640                } else {
1641                    eval_with_iter_locals_expr(
1642                        &step.do_expr,
1643                        item.clone(),
1644                        index,
1645                        None,
1646                        state,
1647                        operators,
1648                        hosts,
1649                        runtime,
1650                    )?
1651                };
1652                if truthy(&matched) {
1653                    out.push(item.clone());
1654                }
1655            }
1656            Ok(Value::Array(out))
1657        }
1658        "for_each" => {
1659            let input = read_from_array(step, state, runtime)?;
1660            let mut out = Vec::with_capacity(input.len());
1661            let mut last = Value::Null;
1662            let nested = parse_nested_step(step.do_expr.as_ref());
1663            for (index, item) in input.iter().enumerate() {
1664                let next = if let Some(nested_step) = nested.as_ref() {
1665                    eval_with_iter_locals_step(
1666                        nested_step,
1667                        item.clone(),
1668                        index,
1669                        None,
1670                        state,
1671                        operators,
1672                        hosts,
1673                        runtime,
1674                    )?
1675                } else {
1676                    eval_with_iter_locals_expr(
1677                        &step.do_expr,
1678                        item.clone(),
1679                        index,
1680                        None,
1681                        state,
1682                        operators,
1683                        hosts,
1684                        runtime,
1685                    )?
1686                };
1687                last = next.clone();
1688                out.push(next);
1689            }
1690            if step.into.is_some() {
1691                Ok(Value::Array(out))
1692            } else {
1693                Ok(last)
1694            }
1695        }
1696        "reduce" => {
1697            let input = read_from_array(step, state, runtime)?;
1698            let mut acc = if let Some(init_expr) = step.args.first() {
1699                eval_dynamic(init_expr, state, operators, hosts, runtime)?
1700            } else {
1701                Value::Null
1702            };
1703            let nested = parse_nested_step(step.do_expr.as_ref());
1704            for (index, item) in input.iter().enumerate() {
1705                acc = if let Some(nested_step) = nested.as_ref() {
1706                    eval_with_iter_locals_step(
1707                        nested_step,
1708                        item.clone(),
1709                        index,
1710                        Some(acc),
1711                        state,
1712                        operators,
1713                        hosts,
1714                        runtime,
1715                    )?
1716                } else {
1717                    eval_with_iter_locals_expr(
1718                        &step.do_expr,
1719                        item.clone(),
1720                        index,
1721                        Some(acc),
1722                        state,
1723                        operators,
1724                        hosts,
1725                        runtime,
1726                    )?
1727                };
1728            }
1729            Ok(acc)
1730        }
1731        "if" | "match" | "do" | "pipe" => {
1732            let mut tree = Vec::with_capacity(step.args.len() + 1);
1733            tree.push(Value::String(step.op.clone()));
1734            tree.extend(step.args.clone());
1735            eval_tree(&Value::Array(tree), state, operators)
1736        }
1737        "return" => {
1738            let from = step.from.as_deref().unwrap_or("$prev");
1739            read_ref(from, state, runtime)
1740        }
1741        other => Err(RuntimeError::new(
1742            "JOSIE_E_STEP_UNKNOWN_OP",
1743            format!("unsupported step op '{other}'"),
1744        )),
1745    }
1746}
1747
1748fn execute_call_step(
1749    step: &PipelineStep,
1750    state: &mut State,
1751    operators: &Operators,
1752    hosts: &HostFunctions,
1753    runtime: &mut PipelineRuntime,
1754) -> Result<Value, RuntimeError> {
1755    let fn_name = step
1756        .fn_name
1757        .as_deref()
1758        .or(step.from.as_deref())
1759        .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_FN", "call step requires fn/from"))?;
1760
1761    if let Some(host_generate) = hosts.get_generate_i64(fn_name) {
1762        if step.args.len() == 3 {
1763            let p1 = eval_dynamic(&step.args[0], state, operators, hosts, runtime)?
1764                .as_i64()
1765                .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "p1 must be i64"))?;
1766            let p2 = eval_dynamic(&step.args[1], state, operators, hosts, runtime)?
1767                .as_i64()
1768                .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "p2 must be i64"))?;
1769            let size = eval_dynamic(&step.args[2], state, operators, hosts, runtime)?
1770                .as_u64()
1771                .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "size must be u64"))?
1772                as usize;
1773            let out = host_generate(p1, p2, size);
1774            return Ok(Value::Array(out.into_iter().map(|v| json!(v)).collect()));
1775        }
1776    }
1777
1778    if let Some(host_map) = hosts.get_map_i64(fn_name) {
1779        if step.args.len() == 1 {
1780            let item = eval_dynamic(&step.args[0], state, operators, hosts, runtime)?
1781                .as_i64()
1782                .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "item must be i64"))?;
1783            return Ok(json!(host_map(item)));
1784        }
1785    }
1786
1787    if let Some(host_call) = hosts.get_call(fn_name) {
1788        let mut eval_args = Vec::with_capacity(step.args.len());
1789        for arg in &step.args {
1790            eval_args.push(eval_dynamic(arg, state, operators, hosts, runtime)?);
1791        }
1792        return host_call(&eval_args);
1793    }
1794
1795    let op_name = fn_name.strip_prefix("core.").unwrap_or(fn_name);
1796    let mut expr = Vec::with_capacity(step.args.len() + 1);
1797    expr.push(Value::String(op_name.to_string()));
1798    expr.extend(step.args.clone());
1799    eval_tree(&Value::Array(expr), state, operators)
1800}
1801
1802fn parse_nested_step(expr: Option<&Value>) -> Option<PipelineStep> {
1803    let expr = expr?;
1804    let obj = expr.as_object()?;
1805    if obj.get("op").and_then(|v| v.as_str()).is_none() {
1806        return None;
1807    }
1808    let step: PipelineStep = serde_json::from_value(expr.clone()).ok()?;
1809    if validate_step(&step).is_err() {
1810        return None;
1811    }
1812    Some(step)
1813}
1814
1815fn read_from_array(
1816    step: &PipelineStep,
1817    state: &State,
1818    runtime: &PipelineRuntime,
1819) -> Result<Vec<Value>, RuntimeError> {
1820    let from = step
1821        .from
1822        .as_deref()
1823        .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_FROM", "iterator step requires 'from'"))?;
1824    let source = read_ref(from, state, runtime)?;
1825    match source {
1826        Value::Array(arr) => Ok(arr),
1827        _ => Err(RuntimeError::new(
1828            "JOSIE_E_ITER_SOURCE",
1829            format!("{} input must resolve to array", step.op),
1830        )),
1831    }
1832}
1833
1834fn eval_with_iter_locals_expr(
1835    do_expr: &Option<Value>,
1836    item: Value,
1837    index: usize,
1838    acc: Option<Value>,
1839    state: &mut State,
1840    operators: &Operators,
1841    hosts: &HostFunctions,
1842    runtime: &mut PipelineRuntime,
1843) -> Result<Value, RuntimeError> {
1844    let Some(expr) = do_expr else {
1845        return Err(RuntimeError::new(
1846            "JOSIE_E_ITER_DO",
1847            "iterator step requires do expression",
1848        ));
1849    };
1850    let prev_item = state.client.insert("item".to_string(), item);
1851    let prev_index = state.client.insert("index".to_string(), json!(index));
1852    let prev_acc = acc.map(|acc_val| state.client.insert("acc".to_string(), acc_val));
1853
1854    let out = eval_dynamic(expr, state, operators, hosts, runtime);
1855
1856    restore_local(&mut state.client, "item", prev_item);
1857    restore_local(&mut state.client, "index", prev_index);
1858    if let Some(prev) = prev_acc {
1859        restore_local(&mut state.client, "acc", prev);
1860    } else {
1861        state.client.remove("acc");
1862    }
1863    out
1864}
1865
1866fn eval_with_iter_locals_step(
1867    step: &PipelineStep,
1868    item: Value,
1869    index: usize,
1870    acc: Option<Value>,
1871    state: &mut State,
1872    operators: &Operators,
1873    hosts: &HostFunctions,
1874    runtime: &mut PipelineRuntime,
1875) -> Result<Value, RuntimeError> {
1876    let prev_item = state.client.insert("item".to_string(), item);
1877    let prev_index = state.client.insert("index".to_string(), json!(index));
1878    let prev_acc = acc.map(|acc_val| state.client.insert("acc".to_string(), acc_val));
1879
1880    let out = execute_step(step, state, operators, hosts, runtime);
1881
1882    restore_local(&mut state.client, "item", prev_item);
1883    restore_local(&mut state.client, "index", prev_index);
1884    if let Some(prev) = prev_acc {
1885        restore_local(&mut state.client, "acc", prev);
1886    } else {
1887        state.client.remove("acc");
1888    }
1889    out
1890}
1891
1892fn eval_dynamic(
1893    expr: &Value,
1894    state: &mut State,
1895    operators: &Operators,
1896    hosts: &HostFunctions,
1897    runtime: &mut PipelineRuntime,
1898) -> Result<Value, RuntimeError> {
1899    if expr.is_array() {
1900        return eval_tree(expr, state, operators);
1901    }
1902    if let Some(obj) = expr.as_object()
1903        && obj.get("op").and_then(|v| v.as_str()).is_some()
1904    {
1905        let step: PipelineStep = serde_json::from_value(expr.clone()).map_err(|err| {
1906            RuntimeError::new(
1907                "JOSIE_E_STEP_PARSE",
1908                format!("invalid nested pipeline step: {err}"),
1909            )
1910        })?;
1911        validate_step(&step).map_err(|err| {
1912            RuntimeError::new(err.code, err.message).step_opt(err.step_index, err.op)
1913        })?;
1914        return execute_step(&step, state, operators, hosts, runtime);
1915    }
1916    if let Some(obj) = expr.as_object() {
1917        // Keep object-literal ergonomics consistent with tree evaluator:
1918        // nested arrays inside object fields are treated as dynamic expressions.
1919        let mut out = Map::with_capacity(obj.len());
1920        for (key, value) in obj {
1921            let next = eval_dynamic(value, state, operators, hosts, runtime)?;
1922            out.insert(key.clone(), next);
1923        }
1924        return Ok(Value::Object(out));
1925    }
1926    Ok(expr.clone())
1927}
1928
1929fn eval_tree(
1930    expr: &Value,
1931    state: &mut State,
1932    operators: &Operators,
1933) -> Result<Value, RuntimeError> {
1934    let mut ctx = Context {
1935        state,
1936        operators,
1937        event: None,
1938    };
1939    evaluate(expr, &mut ctx).map_err(RuntimeError::from)
1940}
1941
1942fn write_target(
1943    target: Option<&str>,
1944    value: Value,
1945    state: &mut State,
1946    runtime: &mut PipelineRuntime,
1947) -> Result<(), RuntimeError> {
1948    let target = target.unwrap_or("$prev");
1949    if target == "$prev" {
1950        runtime.prev = value;
1951        return Ok(());
1952    }
1953    if target.starts_with("client.") || target.starts_with("server.") {
1954        set_state_path(state, target, value.clone())?;
1955    } else {
1956        runtime.vars.insert(target.to_string(), value.clone());
1957    }
1958    runtime.prev = value;
1959    Ok(())
1960}
1961
1962fn read_ref(target: &str, state: &State, runtime: &PipelineRuntime) -> Result<Value, RuntimeError> {
1963    if target == "$prev" {
1964        return Ok(runtime.prev.clone());
1965    }
1966    if let Some(value) = runtime.vars.get(target) {
1967        return Ok(value.clone());
1968    }
1969    if target.starts_with("client.") || target.starts_with("server.") {
1970        return get_state_path(state, target).ok_or_else(|| {
1971            RuntimeError::new(
1972                "JOSIE_E_REF_NOT_FOUND",
1973                format!("missing state reference '{target}'"),
1974            )
1975        });
1976    }
1977    Err(RuntimeError::new(
1978        "JOSIE_E_REF_NOT_FOUND",
1979        format!("missing runtime reference '{target}'"),
1980    ))
1981}
1982
1983fn state_from_value(input: &Value) -> State {
1984    let mut state = State::new();
1985    if let Some(obj) = input.as_object() {
1986        if let Some(client) = obj.get("client").and_then(|v| v.as_object()) {
1987            state.client = client.clone();
1988        }
1989        if let Some(server) = obj.get("server").and_then(|v| v.as_object()) {
1990            state.server = server.clone();
1991        }
1992    }
1993    state
1994}
1995
1996fn get_state_path(state: &State, path: &str) -> Option<Value> {
1997    let (scope, rest) = path.split_once('.')?;
1998    let base = match scope {
1999        "client" => &state.client,
2000        "server" => &state.server,
2001        _ => return None,
2002    };
2003    get_from_map(base, rest)
2004}
2005
2006fn get_from_map(map: &Map<String, Value>, path: &str) -> Option<Value> {
2007    if path.is_empty() {
2008        return Some(Value::Object(map.clone()));
2009    }
2010    let mut current = map.get(path.split('.').next().unwrap_or_default())?;
2011    for part in path.split('.').skip(1) {
2012        match current {
2013            Value::Object(obj) => current = obj.get(part)?,
2014            Value::Array(arr) => {
2015                let idx = part.parse::<usize>().ok()?;
2016                current = arr.get(idx)?;
2017            }
2018            _ => return None,
2019        }
2020    }
2021    Some(current.clone())
2022}
2023
2024fn set_state_path(state: &mut State, path: &str, value: Value) -> Result<(), RuntimeError> {
2025    let (scope, rest) = path
2026        .split_once('.')
2027        .ok_or_else(|| RuntimeError::new("JOSIE_E_SET_PATH", "invalid set path"))?;
2028    let target_map = match scope {
2029        "client" => &mut state.client,
2030        "server" => &mut state.server,
2031        _ => {
2032            return Err(RuntimeError::new(
2033                "JOSIE_E_SET_PATH",
2034                format!("path scope must be client/server, got '{scope}'"),
2035            ));
2036        }
2037    };
2038    set_in_map(target_map, rest, value);
2039    Ok(())
2040}
2041
2042fn set_in_map(map: &mut Map<String, Value>, path: &str, value: Value) {
2043    let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
2044    if parts.is_empty() {
2045        return;
2046    }
2047
2048    let mut current = map;
2049    for part in &parts[..parts.len().saturating_sub(1)] {
2050        let entry = current
2051            .entry((*part).to_string())
2052            .or_insert_with(|| Value::Object(Map::new()));
2053        if !entry.is_object() {
2054            *entry = Value::Object(Map::new());
2055        }
2056        current = entry
2057            .as_object_mut()
2058            .expect("entry should be object after normalization");
2059    }
2060    let last = parts.last().expect("path has at least one part");
2061    current.insert((*last).to_string(), value);
2062}
2063
2064fn restore_local(map: &mut Map<String, Value>, key: &str, previous: Option<Value>) {
2065    if let Some(prev) = previous {
2066        map.insert(key.to_string(), prev);
2067    } else {
2068        map.remove(key);
2069    }
2070}
2071
2072fn truthy(v: &Value) -> bool {
2073    match v {
2074        Value::Null => false,
2075        Value::Bool(b) => *b,
2076        Value::Number(n) => n.as_f64().map(|n| n != 0.0).unwrap_or(false),
2077        Value::String(s) => !s.is_empty(),
2078        Value::Array(a) => !a.is_empty(),
2079        Value::Object(o) => !o.is_empty(),
2080    }
2081}
2082
2083fn is_tree_expression(value: &Value) -> bool {
2084    let Some(arr) = value.as_array() else {
2085        return false;
2086    };
2087    if arr.is_empty() {
2088        return false;
2089    }
2090    arr.first().and_then(|v| v.as_str()).is_some()
2091}
2092
2093trait RuntimeErrExt {
2094    fn step_opt(self, step_index: Option<usize>, op: Option<String>) -> Self;
2095}
2096
2097impl RuntimeErrExt for RuntimeError {
2098    fn step_opt(mut self, step_index: Option<usize>, op: Option<String>) -> Self {
2099        self.step_index = step_index;
2100        self.op = op;
2101        self
2102    }
2103}
2104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Operators;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Call counters for the stateful host functions below. Each counter is used
    // by exactly one test, which resets it before running.
    static RETRY_COUNTER: AtomicUsize = AtomicUsize::new(0);
    static IDEMP_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Tree operator: `((i + p1) * p2) % 97` for `i in 0..size`.
    fn op_ext_a(args: &[Value], _ctx: &mut Context) -> Result<Value, EvalError> {
        let p1 = args.first().and_then(|v| v.as_i64()).unwrap_or(0);
        let p2 = args.get(1).and_then(|v| v.as_i64()).unwrap_or(0);
        let size = args.get(2).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
        let mut out = Vec::with_capacity(size);
        for i in 0..size {
            out.push(json!(((i as i64 + p1) * p2) % 97));
        }
        Ok(Value::Array(out))
    }

    /// Tree operator: `(x * 3 + 1) % 101`.
    fn op_ext_b(args: &[Value], _ctx: &mut Context) -> Result<Value, EvalError> {
        let x = args.first().and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(json!((x * 3 + 1) % 101))
    }

    /// Typed host counterpart of `op_ext_a`.
    fn host_gen(p1: i64, p2: i64, size: usize) -> Vec<i64> {
        let mut out = Vec::with_capacity(size);
        for i in 0..size {
            out.push(((i as i64 + p1) * p2) % 97);
        }
        out
    }

    /// Typed host counterpart of `op_ext_b`.
    fn host_map(item: i64) -> i64 {
        (item * 3 + 1) % 101
    }

    /// Fails on its first call after `RETRY_COUNTER` is reset, succeeds afterwards.
    fn host_fail_once(_args: &[Value]) -> Result<Value, RuntimeError> {
        let n = RETRY_COUNTER.fetch_add(1, Ordering::SeqCst);
        if n == 0 {
            return Err(RuntimeError::new("JOSIE_E_TEST_FAIL", "fail once"));
        }
        Ok(json!("ok"))
    }

    /// Returns how many times it has been called since `IDEMP_COUNTER` was reset.
    fn host_counter(_args: &[Value]) -> Result<Value, RuntimeError> {
        let n = IDEMP_COUNTER.fetch_add(1, Ordering::SeqCst) + 1;
        Ok(json!(n))
    }

    #[test]
    fn parse_pipeline_ok() {
        let node = json!({
          "v": 2,
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"call", "fn":"x.a", "args":[1,2,3], "into":"result_a"},
              {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
            ]
          }
        });
        // `expect` surfaces the parse error on failure instead of a bare assert.
        parse_program(&node).expect("pipeline should parse");
    }

    #[test]
    fn parse_pipeline_accepts_run_hint() {
        let node = json!({
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"call", "fn":"x.a", "args":[1], "into":"out", "run_hint":"worker"}
            ]
          }
        });
        let parsed = parse_program(&node).expect("parse");
        let pipe: PipelineDoc = serde_json::from_value(parsed.program).expect("pipe");
        assert_eq!(pipe.steps[0].run_hint, Some(StepRunHint::Worker));
    }

    #[test]
    fn parse_pipeline_missing_for_each_do_fails() {
        let node = json!({
          "v": 2,
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"for_each", "from":"items", "into":"out"}
            ]
          }
        });
        let err = parse_program(&node).expect_err("expected validation error");
        assert_eq!(err.code, "JOSIE_E_ITER_DO");
    }

    #[test]
    fn parse_tree_ok() {
        let node = json!({
          "v": 2,
          "program": ["+", 1, 2]
        });
        parse_program(&node).expect("tree program should parse");
    }

    #[test]
    fn execute_pipeline_with_registered_functions_and_for_each() {
        let node = json!({
          "v": 2,
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"call", "fn":"x.a", "args":[2,3,5], "into":"result_a"},
              {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
            ]
          }
        });
        let program = parse_program(&node).expect("parse");
        let mut operators = Operators::new();
        operators.register("x.a", op_ext_a);
        operators.register("x.b", op_ext_b);

        let out = execute_program(&program, &operators).expect("execute");
        let items = out
            .value
            .as_array()
            .expect("for_each output should be an array");
        assert_eq!(items.len(), 5);
    }

    #[test]
    fn execute_tree_with_util_namespace() {
        let node = json!({
          "v": 2,
          "program": ["util.str_len", ["util.to_string", 1234]]
        });
        let program = parse_program(&node).expect("parse");
        let operators = Operators::new();
        let out = execute_program(&program, &operators).expect("execute");
        assert_eq!(out.value, json!(4));
    }

    #[test]
    fn fast_external_metrics_path_works() {
        let node = json!({
          "v": 2,
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"call", "fn":"x.a", "args":[2,3,5], "into":"result_a"},
              {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
            ]
          }
        });
        let program = parse_program(&node).expect("parse");
        let compiled = compile_program(&program).expect("compile");
        let mut hosts = HostFunctions::new();
        hosts.register_generate_i64("x.a", host_gen);
        hosts.register_map_i64("x.b", host_map);
        let operators = Operators::new();
        let mut state = State::new();
        let (checksum, len) =
            execute_compiled_program_external_metrics(&compiled, &mut state, &operators, &hosts)
                .expect("fast metrics");
        assert_eq!(len, 5);
        assert!(checksum > 0);
    }

    #[test]
    fn execute_pipeline_retry_policy_works() {
        RETRY_COUNTER.store(0, Ordering::SeqCst);
        let node = json!({
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {
                "id": "retry-step",
                "type": "action",
                "op":"call",
                "fn":"x.fail_once",
                "on_error":"retry",
                "max_retries": 1,
                "into":"r"
              },
              {"op":"return", "from":"r"}
            ]
          }
        });
        let program = parse_program(&node).expect("parse");
        let mut hosts = HostFunctions::new();
        hosts.register_call("x.fail_once", host_fail_once);
        let operators = Operators::new();
        let out = execute_program_with_hosts(&program, &operators, &hosts).expect("execute");
        assert_eq!(out.value, json!("ok"));
        // One failed attempt plus one successful retry.
        assert_eq!(RETRY_COUNTER.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn execute_pipeline_idempotency_key_skips_duplicate_step() {
        IDEMP_COUNTER.store(0, Ordering::SeqCst);
        let node = json!({
          "state": {"client": {}, "server": {}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"call", "fn":"x.counter", "idempotency_key":"job-42", "into":"a"},
              {"op":"call", "fn":"x.counter", "idempotency_key":"job-42", "into":"b"},
              {"op":"return", "from":"b"}
            ]
          }
        });
        let program = parse_program(&node).expect("parse");
        let mut hosts = HostFunctions::new();
        hosts.register_call("x.counter", host_counter);
        let operators = Operators::new();
        let out = execute_program_with_hosts(&program, &operators, &hosts).expect("execute");
        // The second step reuses the first step's result rather than calling the host again.
        assert_eq!(out.value, json!(1));
        assert_eq!(IDEMP_COUNTER.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn set_step_evaluates_nested_object_expressions() {
        let node = json!({
          "state": {"client": {}, "server": {"input": {"title": "Hello", "slug": "hello"}}},
          "program": {
            "type": "pipeline",
            "steps": [
              {"op":"set","into":"row","args":[
                {"title":["var","server.input.title"],"slug":["var","server.input.slug"]}
              ]},
              {"op":"return","from":"row"}
            ]
          }
        });
        let program = parse_program(&node).expect("parse");
        let operators = Operators::new();
        let out = execute_program(&program, &operators).expect("execute");
        assert_eq!(out.value, json!({"title":"Hello","slug":"hello"}));
    }
}