1use crate::compiler::{Expr, compile_expr};
46use crate::jval::JVal;
47use crate::vm::{IterLocals, eval_expr};
48use crate::{Context, EvalError, Operators, State, evaluate};
49use serde::{Deserialize, Serialize};
50use serde_json::{Map, Value, json};
51use std::collections::HashMap;
52use std::rc::Rc;
53use std::time::Instant;
54
/// Host callback invoked with a slice of JSON arguments; yields a JSON value or a
/// [`RuntimeError`].
pub type HostCallFn = fn(&[Value]) -> Result<Value, RuntimeError>;
/// Host callback taking two `i64` values and a `usize` and producing a `Vec<i64>`.
///
/// The fast external-map plan in this file stores the three arguments as `p1`, `p2`
/// and `size`. NOTE(review): their exact meaning is defined by the host — confirm.
pub type HostGenerateI64Fn = fn(i64, i64, usize) -> Vec<i64>;
/// Host callback mapping a single `i64` to another `i64`.
pub type HostMapI64Fn = fn(i64) -> i64;
61
/// Registry of host-provided functions, looked up by name.
///
/// Three independent tables are kept, one per callback signature.
#[derive(Debug, Clone, Default)]
pub struct HostFunctions {
    /// Generic JSON-in / JSON-out callbacks.
    call: HashMap<String, HostCallFn>,
    /// Callbacks that generate a `Vec<i64>`.
    generate_i64: HashMap<String, HostGenerateI64Fn>,
    /// Callbacks that map one `i64` to another.
    map_i64: HashMap<String, HostMapI64Fn>,
}
73
74impl HostFunctions {
75 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn register_call(&mut self, name: impl Into<String>, func: HostCallFn) {
82 self.call.insert(name.into(), func);
83 }
84
85 pub fn register_generate_i64(&mut self, name: impl Into<String>, func: HostGenerateI64Fn) {
87 self.generate_i64.insert(name.into(), func);
88 }
89
90 pub fn register_map_i64(&mut self, name: impl Into<String>, func: HostMapI64Fn) {
92 self.map_i64.insert(name.into(), func);
93 }
94
95 fn get_call(&self, name: &str) -> Option<HostCallFn> {
96 self.call.get(name).copied()
97 }
98
99 fn get_generate_i64(&self, name: &str) -> Option<HostGenerateI64Fn> {
100 self.generate_i64.get(name).copied()
101 }
102
103 fn get_map_i64(&self, name: &str) -> Option<HostMapI64Fn> {
104 self.map_i64.get(name).copied()
105 }
106}
107
/// Top-level program document: an initial state plus the program body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Program {
    /// Initial state value; defaults to `Value::default()` when absent from the document.
    #[serde(default)]
    pub state: Value,
    /// Program body. `validate_program` accepts either a tree expression or an object
    /// whose `type` is `"pipeline"`.
    pub program: Value,
}
115
/// Pipeline form of a program body: a typed list of steps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineDoc {
    /// Serialized as `type`; `validate_pipeline` requires the value `"pipeline"`.
    #[serde(rename = "type")]
    pub doc_type: String,
    /// Steps in execution order; `validate_pipeline` rejects an empty list.
    pub steps: Vec<PipelineStep>,
}
123
/// A single pipeline step as it appears in the program document.
///
/// Only `op` is required; every other field defaults when missing. Setting any of
/// `id`, `type`, `run_hint`, `input`, `output`, `on_error`, `timeout_ms`, `max_retries`
/// or `idempotency_key` makes `compile_program` skip the fast-path plans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStep {
    /// Optional step identifier; validation rejects a blank value.
    #[serde(default)]
    pub id: Option<String>,
    /// Optional step classification, serialized as `type`.
    #[serde(rename = "type", default)]
    pub step_type: Option<StepType>,
    /// Operation name. Validation accepts `call`, `set`, `get`, `map`, `filter`,
    /// `for_each`, `reduce`, `if`, `match`, `do`, `pipe` and `return`.
    pub op: String,
    /// Source reference: the input of iteration steps, the reference read by `get` and
    /// `return`, and an alternative call target when `fn` is absent.
    #[serde(default)]
    pub from: Option<String>,
    /// Target the step result is written to (passed to `write_target`).
    #[serde(default)]
    pub into: Option<String>,
    /// Call target, serialized as `fn`.
    #[serde(rename = "fn", default)]
    pub fn_name: Option<String>,
    /// Step arguments; each one is compiled with `compile_expr`.
    #[serde(default)]
    pub args: Vec<Value>,
    /// Per-item body for `map` / `filter` / `for_each` / `reduce`, serialized as `do`.
    #[serde(rename = "do", default)]
    pub do_expr: Option<Value>,
    /// Guard expression; the compiled executor skips the step when it is not truthy.
    #[serde(default)]
    pub when: Option<Value>,
    /// Optional execution hint. NOTE(review): not read by the code visible here.
    #[serde(default)]
    pub run_hint: Option<StepRunHint>,
    /// Optional input schema; validation requires an object when present.
    #[serde(default)]
    pub input: Option<Value>,
    /// Optional output schema; validation requires an object when present.
    #[serde(default)]
    pub output: Option<Value>,
    /// Error policy; the compiled executor treats a missing value as `Halt`.
    #[serde(default)]
    pub on_error: Option<StepOnError>,
    /// Time limit in milliseconds, compared against elapsed time after an attempt finishes.
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    /// Number of extra attempts allowed under the `Retry` policy (missing means 0).
    #[serde(default)]
    pub max_retries: Option<u32>,
    /// Key under which a successful result is cached and reused within one pipeline run;
    /// validation rejects a blank value.
    #[serde(default)]
    pub idempotency_key: Option<String>,
}
159
/// Classification label for a step, serialized in `snake_case`.
///
/// NOTE(review): the code visible here only checks whether a step type is present
/// (`has_step_execution_policy`); it does not branch on the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepType {
    Action,
    Decision,
    Transform,
    Tool,
    Checkpoint,
}
169
/// Error policy applied by the compiled executor when a step attempt fails.
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepOnError {
    /// Re-run the step while attempts remain within `max_retries`; afterwards the
    /// last error is returned.
    Retry,
    /// Swallow the error; the step yields `Value::Null`.
    Fallback,
    /// Return the error (also used when no policy is set).
    Halt,
    /// Handled exactly like `Fallback` by the compiled executor: the step yields
    /// `Value::Null`.
    Compensate,
}
178
/// Optional hint attached to a step, serialized in `snake_case`.
///
/// NOTE(review): the code visible here only checks for presence of the hint; how the
/// variants influence scheduling is not shown — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepRunHint {
    Inline,
    Worker,
}
185
/// Error produced while parsing or validating a program document.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ValidationError {
    /// Machine-readable error code (for example `JOSIE_E_PARSE`).
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// Index of the offending pipeline step, when the error is tied to one.
    pub step_index: Option<usize>,
    /// `op` of the offending pipeline step, when the error is tied to one.
    pub op: Option<String>,
}
194
195impl ValidationError {
196 fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
197 Self {
198 code: code.into(),
199 message: message.into(),
200 step_index: None,
201 op: None,
202 }
203 }
204
205 fn step(mut self, step_index: usize, op: impl Into<String>) -> Self {
206 self.step_index = Some(step_index);
207 self.op = Some(op.into());
208 self
209 }
210}
211
/// Error produced while executing a program.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeError {
    /// Machine-readable error code (for example `JOSIE_E_EVAL`).
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// Index of the pipeline step that failed, when known.
    pub step_index: Option<usize>,
    /// `op` of the pipeline step that failed, when known.
    pub op: Option<String>,
}
220
221impl RuntimeError {
222 fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
223 Self {
224 code: code.into(),
225 message: message.into(),
226 step_index: None,
227 op: None,
228 }
229 }
230
231 fn step(mut self, step_index: usize, op: impl Into<String>) -> Self {
232 self.step_index = Some(step_index);
233 self.op = Some(op.into());
234 self
235 }
236}
237
238impl From<EvalError> for RuntimeError {
239 fn from(value: EvalError) -> Self {
240 Self::new("JOSIE_E_EVAL", value.message)
241 }
242}
243
/// Plan for a two-step "host call, then `for_each` host call" pipeline recognized by
/// `try_compile_fast_external_map`.
#[derive(Debug, Clone)]
pub struct FastExternalMapPlan {
    /// Target of the first (`call`) step; looked up as an `i64` generator host function.
    fn_a: String,
    /// Target of the call inside the `for_each` body; looked up as an `i64` map host function.
    fn_b: String,
    /// First argument of the `call` step.
    p1: i64,
    /// Second argument of the `call` step.
    p2: i64,
    /// Third argument of the `call` step.
    size: usize,
}
252
/// Pipeline shape recognized by `try_compile_fast_internal`.
///
/// Each variant corresponds to one exact sequence of steps (after the initial
/// `set nums` step) matched structurally against hard-coded expressions.
#[derive(Debug, Clone)]
pub enum FastInternalKind {
    /// Math `map` followed by a sum `reduce`.
    MathLoop,
    /// Math `map`, "hot" `filter`, then a sum `reduce`; `with_t` is true when an extra
    /// string-length `for_each` stage sits between the filter and the reduce.
    MathFilter { with_t: bool },
    /// `if`-based `map` followed by a sum `reduce`.
    Branching,
    /// Boolean-chain `map` into `flags` followed by a sum `reduce`.
    BooleanChain,
    /// String-building `for_each` into `lens` followed by a sum `reduce`.
    TemplateLike,
    /// `((item + p1) * (item + p1)) % 1009` `map` followed by a sum `reduce`; `p1` is
    /// the constant extracted from the expression.
    MapReduce { p1: i64 },
    /// `match`-based `map` followed by a sum `reduce`.
    MatchSwitch,
    /// Math `map`, "mid" `filter`, mixed `for_each`, then a sum `reduce`.
    MixedWorkflow,
}
264
/// Plan for a pipeline that matched one of the recognized internal shapes.
#[derive(Debug, Clone)]
pub struct FastInternalPlan {
    /// Length of the `nums` literal (`1..=size`) set by the pipeline's first step.
    size: usize,
    /// Which recognized shape the remaining steps matched.
    kind: FastInternalKind,
}
274
/// A pipeline step paired with its pre-compiled expressions.
#[derive(Debug, Clone)]
pub struct CompiledPipelineStep {
    /// The original step document.
    pub step: PipelineStep,
    /// Compiled `do` body, when the step has one.
    pub compiled_do: Option<Expr>,
    /// Compiled `when` guard, when the step has one.
    pub compiled_when: Option<Expr>,
    /// Compiled form of each entry of `step.args`, in order.
    pub compiled_args: Vec<Expr>,
}
283
/// Executable form of a program body, chosen by `compile_program`.
#[derive(Debug, Clone)]
pub enum CompiledProgramBody {
    /// Tree expression, evaluated with `eval_tree`.
    Tree(Value),
    /// Uncompiled pipeline, run with `execute_pipeline`. The `compile_program` shown in
    /// this file never produces this variant.
    Pipeline(PipelineDoc),
    /// Two-step host generate/map pipeline (see [`FastExternalMapPlan`]).
    FastExternalMapI64(FastExternalMapPlan),
    /// Pipeline matching a recognized internal shape (see [`FastInternalPlan`]).
    FastInternal(FastInternalPlan),
    /// General pipeline with pre-compiled step expressions.
    CompiledPipeline(Vec<CompiledPipelineStep>),
}
293
/// A validated program ready for execution.
#[derive(Debug, Clone)]
pub struct CompiledProgram {
    /// Copy of the program document's `state` value.
    pub initial_state: Value,
    /// Executable body.
    pub body: CompiledProgramBody,
}
302
/// Result of running a program from scratch.
#[derive(Debug, Clone)]
pub struct ExecutionOutput {
    /// Value returned by the program.
    pub value: Value,
    /// State after execution finished.
    pub state: State,
}
309
310#[derive(Debug, Clone)]
311struct PipelineRuntime {
312 vars: HashMap<String, Value>,
313 prev: Value,
314 idempotency: HashMap<String, Value>,
315}
316
317impl Default for PipelineRuntime {
318 fn default() -> Self {
319 Self {
320 vars: HashMap::new(),
321 prev: Value::Null,
322 idempotency: HashMap::new(),
323 }
324 }
325}
326
327pub fn parse_program(input: &Value) -> Result<Program, ValidationError> {
328 let program: Program = serde_json::from_value(input.clone()).map_err(|err| {
329 ValidationError::new("JOSIE_E_PARSE", format!("invalid program document: {err}"))
330 })?;
331 validate_program(&program)?;
332 Ok(program)
333}
334
335pub fn compile_program(program: &Program) -> Result<CompiledProgram, ValidationError> {
347 validate_program(program)?;
348 let body = if is_tree_expression(&program.program) {
349 CompiledProgramBody::Tree(program.program.clone())
350 } else {
351 let pipe: PipelineDoc = serde_json::from_value(program.program.clone()).map_err(|err| {
352 ValidationError::new(
353 "JOSIE_E_PIPE_PARSE",
354 format!("invalid pipeline document: {err}"),
355 )
356 })?;
357 if pipe.steps.iter().any(has_step_execution_policy) {
359 CompiledProgramBody::CompiledPipeline(compile_pipeline_steps(&pipe))
360 } else if let Some(plan) = try_compile_fast_internal(&pipe) {
361 CompiledProgramBody::FastInternal(plan)
362 } else if let Some(plan) = try_compile_fast_external_map(&pipe) {
363 CompiledProgramBody::FastExternalMapI64(plan)
364 } else {
365 CompiledProgramBody::CompiledPipeline(compile_pipeline_steps(&pipe))
366 }
367 };
368 Ok(CompiledProgram {
369 initial_state: program.state.clone(),
370 body,
371 })
372}
373
374fn compile_pipeline_steps(pipe: &PipelineDoc) -> Vec<CompiledPipelineStep> {
379 pipe.steps
380 .iter()
381 .map(|step| {
382 let is_reduce = step.op == "reduce";
383 let is_iter = matches!(step.op.as_str(), "map" | "filter" | "for_each" | "reduce");
384
385 let compiled_do = step
386 .do_expr
387 .as_ref()
388 .map(|expr| compile_do_expr(expr, is_iter, is_reduce));
389 let compiled_when = step
390 .when
391 .as_ref()
392 .map(|expr| compile_expr(expr, false, false));
393 let compiled_args = step
394 .args
395 .iter()
396 .map(|a| compile_expr(a, false, false))
397 .collect();
398
399 CompiledPipelineStep {
400 step: step.clone(),
401 compiled_do,
402 compiled_when,
403 compiled_args,
404 }
405 })
406 .collect()
407}
408
409fn compile_do_expr(expr: &Value, iter_ctx: bool, reduce_ctx: bool) -> Expr {
412 if let Some(obj) = expr.as_object() {
413 if let Some(op) = obj.get("op").and_then(|v| v.as_str()) {
414 if op == "call" {
415 let fn_name = obj
416 .get("fn")
417 .and_then(|v| v.as_str())
418 .or_else(|| obj.get("from").and_then(|v| v.as_str()));
419 if let Some(fn_name) = fn_name {
420 let args: Vec<Expr> = obj
421 .get("args")
422 .and_then(|v| v.as_array())
423 .map(|arr| {
424 arr.iter()
425 .map(|a| compile_expr(a, iter_ctx, reduce_ctx))
426 .collect()
427 })
428 .unwrap_or_default();
429 return Expr::Call(Rc::from(fn_name), args);
430 }
431 }
432 }
433 }
434 compile_expr(expr, iter_ctx, reduce_ctx)
435}
436
437pub fn execute_program(
438 program: &Program,
439 operators: &Operators,
440) -> Result<ExecutionOutput, RuntimeError> {
441 let hosts = HostFunctions::default();
442 execute_program_with_hosts(program, operators, &hosts)
443}
444
445pub fn execute_program_with_hosts(
451 program: &Program,
452 operators: &Operators,
453 hosts: &HostFunctions,
454) -> Result<ExecutionOutput, RuntimeError> {
455 let compiled = compile_program(program)
456 .map_err(|err| RuntimeError::new(err.code, err.message).step_opt(err.step_index, err.op))?;
457 let mut state = state_from_value(&compiled.initial_state);
458 let value = execute_compiled_program_with_hosts(&compiled, &mut state, operators, hosts)?;
459 Ok(ExecutionOutput { value, state })
460}
461
462pub fn execute_compiled_program(
463 compiled: &CompiledProgram,
464 state: &mut State,
465 operators: &Operators,
466) -> Result<Value, RuntimeError> {
467 let hosts = HostFunctions::default();
468 execute_compiled_program_with_hosts(compiled, state, operators, &hosts)
469}
470
471pub fn execute_compiled_program_with_hosts(
476 compiled: &CompiledProgram,
477 state: &mut State,
478 operators: &Operators,
479 hosts: &HostFunctions,
480) -> Result<Value, RuntimeError> {
481 match &compiled.body {
482 CompiledProgramBody::Tree(expr) => eval_tree(expr, state, operators),
483 CompiledProgramBody::Pipeline(pipe) => execute_pipeline(pipe, state, operators, hosts),
484 CompiledProgramBody::FastExternalMapI64(plan) => {
485 if hosts.get_generate_i64(&plan.fn_a).is_some()
486 && hosts.get_map_i64(&plan.fn_b).is_some()
487 {
488 execute_fast_external_map(plan, hosts)
489 } else {
490 let fallback = plan_to_pipeline_doc(plan);
491 execute_pipeline(&fallback, state, operators, hosts)
492 }
493 }
494 CompiledProgramBody::FastInternal(plan) => execute_fast_internal(plan, state),
495 CompiledProgramBody::CompiledPipeline(steps) => {
496 execute_compiled_pipeline(steps, state, operators, hosts)
497 }
498 }
499}
500
501pub fn execute_compiled_program_external_metrics(
506 compiled: &CompiledProgram,
507 state: &mut State,
508 operators: &Operators,
509 hosts: &HostFunctions,
510) -> Result<(i64, usize), RuntimeError> {
511 match &compiled.body {
512 CompiledProgramBody::FastExternalMapI64(plan) => {
513 execute_fast_external_map_metrics(plan, hosts)
514 }
515 _ => {
516 let value = execute_compiled_program_with_hosts(compiled, state, operators, hosts)?;
517 let arr = value.as_array().cloned().ok_or_else(|| {
518 RuntimeError::new(
519 "JOSIE_E_EXTERNAL_RESULT",
520 "expected array result for external metrics fallback",
521 )
522 })?;
523 let checksum = arr
524 .iter()
525 .filter_map(|v| v.as_i64())
526 .fold(0i64, |acc, v| acc.wrapping_add(v));
527 Ok((checksum, arr.len()))
528 }
529 }
530}
531
532fn execute_compiled_pipeline(
535 steps: &[CompiledPipelineStep],
536 state: &mut State,
537 operators: &Operators,
538 hosts: &HostFunctions,
539) -> Result<Value, RuntimeError> {
540 let mut runtime = PipelineRuntime::default();
541
542 for (idx, cs) in steps.iter().enumerate() {
543 if let Some(when_expr) = &cs.compiled_when {
545 let empty = IterLocals::empty();
546 let when_val =
547 eval_expr(when_expr, &empty, state, operators).map_err(RuntimeError::from)?;
548 if !when_val.is_truthy() {
549 continue;
550 }
551 }
552
553 let value = execute_compiled_step_with_policy(cs, state, operators, hosts, &mut runtime)
554 .map_err(|err| err.step(idx, cs.step.op.clone()))?;
555 write_target(cs.step.into.as_deref(), value, state, &mut runtime)?;
556 if cs.step.op == "return" {
557 break;
558 }
559 }
560 Ok(runtime.prev)
561}
562
563fn execute_compiled_step_with_policy(
564 cs: &CompiledPipelineStep,
565 state: &mut State,
566 operators: &Operators,
567 hosts: &HostFunctions,
568 runtime: &mut PipelineRuntime,
569) -> Result<Value, RuntimeError> {
570 if let Some(key) = cs.step.idempotency_key.as_ref()
571 && let Some(cached) = runtime.idempotency.get(key)
572 {
573 return Ok(cached.clone());
574 }
575
576 let timeout_ms = cs.step.timeout_ms;
577 let on_error = cs.step.on_error.unwrap_or(StepOnError::Halt);
578 let max_retries = cs.step.max_retries.unwrap_or(0);
579 let mut attempts = 0u32;
580 loop {
581 attempts = attempts.saturating_add(1);
582 let started = Instant::now();
583 let mut out = execute_compiled_step(cs, state, operators, hosts, runtime);
584 if let Some(limit_ms) = timeout_ms
585 && started.elapsed().as_millis() as u64 > limit_ms
586 {
587 out = Err(RuntimeError::new(
588 "JOSIE_E_STEP_TIMEOUT",
589 format!("step exceeded timeout_ms={limit_ms}"),
590 ));
591 }
592
593 match out {
594 Ok(v) => {
595 if let Some(key) = cs.step.idempotency_key.as_ref() {
596 runtime.idempotency.insert(key.clone(), v.clone());
597 }
598 return Ok(v);
599 }
600 Err(err) => match on_error {
601 StepOnError::Retry if attempts <= max_retries => continue,
602 StepOnError::Fallback | StepOnError::Compensate => return Ok(Value::Null),
603 StepOnError::Retry | StepOnError::Halt => return Err(err),
604 },
605 }
606 }
607}
608
/// Executes a single compiled step (without guard or error-policy handling) and
/// returns its result.
///
/// Dispatches on `step.op`; see the per-arm comments for the behavior of each
/// operation.
///
/// # Errors
/// Returns a [`RuntimeError`] when a required field is missing, when expression
/// evaluation fails (converted from the evaluator's error), when a helper fails, or
/// `JOSIE_E_STEP_UNKNOWN_OP` for an unrecognized `op`.
fn execute_compiled_step(
    cs: &CompiledPipelineStep,
    state: &mut State,
    operators: &Operators,
    hosts: &HostFunctions,
    runtime: &mut PipelineRuntime,
) -> Result<Value, RuntimeError> {
    let step = &cs.step;

    match step.op.as_str() {
        "set" => {
            // An object-shaped first argument is handed to `eval_dynamic` (which also
            // receives the hosts and the pipeline runtime) instead of the compiled
            // expression path.
            if let Some(arg0) = step.args.first()
                && arg0.is_object()
            {
                return eval_dynamic(arg0, state, operators, hosts, runtime);
            }
            // Otherwise evaluate the first compiled argument with empty iteration locals.
            let expr = cs
                .compiled_args
                .first()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_SET_ARGS", "set step requires args"))?;
            let empty = IterLocals::empty();
            let jval = eval_expr(expr, &empty, state, operators).map_err(RuntimeError::from)?;
            Ok(Value::from(jval))
        }

        // Read the reference named by `from`.
        "get" => {
            let from = step
                .from
                .as_deref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_GET_FROM", "get step requires 'from'"))?;
            read_ref(from, state, runtime)
        }

        "call" => execute_call_step(step, state, operators, hosts, runtime),

        // Evaluate `do` once per input element (locals carry the element and its index)
        // and collect the results.
        "map" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "map step requires 'do'"))?;
            let mut out = Vec::with_capacity(input.len());
            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                out.push(Value::from(result));
            }
            Ok(Value::Array(out))
        }

        // Keep the original elements for which `do` evaluates truthy.
        "filter" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "filter step requires 'do'"))?;
            let mut out = Vec::new();
            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                if result.is_truthy() {
                    out.push(item.clone());
                }
            }
            Ok(Value::Array(out))
        }

        // Evaluate `do` per element; the step yields the array of results when `into`
        // is set and the last result otherwise.
        "for_each" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs.compiled_do.as_ref().ok_or_else(|| {
                RuntimeError::new("JOSIE_E_ITER_DO", "for_each step requires 'do'")
            })?;
            let mut out = Vec::with_capacity(input.len());
            let mut last = Value::Null;

            // Shortcut: when `do` is a call whose name is registered as an `i64` map
            // host function, apply that function directly to each integer element.
            // Elements that are not `i64` are skipped (they produce no output entry),
            // and the call's compiled arguments are not consulted on this path.
            if let Expr::Call(fn_name, _) = compiled_do {
                if let Some(host_map) = hosts.get_map_i64(fn_name) {
                    for item in &input {
                        if let Some(n) = item.as_i64() {
                            let mapped = json!(host_map(n));
                            last = mapped.clone();
                            out.push(mapped);
                        }
                    }
                    if step.into.is_some() {
                        return Ok(Value::Array(out));
                    }
                    return Ok(last);
                }
            }

            for (index, item) in input.iter().enumerate() {
                let locals = IterLocals::new(JVal::from(item.clone()), index as i64);
                let result = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
                let val = Value::from(result);
                last = val.clone();
                out.push(val);
            }
            if step.into.is_some() {
                Ok(Value::Array(out))
            } else {
                Ok(last)
            }
        }

        // Fold the input: the accumulator starts as the first compiled argument
        // (evaluated with empty locals) or null, and is replaced by the result of `do`
        // for every element.
        "reduce" => {
            let input = read_from_array(step, state, runtime)?;
            let compiled_do = cs
                .compiled_do
                .as_ref()
                .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_DO", "reduce step requires 'do'"))?;

            let mut acc = if let Some(init_expr) = cs.compiled_args.first() {
                let empty = IterLocals::empty();
                eval_expr(init_expr, &empty, state, operators).map_err(RuntimeError::from)?
            } else {
                JVal::Null
            };

            for (index, item) in input.iter().enumerate() {
                // The accumulator is moved into the locals and replaced by the new result.
                let locals = IterLocals::with_acc(JVal::from(item.clone()), index as i64, acc);
                acc = eval_expr(compiled_do, &locals, state, operators)
                    .map_err(RuntimeError::from)?;
            }
            Ok(Value::from(acc))
        }

        // Rebuild the tree expression `[op, ...args]` and evaluate it with `eval_tree`.
        "if" | "match" | "do" | "pipe" => {
            let mut tree = Vec::with_capacity(step.args.len() + 1);
            tree.push(Value::String(step.op.clone()));
            tree.extend(step.args.clone());
            eval_tree(&Value::Array(tree), state, operators)
        }
        // Read the reference named by `from`, defaulting to `$prev`.
        "return" => {
            let from = step.from.as_deref().unwrap_or("$prev");
            read_ref(from, state, runtime)
        }

        other => Err(RuntimeError::new(
            "JOSIE_E_STEP_UNKNOWN_OP",
            format!("unsupported step op '{other}'"),
        )),
    }
}
762
763pub fn validate_program(program: &Program) -> Result<(), ValidationError> {
764 if is_tree_expression(&program.program) {
765 return Ok(());
766 }
767
768 if let Some(obj) = program.program.as_object() {
769 if obj.get("type").and_then(|v| v.as_str()) == Some("pipeline") {
770 let pipe: PipelineDoc =
771 serde_json::from_value(program.program.clone()).map_err(|err| {
772 ValidationError::new(
773 "JOSIE_E_PIPE_PARSE",
774 format!("invalid pipeline document: {err}"),
775 )
776 })?;
777 return validate_pipeline(&pipe);
778 }
779 }
780
781 Err(ValidationError::new(
782 "JOSIE_E_PROGRAM_INVALID",
783 "program must be a tree expression array or pipeline object",
784 ))
785}
786
787pub fn validate_pipeline(pipe: &PipelineDoc) -> Result<(), ValidationError> {
788 if pipe.doc_type != "pipeline" {
789 return Err(ValidationError::new(
790 "JOSIE_E_PIPE_TYPE",
791 format!(
792 "pipeline type must be 'pipeline' but got '{}'",
793 pipe.doc_type
794 ),
795 ));
796 }
797 if pipe.steps.is_empty() {
798 return Err(ValidationError::new(
799 "JOSIE_E_PIPE_EMPTY",
800 "pipeline steps must not be empty",
801 ));
802 }
803
804 for (idx, step) in pipe.steps.iter().enumerate() {
805 validate_step(step).map_err(|err| err.step(idx, step.op.clone()))?;
806 }
807 Ok(())
808}
809
810fn validate_step(step: &PipelineStep) -> Result<(), ValidationError> {
811 if let Some(id) = step.id.as_deref()
812 && id.trim().is_empty()
813 {
814 return Err(ValidationError::new(
815 "JOSIE_E_STEP_ID",
816 "step id must not be empty",
817 ));
818 }
819 if let Some(input_schema) = step.input.as_ref()
820 && !input_schema.is_object()
821 {
822 return Err(ValidationError::new(
823 "JOSIE_E_STEP_INPUT_SCHEMA",
824 "step input schema must be an object",
825 ));
826 }
827 if let Some(output_schema) = step.output.as_ref()
828 && !output_schema.is_object()
829 {
830 return Err(ValidationError::new(
831 "JOSIE_E_STEP_OUTPUT_SCHEMA",
832 "step output schema must be an object",
833 ));
834 }
835 if let Some(idempotency_key) = step.idempotency_key.as_deref()
836 && idempotency_key.trim().is_empty()
837 {
838 return Err(ValidationError::new(
839 "JOSIE_E_STEP_IDEMPOTENCY",
840 "step idempotency_key must not be empty",
841 ));
842 }
843
844 if step.op.trim().is_empty() {
845 return Err(ValidationError::new(
846 "JOSIE_E_STEP_OP",
847 "step op must not be empty",
848 ));
849 }
850
851 let known_op = matches!(
852 step.op.as_str(),
853 "call"
854 | "set"
855 | "get"
856 | "map"
857 | "filter"
858 | "for_each"
859 | "reduce"
860 | "if"
861 | "match"
862 | "do"
863 | "pipe"
864 | "return"
865 );
866
867 if !known_op {
868 return Err(ValidationError::new(
869 "JOSIE_E_STEP_UNKNOWN_OP",
870 format!("unknown step op '{}'", step.op),
871 ));
872 }
873
874 match step.op.as_str() {
875 "call" => {
876 let call_target = step
877 .fn_name
878 .as_deref()
879 .or(step.from.as_deref())
880 .unwrap_or_default();
881 if call_target.is_empty() {
882 return Err(ValidationError::new(
883 "JOSIE_E_CALL_FN",
884 "call step requires non-empty fn/from",
885 ));
886 }
887 }
888 "map" | "filter" | "for_each" | "reduce" => {
889 if step.from.as_deref().unwrap_or_default().is_empty() {
890 return Err(ValidationError::new(
891 "JOSIE_E_ITER_FROM",
892 format!("{} step requires 'from'", step.op),
893 ));
894 }
895 if step.do_expr.is_none() {
896 return Err(ValidationError::new(
897 "JOSIE_E_ITER_DO",
898 format!("{} step requires 'do'", step.op),
899 ));
900 }
901 }
902 "set" => {
903 if step.into.as_deref().unwrap_or_default().is_empty() {
904 return Err(ValidationError::new(
905 "JOSIE_E_SET_INTO",
906 "set step requires 'into'",
907 ));
908 }
909 if step.args.is_empty() {
910 return Err(ValidationError::new(
911 "JOSIE_E_SET_ARGS",
912 "set step requires args",
913 ));
914 }
915 }
916 "return" => {
917 if let Some(from) = step.from.as_deref()
918 && from.trim().is_empty()
919 {
920 return Err(ValidationError::new(
921 "JOSIE_E_RETURN_FROM",
922 "return step 'from' must not be empty",
923 ));
924 }
925 }
926 _ => {}
927 }
928
929 Ok(())
930}
931
932fn has_step_execution_policy(step: &PipelineStep) -> bool {
933 step.id.is_some()
934 || step.step_type.is_some()
935 || step.run_hint.is_some()
936 || step.input.is_some()
937 || step.output.is_some()
938 || step.on_error.is_some()
939 || step.timeout_ms.is_some()
940 || step.max_retries.is_some()
941 || step.idempotency_key.is_some()
942}
943
944fn try_compile_fast_external_map(pipe: &PipelineDoc) -> Option<FastExternalMapPlan> {
951 if pipe.steps.len() != 2 {
952 return None;
953 }
954 let s0 = &pipe.steps[0];
955 let s1 = &pipe.steps[1];
956 if s0.op != "call" || s1.op != "for_each" {
957 return None;
958 }
959 if s0.when.is_some() || s1.when.is_some() {
960 return None;
961 }
962 let fn_a = s0.fn_name.as_ref().or(s0.from.as_ref())?.to_string();
963 let result_a_name = s0.into.as_ref()?.to_string();
964 if s1.from.as_deref()? != result_a_name {
965 return None;
966 }
967 let do_expr = s1.do_expr.as_ref()?;
968 let do_obj = do_expr.as_object()?;
969 if do_obj.get("op").and_then(|v| v.as_str())? != "call" {
970 return None;
971 }
972 let fn_b = do_obj
973 .get("fn")
974 .and_then(|v| v.as_str())
975 .or_else(|| do_obj.get("from").and_then(|v| v.as_str()))?
976 .to_string();
977 let args = do_obj.get("args").and_then(|v| v.as_array())?;
978 if args.len() != 1 {
979 return None;
980 }
981 let arg0 = args.first()?;
982 let is_item = arg0.as_array().is_some_and(|a| {
983 a.len() == 2
984 && a.first().and_then(|v| v.as_str()) == Some("var")
985 && a.get(1).and_then(|v| v.as_str()) == Some("item")
986 });
987 if !is_item {
988 return None;
989 }
990 if s0.args.len() != 3 {
991 return None;
992 }
993 let p1 = s0.args[0].as_i64()?;
994 let p2 = s0.args[1].as_i64()?;
995 let size = s0.args[2].as_u64()? as usize;
996 Some(FastExternalMapPlan {
997 fn_a,
998 fn_b,
999 p1,
1000 p2,
1001 size,
1002 })
1003}
1004
1005fn try_compile_fast_internal(pipe: &PipelineDoc) -> Option<FastInternalPlan> {
1013 let s0 = pipe.steps.first()?;
1014 let size = parse_set_nums_step(s0)?;
1015
1016 if pipe.steps.len() == 3
1018 && is_map_math_step(pipe.steps.get(1)?)
1019 && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1020 {
1021 return Some(FastInternalPlan {
1022 size,
1023 kind: FastInternalKind::MathLoop,
1024 });
1025 }
1026
1027 if pipe.steps.len() == 4
1029 && is_map_math_step(pipe.steps.get(1)?)
1030 && is_filter_hot_step(pipe.steps.get(2)?)
1031 && is_reduce_sum_score_step(pipe.steps.get(3)?, "hot")
1032 {
1033 return Some(FastInternalPlan {
1034 size,
1035 kind: FastInternalKind::MathFilter { with_t: false },
1036 });
1037 }
1038
1039 if pipe.steps.len() == 5
1041 && is_map_math_step(pipe.steps.get(1)?)
1042 && is_filter_hot_step(pipe.steps.get(2)?)
1043 && is_for_each_t_len_step(pipe.steps.get(3)?)
1044 && is_reduce_sum_score_step(pipe.steps.get(4)?, "lens")
1045 {
1046 return Some(FastInternalPlan {
1047 size,
1048 kind: FastInternalKind::MathFilter { with_t: true },
1049 });
1050 }
1051
1052 if pipe.steps.len() == 3
1054 && is_map_branching_step(pipe.steps.get(1)?)
1055 && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1056 {
1057 return Some(FastInternalPlan {
1058 size,
1059 kind: FastInternalKind::Branching,
1060 });
1061 }
1062
1063 if pipe.steps.len() == 3
1065 && is_map_boolean_chain_step(pipe.steps.get(1)?)
1066 && is_reduce_sum_score_step(pipe.steps.get(2)?, "flags")
1067 {
1068 return Some(FastInternalPlan {
1069 size,
1070 kind: FastInternalKind::BooleanChain,
1071 });
1072 }
1073
1074 if pipe.steps.len() == 3
1076 && is_for_each_template_len_step(pipe.steps.get(1)?)
1077 && is_reduce_sum_score_step(pipe.steps.get(2)?, "lens")
1078 {
1079 return Some(FastInternalPlan {
1080 size,
1081 kind: FastInternalKind::TemplateLike,
1082 });
1083 }
1084
1085 if pipe.steps.len() == 3
1087 && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1088 && let Some(p1) = parse_map_reduce_p1_step(pipe.steps.get(1)?)
1089 {
1090 return Some(FastInternalPlan {
1091 size,
1092 kind: FastInternalKind::MapReduce { p1 },
1093 });
1094 }
1095
1096 if pipe.steps.len() == 3
1098 && is_map_match_step(pipe.steps.get(1)?)
1099 && is_reduce_sum_score_step(pipe.steps.get(2)?, "mapped")
1100 {
1101 return Some(FastInternalPlan {
1102 size,
1103 kind: FastInternalKind::MatchSwitch,
1104 });
1105 }
1106
1107 if pipe.steps.len() == 5
1109 && is_map_math_step(pipe.steps.get(1)?)
1110 && is_filter_mid_step(pipe.steps.get(2)?)
1111 && is_for_each_mixed_step(pipe.steps.get(3)?)
1112 && is_reduce_sum_score_step(pipe.steps.get(4)?, "lens")
1113 {
1114 return Some(FastInternalPlan {
1115 size,
1116 kind: FastInternalKind::MixedWorkflow,
1117 });
1118 }
1119
1120 None
1121}
1122
1123fn parse_set_nums_step(step: &PipelineStep) -> Option<usize> {
1124 if step.op != "set" || step.into.as_deref() != Some("nums") || step.args.len() != 1 {
1125 return None;
1126 }
1127 let arr = step.args.first()?.as_array()?;
1128 if arr.is_empty() {
1129 return Some(0);
1130 }
1131 for (idx, item) in arr.iter().enumerate() {
1132 let expected = (idx + 1) as i64;
1133 if item.as_i64()? != expected {
1134 return None;
1135 }
1136 }
1137 Some(arr.len())
1138}
1139
1140fn is_map_math_step(step: &PipelineStep) -> bool {
1141 step.op == "map"
1142 && step.from.as_deref() == Some("nums")
1143 && step.into.as_deref() == Some("mapped")
1144 && step.do_expr
1145 == Some(json!([
1146 "%",
1147 [
1148 "+",
1149 ["*", ["var", "item"], ["var", "item"]],
1150 ["%", ["var", "index"], 7]
1151 ],
1152 997
1153 ]))
1154}
1155
1156fn is_filter_hot_step(step: &PipelineStep) -> bool {
1157 step.op == "filter"
1158 && step.from.as_deref() == Some("mapped")
1159 && step.into.as_deref() == Some("hot")
1160 && step.do_expr
1161 == Some(json!([
1162 "&&",
1163 [">", ["var", "item"], 40],
1164 ["<", ["var", "item"], 800]
1165 ]))
1166}
1167
1168fn is_reduce_sum_score_step(step: &PipelineStep, from: &str) -> bool {
1169 step.op == "reduce"
1170 && step.from.as_deref() == Some(from)
1171 && step.into.as_deref() == Some("score")
1172 && step.args == vec![json!(0)]
1173 && step.do_expr == Some(json!(["+", ["var", "acc"], ["var", "item"]]))
1174}
1175
1176fn is_for_each_t_len_step(step: &PipelineStep) -> bool {
1177 step.op == "for_each"
1178 && step.from.as_deref() == Some("hot")
1179 && step.into.as_deref() == Some("lens")
1180 && step.do_expr
1181 == Some(json!([
1182 "util.to_int",
1183 [
1184 "util.to_string",
1185 [
1186 "util.str_len",
1187 [
1188 "util.trim",
1189 ["util.concat", " v", ["util.to_string", ["var", "item"]], " "]
1190 ]
1191 ]
1192 ]
1193 ]))
1194}
1195
1196fn is_map_branching_step(step: &PipelineStep) -> bool {
1197 step.op == "map"
1198 && step.from.as_deref() == Some("nums")
1199 && step.into.as_deref() == Some("mapped")
1200 && step.do_expr
1201 == Some(json!([
1202 "if",
1203 ["==", ["%", ["var", "item"], 2], 0],
1204 ["/", ["var", "item"], 2],
1205 ["%", ["+", ["*", ["var", "item"], 3], 1], 1000]
1206 ]))
1207}
1208
1209fn is_map_boolean_chain_step(step: &PipelineStep) -> bool {
1210 step.op == "map"
1211 && step.from.as_deref() == Some("nums")
1212 && step.into.as_deref() == Some("flags")
1213 && step.do_expr
1214 == Some(json!([
1215 "if",
1216 [
1217 "||",
1218 [
1219 "&&",
1220 ["==", ["%", ["var", "item"], 2], 0],
1221 ["!=", ["%", ["var", "item"], 3], 0]
1222 ],
1223 ["==", ["%", ["var", "item"], 5], 0]
1224 ],
1225 1,
1226 0
1227 ]))
1228}
1229
1230fn is_for_each_template_len_step(step: &PipelineStep) -> bool {
1231 step.op == "for_each"
1232 && step.from.as_deref() == Some("nums")
1233 && step.into.as_deref() == Some("lens")
1234 && step.do_expr
1235 == Some(json!([
1236 "util.str_len",
1237 [
1238 "util.concat",
1239 "ID-",
1240 ["util.to_string", ["var", "item"]],
1241 "-",
1242 ["util.to_string", ["%", ["var", "item"], 7]]
1243 ]
1244 ]))
1245}
1246
1247fn parse_map_reduce_p1_step(step: &PipelineStep) -> Option<i64> {
1248 if step.op != "map"
1249 || step.from.as_deref() != Some("nums")
1250 || step.into.as_deref() != Some("mapped")
1251 {
1252 return None;
1253 }
1254 let expr = step.do_expr.as_ref()?.as_array()?;
1255 if expr.len() != 3 || expr.first()?.as_str()? != "%" || expr.get(2)?.as_i64()? != 1009 {
1256 return None;
1257 }
1258 let mul = expr.get(1)?.as_array()?;
1259 if mul.len() != 3 || mul.first()?.as_str()? != "*" {
1260 return None;
1261 }
1262 let add1 = mul.get(1)?.as_array()?;
1263 let add2 = mul.get(2)?.as_array()?;
1264 let p1a = parse_add_item_const(add1)?;
1265 let p1b = parse_add_item_const(add2)?;
1266 if p1a == p1b { Some(p1a) } else { None }
1267}
1268
1269fn parse_add_item_const(add_expr: &[Value]) -> Option<i64> {
1270 if add_expr.len() != 3 || add_expr.first()?.as_str()? != "+" {
1271 return None;
1272 }
1273 let lhs = add_expr.get(1)?.as_array()?;
1274 if lhs.len() != 2 || lhs.first()?.as_str()? != "var" || lhs.get(1)?.as_str()? != "item" {
1275 return None;
1276 }
1277 add_expr.get(2)?.as_i64()
1278}
1279
1280fn is_map_match_step(step: &PipelineStep) -> bool {
1281 step.op == "map"
1282 && step.from.as_deref() == Some("nums")
1283 && step.into.as_deref() == Some("mapped")
1284 && step.do_expr
1285 == Some(json!([
1286 "match",
1287 ["%", ["var", "item"], 4],
1288 0,
1289 1,
1290 1,
1291 2,
1292 2,
1293 3,
1294 "_",
1295 4
1296 ]))
1297}
1298
1299fn is_filter_mid_step(step: &PipelineStep) -> bool {
1300 step.op == "filter"
1301 && step.from.as_deref() == Some("mapped")
1302 && step.into.as_deref() == Some("hot")
1303 && step.do_expr
1304 == Some(json!([
1305 "&&",
1306 [">", ["var", "item"], 100],
1307 ["<", ["var", "item"], 900]
1308 ]))
1309}
1310
1311fn is_for_each_mixed_step(step: &PipelineStep) -> bool {
1312 step.op == "for_each"
1313 && step.from.as_deref() == Some("hot")
1314 && step.into.as_deref() == Some("lens")
1315 && step.do_expr
1316 == Some(json!([
1317 "+",
1318 [
1319 "util.str_len",
1320 ["util.concat", "x", ["util.to_string", ["var", "item"]]]
1321 ],
1322 ["if", [">", ["var", "item"], 200], 5, 1]
1323 ]))
1324}
1325
1326fn plan_to_pipeline_doc(plan: &FastExternalMapPlan) -> PipelineDoc {
1327 PipelineDoc {
1328 doc_type: "pipeline".to_string(),
1329 steps: vec![
1330 PipelineStep {
1331 id: None,
1332 step_type: None,
1333 op: "call".to_string(),
1334 from: None,
1335 into: Some("result_a".to_string()),
1336 fn_name: Some(plan.fn_a.clone()),
1337 args: vec![json!(plan.p1), json!(plan.p2), json!(plan.size)],
1338 do_expr: None,
1339 when: None,
1340 run_hint: None,
1341 input: None,
1342 output: None,
1343 on_error: None,
1344 timeout_ms: None,
1345 max_retries: None,
1346 idempotency_key: None,
1347 },
1348 PipelineStep {
1349 id: None,
1350 step_type: None,
1351 op: "for_each".to_string(),
1352 from: Some("result_a".to_string()),
1353 into: Some("result_b".to_string()),
1354 fn_name: None,
1355 args: vec![],
1356 do_expr: Some(json!({
1357 "op": "call",
1358 "fn": plan.fn_b,
1359 "args": [["var","item"]]
1360 })),
1361 when: None,
1362 run_hint: None,
1363 input: None,
1364 output: None,
1365 on_error: None,
1366 timeout_ms: None,
1367 max_retries: None,
1368 idempotency_key: None,
1369 },
1370 ],
1371 }
1372}
1373
1374fn execute_fast_internal(
1378 plan: &FastInternalPlan,
1379 state: &mut State,
1380) -> Result<Value, RuntimeError> {
1381 let mut score = 0i64;
1382 for index in 0..plan.size {
1383 let item = (index + 1) as i64;
1384 let idx = index as i64;
1385 match &plan.kind {
1386 FastInternalKind::MathLoop => {
1387 let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1388 score = score.wrapping_add(mapped);
1389 }
1390 FastInternalKind::MathFilter { with_t } => {
1391 let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1392 if mapped > 40 && mapped < 800 {
1393 if *with_t {
1394 let l = format!("v{mapped}").chars().count() as i64;
1395 score = score.wrapping_add(l);
1396 } else {
1397 score = score.wrapping_add(mapped);
1398 }
1399 }
1400 }
1401 FastInternalKind::Branching => {
1402 let v = if item % 2 == 0 {
1403 item / 2
1404 } else {
1405 ((item * 3) + 1).rem_euclid(1000)
1406 };
1407 score = score.wrapping_add(v);
1408 }
1409 FastInternalKind::BooleanChain => {
1410 if (item % 2 == 0 && item % 3 != 0) || item % 5 == 0 {
1411 score = score.wrapping_add(1);
1412 }
1413 }
1414 FastInternalKind::TemplateLike => {
1415 let len = format!("ID-{item}-{}", item.rem_euclid(7)).chars().count() as i64;
1416 score = score.wrapping_add(len);
1417 }
1418 FastInternalKind::MapReduce { p1 } => {
1419 let y = item + *p1;
1420 score = score.wrapping_add((y * y).rem_euclid(1009));
1421 }
1422 FastInternalKind::MatchSwitch => {
1423 let m = item.rem_euclid(4);
1424 let v = match m {
1425 0 => 1,
1426 1 => 2,
1427 2 => 3,
1428 _ => 4,
1429 };
1430 score = score.wrapping_add(v);
1431 }
1432 FastInternalKind::MixedWorkflow => {
1433 let mapped = ((item * item) + (idx % 7)).rem_euclid(997);
1434 if mapped > 100 && mapped < 900 {
1435 let extra = if mapped > 200 { 5 } else { 1 };
1436 let len = format!("x{mapped}").chars().count() as i64;
1437 score = score.wrapping_add(len + extra);
1438 }
1439 }
1440 }
1441 }
1442 state.client.insert("score".to_string(), json!(score));
1443 Ok(json!(score))
1444}
1445
1446fn execute_fast_external_map(
1448 plan: &FastExternalMapPlan,
1449 hosts: &HostFunctions,
1450) -> Result<Value, RuntimeError> {
1451 let gen_fn = hosts.get_generate_i64(&plan.fn_a).ok_or_else(|| {
1452 RuntimeError::new(
1453 "JOSIE_E_HOST_MISSING",
1454 format!("missing typed host generate function '{}'", plan.fn_a),
1455 )
1456 })?;
1457 let map = hosts.get_map_i64(&plan.fn_b).ok_or_else(|| {
1458 RuntimeError::new(
1459 "JOSIE_E_HOST_MISSING",
1460 format!("missing typed host map function '{}'", plan.fn_b),
1461 )
1462 })?;
1463
1464 let a = gen_fn(plan.p1, plan.p2, plan.size);
1465 let mut b = Vec::with_capacity(a.len());
1466 for item in a {
1467 b.push(json!(map(item)));
1468 }
1469 Ok(Value::Array(b))
1470}
1471
1472fn execute_fast_external_map_metrics(
1475 plan: &FastExternalMapPlan,
1476 hosts: &HostFunctions,
1477) -> Result<(i64, usize), RuntimeError> {
1478 let gen_fn = hosts.get_generate_i64(&plan.fn_a).ok_or_else(|| {
1479 RuntimeError::new(
1480 "JOSIE_E_HOST_MISSING",
1481 format!("missing typed host generate function '{}'", plan.fn_a),
1482 )
1483 })?;
1484 let map = hosts.get_map_i64(&plan.fn_b).ok_or_else(|| {
1485 RuntimeError::new(
1486 "JOSIE_E_HOST_MISSING",
1487 format!("missing typed host map function '{}'", plan.fn_b),
1488 )
1489 })?;
1490 let a = gen_fn(plan.p1, plan.p2, plan.size);
1491 let mut checksum = 0i64;
1492 for item in a.iter().copied() {
1493 checksum = checksum.wrapping_add(map(item));
1494 }
1495 Ok((checksum, a.len()))
1496}
1497
1498fn execute_pipeline(
1499 pipe: &PipelineDoc,
1500 state: &mut State,
1501 operators: &Operators,
1502 hosts: &HostFunctions,
1503) -> Result<Value, RuntimeError> {
1504 let mut runtime = PipelineRuntime::default();
1505 for (idx, step) in pipe.steps.iter().enumerate() {
1506 if let Some(when_expr) = &step.when {
1507 let when_out = eval_dynamic(when_expr, state, operators, hosts, &mut runtime)?;
1508 if !truthy(&when_out) {
1509 continue;
1510 }
1511 }
1512 let value = execute_step_with_policy(step, state, operators, hosts, &mut runtime)
1513 .map_err(|err| err.step(idx, step.op.clone()))?;
1514 write_target(step.into.as_deref(), value, state, &mut runtime)?;
1515 if step.op == "return" {
1516 break;
1517 }
1518 }
1519 Ok(runtime.prev)
1520}
1521
1522fn execute_step_with_policy(
1523 step: &PipelineStep,
1524 state: &mut State,
1525 operators: &Operators,
1526 hosts: &HostFunctions,
1527 runtime: &mut PipelineRuntime,
1528) -> Result<Value, RuntimeError> {
1529 if let Some(key) = step.idempotency_key.as_ref()
1530 && let Some(cached) = runtime.idempotency.get(key)
1531 {
1532 return Ok(cached.clone());
1533 }
1534
1535 let timeout_ms = step.timeout_ms;
1536 let on_error = step.on_error.unwrap_or(StepOnError::Halt);
1537 let max_retries = step.max_retries.unwrap_or(0);
1538 let mut attempts = 0u32;
1539 loop {
1540 attempts = attempts.saturating_add(1);
1541 let started = Instant::now();
1542 let mut out = execute_step(step, state, operators, hosts, runtime);
1543 if let Some(limit_ms) = timeout_ms
1544 && started.elapsed().as_millis() as u64 > limit_ms
1545 {
1546 out = Err(RuntimeError::new(
1547 "JOSIE_E_STEP_TIMEOUT",
1548 format!("step exceeded timeout_ms={limit_ms}"),
1549 ));
1550 }
1551 match out {
1552 Ok(v) => {
1553 if let Some(key) = step.idempotency_key.as_ref() {
1554 runtime.idempotency.insert(key.clone(), v.clone());
1555 }
1556 return Ok(v);
1557 }
1558 Err(err) => match on_error {
1559 StepOnError::Retry if attempts <= max_retries => continue,
1560 StepOnError::Fallback | StepOnError::Compensate => return Ok(Value::Null),
1561 StepOnError::Retry | StepOnError::Halt => return Err(err),
1562 },
1563 }
1564 }
1565}
1566
1567fn execute_step(
1568 step: &PipelineStep,
1569 state: &mut State,
1570 operators: &Operators,
1571 hosts: &HostFunctions,
1572 runtime: &mut PipelineRuntime,
1573) -> Result<Value, RuntimeError> {
1574 match step.op.as_str() {
1575 "call" => execute_call_step(step, state, operators, hosts, runtime),
1576 "set" => {
1577 let Some(value_expr) = step.args.first() else {
1578 return Err(RuntimeError::new(
1579 "JOSIE_E_SET_ARGS",
1580 "set step requires first arg as value expression",
1581 ));
1582 };
1583 eval_dynamic(value_expr, state, operators, hosts, runtime)
1584 }
1585 "get" => {
1586 let from = step
1587 .from
1588 .as_deref()
1589 .ok_or_else(|| RuntimeError::new("JOSIE_E_GET_FROM", "get step requires 'from'"))?;
1590 read_ref(from, state, runtime)
1591 }
1592 "map" => {
1593 let input = read_from_array(step, state, runtime)?;
1594 let mut out = Vec::with_capacity(input.len());
1595 let nested = parse_nested_step(step.do_expr.as_ref());
1596 for (index, item) in input.iter().enumerate() {
1597 let mapped = if let Some(nested_step) = nested.as_ref() {
1598 eval_with_iter_locals_step(
1599 nested_step,
1600 item.clone(),
1601 index,
1602 None,
1603 state,
1604 operators,
1605 hosts,
1606 runtime,
1607 )?
1608 } else {
1609 eval_with_iter_locals_expr(
1610 &step.do_expr,
1611 item.clone(),
1612 index,
1613 None,
1614 state,
1615 operators,
1616 hosts,
1617 runtime,
1618 )?
1619 };
1620 out.push(mapped);
1621 }
1622 Ok(Value::Array(out))
1623 }
1624 "filter" => {
1625 let input = read_from_array(step, state, runtime)?;
1626 let mut out = Vec::with_capacity(input.len());
1627 let nested = parse_nested_step(step.do_expr.as_ref());
1628 for (index, item) in input.iter().enumerate() {
1629 let matched = if let Some(nested_step) = nested.as_ref() {
1630 eval_with_iter_locals_step(
1631 nested_step,
1632 item.clone(),
1633 index,
1634 None,
1635 state,
1636 operators,
1637 hosts,
1638 runtime,
1639 )?
1640 } else {
1641 eval_with_iter_locals_expr(
1642 &step.do_expr,
1643 item.clone(),
1644 index,
1645 None,
1646 state,
1647 operators,
1648 hosts,
1649 runtime,
1650 )?
1651 };
1652 if truthy(&matched) {
1653 out.push(item.clone());
1654 }
1655 }
1656 Ok(Value::Array(out))
1657 }
1658 "for_each" => {
1659 let input = read_from_array(step, state, runtime)?;
1660 let mut out = Vec::with_capacity(input.len());
1661 let mut last = Value::Null;
1662 let nested = parse_nested_step(step.do_expr.as_ref());
1663 for (index, item) in input.iter().enumerate() {
1664 let next = if let Some(nested_step) = nested.as_ref() {
1665 eval_with_iter_locals_step(
1666 nested_step,
1667 item.clone(),
1668 index,
1669 None,
1670 state,
1671 operators,
1672 hosts,
1673 runtime,
1674 )?
1675 } else {
1676 eval_with_iter_locals_expr(
1677 &step.do_expr,
1678 item.clone(),
1679 index,
1680 None,
1681 state,
1682 operators,
1683 hosts,
1684 runtime,
1685 )?
1686 };
1687 last = next.clone();
1688 out.push(next);
1689 }
1690 if step.into.is_some() {
1691 Ok(Value::Array(out))
1692 } else {
1693 Ok(last)
1694 }
1695 }
1696 "reduce" => {
1697 let input = read_from_array(step, state, runtime)?;
1698 let mut acc = if let Some(init_expr) = step.args.first() {
1699 eval_dynamic(init_expr, state, operators, hosts, runtime)?
1700 } else {
1701 Value::Null
1702 };
1703 let nested = parse_nested_step(step.do_expr.as_ref());
1704 for (index, item) in input.iter().enumerate() {
1705 acc = if let Some(nested_step) = nested.as_ref() {
1706 eval_with_iter_locals_step(
1707 nested_step,
1708 item.clone(),
1709 index,
1710 Some(acc),
1711 state,
1712 operators,
1713 hosts,
1714 runtime,
1715 )?
1716 } else {
1717 eval_with_iter_locals_expr(
1718 &step.do_expr,
1719 item.clone(),
1720 index,
1721 Some(acc),
1722 state,
1723 operators,
1724 hosts,
1725 runtime,
1726 )?
1727 };
1728 }
1729 Ok(acc)
1730 }
1731 "if" | "match" | "do" | "pipe" => {
1732 let mut tree = Vec::with_capacity(step.args.len() + 1);
1733 tree.push(Value::String(step.op.clone()));
1734 tree.extend(step.args.clone());
1735 eval_tree(&Value::Array(tree), state, operators)
1736 }
1737 "return" => {
1738 let from = step.from.as_deref().unwrap_or("$prev");
1739 read_ref(from, state, runtime)
1740 }
1741 other => Err(RuntimeError::new(
1742 "JOSIE_E_STEP_UNKNOWN_OP",
1743 format!("unsupported step op '{other}'"),
1744 )),
1745 }
1746}
1747
1748fn execute_call_step(
1749 step: &PipelineStep,
1750 state: &mut State,
1751 operators: &Operators,
1752 hosts: &HostFunctions,
1753 runtime: &mut PipelineRuntime,
1754) -> Result<Value, RuntimeError> {
1755 let fn_name = step
1756 .fn_name
1757 .as_deref()
1758 .or(step.from.as_deref())
1759 .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_FN", "call step requires fn/from"))?;
1760
1761 if let Some(host_generate) = hosts.get_generate_i64(fn_name) {
1762 if step.args.len() == 3 {
1763 let p1 = eval_dynamic(&step.args[0], state, operators, hosts, runtime)?
1764 .as_i64()
1765 .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "p1 must be i64"))?;
1766 let p2 = eval_dynamic(&step.args[1], state, operators, hosts, runtime)?
1767 .as_i64()
1768 .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "p2 must be i64"))?;
1769 let size = eval_dynamic(&step.args[2], state, operators, hosts, runtime)?
1770 .as_u64()
1771 .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "size must be u64"))?
1772 as usize;
1773 let out = host_generate(p1, p2, size);
1774 return Ok(Value::Array(out.into_iter().map(|v| json!(v)).collect()));
1775 }
1776 }
1777
1778 if let Some(host_map) = hosts.get_map_i64(fn_name) {
1779 if step.args.len() == 1 {
1780 let item = eval_dynamic(&step.args[0], state, operators, hosts, runtime)?
1781 .as_i64()
1782 .ok_or_else(|| RuntimeError::new("JOSIE_E_CALL_ARG", "item must be i64"))?;
1783 return Ok(json!(host_map(item)));
1784 }
1785 }
1786
1787 if let Some(host_call) = hosts.get_call(fn_name) {
1788 let mut eval_args = Vec::with_capacity(step.args.len());
1789 for arg in &step.args {
1790 eval_args.push(eval_dynamic(arg, state, operators, hosts, runtime)?);
1791 }
1792 return host_call(&eval_args);
1793 }
1794
1795 let op_name = fn_name.strip_prefix("core.").unwrap_or(fn_name);
1796 let mut expr = Vec::with_capacity(step.args.len() + 1);
1797 expr.push(Value::String(op_name.to_string()));
1798 expr.extend(step.args.clone());
1799 eval_tree(&Value::Array(expr), state, operators)
1800}
1801
1802fn parse_nested_step(expr: Option<&Value>) -> Option<PipelineStep> {
1803 let expr = expr?;
1804 let obj = expr.as_object()?;
1805 if obj.get("op").and_then(|v| v.as_str()).is_none() {
1806 return None;
1807 }
1808 let step: PipelineStep = serde_json::from_value(expr.clone()).ok()?;
1809 if validate_step(&step).is_err() {
1810 return None;
1811 }
1812 Some(step)
1813}
1814
1815fn read_from_array(
1816 step: &PipelineStep,
1817 state: &State,
1818 runtime: &PipelineRuntime,
1819) -> Result<Vec<Value>, RuntimeError> {
1820 let from = step
1821 .from
1822 .as_deref()
1823 .ok_or_else(|| RuntimeError::new("JOSIE_E_ITER_FROM", "iterator step requires 'from'"))?;
1824 let source = read_ref(from, state, runtime)?;
1825 match source {
1826 Value::Array(arr) => Ok(arr),
1827 _ => Err(RuntimeError::new(
1828 "JOSIE_E_ITER_SOURCE",
1829 format!("{} input must resolve to array", step.op),
1830 )),
1831 }
1832}
1833
1834fn eval_with_iter_locals_expr(
1835 do_expr: &Option<Value>,
1836 item: Value,
1837 index: usize,
1838 acc: Option<Value>,
1839 state: &mut State,
1840 operators: &Operators,
1841 hosts: &HostFunctions,
1842 runtime: &mut PipelineRuntime,
1843) -> Result<Value, RuntimeError> {
1844 let Some(expr) = do_expr else {
1845 return Err(RuntimeError::new(
1846 "JOSIE_E_ITER_DO",
1847 "iterator step requires do expression",
1848 ));
1849 };
1850 let prev_item = state.client.insert("item".to_string(), item);
1851 let prev_index = state.client.insert("index".to_string(), json!(index));
1852 let prev_acc = acc.map(|acc_val| state.client.insert("acc".to_string(), acc_val));
1853
1854 let out = eval_dynamic(expr, state, operators, hosts, runtime);
1855
1856 restore_local(&mut state.client, "item", prev_item);
1857 restore_local(&mut state.client, "index", prev_index);
1858 if let Some(prev) = prev_acc {
1859 restore_local(&mut state.client, "acc", prev);
1860 } else {
1861 state.client.remove("acc");
1862 }
1863 out
1864}
1865
1866fn eval_with_iter_locals_step(
1867 step: &PipelineStep,
1868 item: Value,
1869 index: usize,
1870 acc: Option<Value>,
1871 state: &mut State,
1872 operators: &Operators,
1873 hosts: &HostFunctions,
1874 runtime: &mut PipelineRuntime,
1875) -> Result<Value, RuntimeError> {
1876 let prev_item = state.client.insert("item".to_string(), item);
1877 let prev_index = state.client.insert("index".to_string(), json!(index));
1878 let prev_acc = acc.map(|acc_val| state.client.insert("acc".to_string(), acc_val));
1879
1880 let out = execute_step(step, state, operators, hosts, runtime);
1881
1882 restore_local(&mut state.client, "item", prev_item);
1883 restore_local(&mut state.client, "index", prev_index);
1884 if let Some(prev) = prev_acc {
1885 restore_local(&mut state.client, "acc", prev);
1886 } else {
1887 state.client.remove("acc");
1888 }
1889 out
1890}
1891
1892fn eval_dynamic(
1893 expr: &Value,
1894 state: &mut State,
1895 operators: &Operators,
1896 hosts: &HostFunctions,
1897 runtime: &mut PipelineRuntime,
1898) -> Result<Value, RuntimeError> {
1899 if expr.is_array() {
1900 return eval_tree(expr, state, operators);
1901 }
1902 if let Some(obj) = expr.as_object()
1903 && obj.get("op").and_then(|v| v.as_str()).is_some()
1904 {
1905 let step: PipelineStep = serde_json::from_value(expr.clone()).map_err(|err| {
1906 RuntimeError::new(
1907 "JOSIE_E_STEP_PARSE",
1908 format!("invalid nested pipeline step: {err}"),
1909 )
1910 })?;
1911 validate_step(&step).map_err(|err| {
1912 RuntimeError::new(err.code, err.message).step_opt(err.step_index, err.op)
1913 })?;
1914 return execute_step(&step, state, operators, hosts, runtime);
1915 }
1916 if let Some(obj) = expr.as_object() {
1917 let mut out = Map::with_capacity(obj.len());
1920 for (key, value) in obj {
1921 let next = eval_dynamic(value, state, operators, hosts, runtime)?;
1922 out.insert(key.clone(), next);
1923 }
1924 return Ok(Value::Object(out));
1925 }
1926 Ok(expr.clone())
1927}
1928
1929fn eval_tree(
1930 expr: &Value,
1931 state: &mut State,
1932 operators: &Operators,
1933) -> Result<Value, RuntimeError> {
1934 let mut ctx = Context {
1935 state,
1936 operators,
1937 event: None,
1938 };
1939 evaluate(expr, &mut ctx).map_err(RuntimeError::from)
1940}
1941
1942fn write_target(
1943 target: Option<&str>,
1944 value: Value,
1945 state: &mut State,
1946 runtime: &mut PipelineRuntime,
1947) -> Result<(), RuntimeError> {
1948 let target = target.unwrap_or("$prev");
1949 if target == "$prev" {
1950 runtime.prev = value;
1951 return Ok(());
1952 }
1953 if target.starts_with("client.") || target.starts_with("server.") {
1954 set_state_path(state, target, value.clone())?;
1955 } else {
1956 runtime.vars.insert(target.to_string(), value.clone());
1957 }
1958 runtime.prev = value;
1959 Ok(())
1960}
1961
1962fn read_ref(target: &str, state: &State, runtime: &PipelineRuntime) -> Result<Value, RuntimeError> {
1963 if target == "$prev" {
1964 return Ok(runtime.prev.clone());
1965 }
1966 if let Some(value) = runtime.vars.get(target) {
1967 return Ok(value.clone());
1968 }
1969 if target.starts_with("client.") || target.starts_with("server.") {
1970 return get_state_path(state, target).ok_or_else(|| {
1971 RuntimeError::new(
1972 "JOSIE_E_REF_NOT_FOUND",
1973 format!("missing state reference '{target}'"),
1974 )
1975 });
1976 }
1977 Err(RuntimeError::new(
1978 "JOSIE_E_REF_NOT_FOUND",
1979 format!("missing runtime reference '{target}'"),
1980 ))
1981}
1982
1983fn state_from_value(input: &Value) -> State {
1984 let mut state = State::new();
1985 if let Some(obj) = input.as_object() {
1986 if let Some(client) = obj.get("client").and_then(|v| v.as_object()) {
1987 state.client = client.clone();
1988 }
1989 if let Some(server) = obj.get("server").and_then(|v| v.as_object()) {
1990 state.server = server.clone();
1991 }
1992 }
1993 state
1994}
1995
1996fn get_state_path(state: &State, path: &str) -> Option<Value> {
1997 let (scope, rest) = path.split_once('.')?;
1998 let base = match scope {
1999 "client" => &state.client,
2000 "server" => &state.server,
2001 _ => return None,
2002 };
2003 get_from_map(base, rest)
2004}
2005
2006fn get_from_map(map: &Map<String, Value>, path: &str) -> Option<Value> {
2007 if path.is_empty() {
2008 return Some(Value::Object(map.clone()));
2009 }
2010 let mut current = map.get(path.split('.').next().unwrap_or_default())?;
2011 for part in path.split('.').skip(1) {
2012 match current {
2013 Value::Object(obj) => current = obj.get(part)?,
2014 Value::Array(arr) => {
2015 let idx = part.parse::<usize>().ok()?;
2016 current = arr.get(idx)?;
2017 }
2018 _ => return None,
2019 }
2020 }
2021 Some(current.clone())
2022}
2023
2024fn set_state_path(state: &mut State, path: &str, value: Value) -> Result<(), RuntimeError> {
2025 let (scope, rest) = path
2026 .split_once('.')
2027 .ok_or_else(|| RuntimeError::new("JOSIE_E_SET_PATH", "invalid set path"))?;
2028 let target_map = match scope {
2029 "client" => &mut state.client,
2030 "server" => &mut state.server,
2031 _ => {
2032 return Err(RuntimeError::new(
2033 "JOSIE_E_SET_PATH",
2034 format!("path scope must be client/server, got '{scope}'"),
2035 ));
2036 }
2037 };
2038 set_in_map(target_map, rest, value);
2039 Ok(())
2040}
2041
2042fn set_in_map(map: &mut Map<String, Value>, path: &str, value: Value) {
2043 let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
2044 if parts.is_empty() {
2045 return;
2046 }
2047
2048 let mut current = map;
2049 for part in &parts[..parts.len().saturating_sub(1)] {
2050 let entry = current
2051 .entry((*part).to_string())
2052 .or_insert_with(|| Value::Object(Map::new()));
2053 if !entry.is_object() {
2054 *entry = Value::Object(Map::new());
2055 }
2056 current = entry
2057 .as_object_mut()
2058 .expect("entry should be object after normalization");
2059 }
2060 let last = parts.last().expect("path has at least one part");
2061 current.insert((*last).to_string(), value);
2062}
2063
2064fn restore_local(map: &mut Map<String, Value>, key: &str, previous: Option<Value>) {
2065 if let Some(prev) = previous {
2066 map.insert(key.to_string(), prev);
2067 } else {
2068 map.remove(key);
2069 }
2070}
2071
2072fn truthy(v: &Value) -> bool {
2073 match v {
2074 Value::Null => false,
2075 Value::Bool(b) => *b,
2076 Value::Number(n) => n.as_f64().map(|n| n != 0.0).unwrap_or(false),
2077 Value::String(s) => !s.is_empty(),
2078 Value::Array(a) => !a.is_empty(),
2079 Value::Object(o) => !o.is_empty(),
2080 }
2081}
2082
2083fn is_tree_expression(value: &Value) -> bool {
2084 let Some(arr) = value.as_array() else {
2085 return false;
2086 };
2087 if arr.is_empty() {
2088 return false;
2089 }
2090 arr.first().and_then(|v| v.as_str()).is_some()
2091}
2092
/// Private extension for attaching optional step context to an error value.
trait RuntimeErrExt {
    /// Consumes the error and returns it annotated with the given optional
    /// step index and op name.
    fn step_opt(self, step_index: Option<usize>, op: Option<String>) -> Self;
}
2096
2097impl RuntimeErrExt for RuntimeError {
2098 fn step_opt(mut self, step_index: Option<usize>, op: Option<String>) -> Self {
2099 self.step_index = step_index;
2100 self.op = op;
2101 self
2102 }
2103}
2104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Operators;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Call counter for `host_fail_once`; reset by the retry test before use.
    static RETRY_COUNTER: AtomicUsize = AtomicUsize::new(0);
    /// Call counter for `host_counter`; reset by the idempotency test before use.
    static IDEMP_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Operator-style generator: yields `size` values `((i + p1) * p2) % 97`.
    /// Missing or mistyped args default to 0.
    fn op_ext_a(args: &[Value], _ctx: &mut Context) -> Result<Value, EvalError> {
        let p1 = args.first().and_then(|v| v.as_i64()).unwrap_or(0);
        let p2 = args.get(1).and_then(|v| v.as_i64()).unwrap_or(0);
        let size = args.get(2).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
        let mut out = Vec::with_capacity(size);
        for i in 0..size {
            out.push(json!(((i as i64 + p1) * p2) % 97));
        }
        Ok(Value::Array(out))
    }

    /// Operator-style mapper: `(x * 3 + 1) % 101`, with a missing arg treated as 0.
    fn op_ext_b(args: &[Value], _ctx: &mut Context) -> Result<Value, EvalError> {
        let x = args.first().and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(json!((x * 3 + 1) % 101))
    }

    /// Typed host generator using the same formula as `op_ext_a`.
    fn host_gen(p1: i64, p2: i64, size: usize) -> Vec<i64> {
        let mut out = Vec::with_capacity(size);
        for i in 0..size {
            out.push(((i as i64 + p1) * p2) % 97);
        }
        out
    }

    /// Typed host mapper using the same formula as `op_ext_b`.
    fn host_map(item: i64) -> i64 {
        (item * 3 + 1) % 101
    }

    /// Host call that fails while `RETRY_COUNTER` is 0 and returns `"ok"` on
    /// every later call; each call increments the counter.
    fn host_fail_once(_args: &[Value]) -> Result<Value, RuntimeError> {
        let n = RETRY_COUNTER.fetch_add(1, Ordering::SeqCst);
        if n == 0 {
            return Err(RuntimeError::new("JOSIE_E_TEST_FAIL", "fail once"));
        }
        Ok(json!("ok"))
    }

    /// Host call that increments `IDEMP_COUNTER` and returns the new count.
    fn host_counter(_args: &[Value]) -> Result<Value, RuntimeError> {
        let n = IDEMP_COUNTER.fetch_add(1, Ordering::SeqCst) + 1;
        Ok(json!(n))
    }

    /// A well-formed two-step pipeline document parses successfully.
    #[test]
    fn parse_pipeline_ok() {
        let node = json!({
            "v": 2,
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"call", "fn":"x.a", "args":[1,2,3], "into":"result_a"},
                    {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
                ]
            }
        });
        let parsed = parse_program(&node);
        assert!(parsed.is_ok());
    }

    /// The `run_hint` field round-trips into `StepRunHint::Worker`.
    #[test]
    fn parse_pipeline_accepts_run_hint() {
        let node = json!({
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"call", "fn":"x.a", "args":[1], "into":"out", "run_hint":"worker"}
                ]
            }
        });
        let parsed = parse_program(&node).expect("parse");
        let pipe: PipelineDoc = serde_json::from_value(parsed.program).expect("pipe");
        assert_eq!(pipe.steps[0].run_hint, Some(StepRunHint::Worker));
    }

    /// A `for_each` step without a `do` body is rejected with `JOSIE_E_ITER_DO`.
    #[test]
    fn parse_pipeline_missing_for_each_do_fails() {
        let node = json!({
            "v": 2,
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"for_each", "from":"items", "into":"out"}
                ]
            }
        });
        let err = parse_program(&node).expect_err("expected validation error");
        assert_eq!(err.code, "JOSIE_E_ITER_DO");
    }

    /// A bare expression tree is accepted as a program.
    #[test]
    fn parse_tree_ok() {
        let node = json!({
            "v": 2,
            "program": ["+", 1, 2]
        });
        let parsed = parse_program(&node);
        assert!(parsed.is_ok());
    }

    /// With `x.a` / `x.b` registered as operators (no host functions), the
    /// call + for_each pipeline produces an array of five results.
    #[test]
    fn execute_pipeline_with_registered_functions_and_for_each() {
        let node = json!({
            "v": 2,
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"call", "fn":"x.a", "args":[2,3,5], "into":"result_a"},
                    {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
                ]
            }
        });
        let program = parse_program(&node).expect("parse");
        let mut operators = Operators::new();
        operators.register("x.a", op_ext_a);
        operators.register("x.b", op_ext_b);

        let out = execute_program(&program, &operators).expect("execute");
        assert!(out.value.is_array());
        assert_eq!(out.value.as_array().map(|arr| arr.len()), Some(5));
    }

    /// `util.*` operators work in a tree program without any registration:
    /// the string form of 1234 has length 4.
    #[test]
    fn execute_tree_with_util_namespace() {
        let node = json!({
            "v": 2,
            "program": ["util.str_len", ["util.to_string", 1234]]
        });
        let program = parse_program(&node).expect("parse");
        let operators = Operators::new();
        let out = execute_program(&program, &operators).expect("execute");
        assert_eq!(out.value, json!(4));
    }

    /// The metrics entry point, given typed host functions, reports five
    /// generated items and a positive checksum.
    #[test]
    fn fast_external_metrics_path_works() {
        let node = json!({
            "v": 2,
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"call", "fn":"x.a", "args":[2,3,5], "into":"result_a"},
                    {"op":"for_each", "from":"result_a", "do":{"op":"call", "fn":"x.b", "args":[["var","item"]]}, "into":"result_b"}
                ]
            }
        });
        let program = parse_program(&node).expect("parse");
        let compiled = compile_program(&program).expect("compile");
        let mut hosts = HostFunctions::new();
        hosts.register_generate_i64("x.a", host_gen);
        hosts.register_map_i64("x.b", host_map);
        let operators = Operators::new();
        let mut state = State::new();
        let (checksum, len) =
            execute_compiled_program_external_metrics(&compiled, &mut state, &operators, &hosts)
                .expect("fast metrics");
        assert_eq!(len, 5);
        assert!(checksum > 0);
    }

    /// `on_error: retry` with `max_retries: 1` re-runs a step that fails once:
    /// the pipeline returns `"ok"` and the host function is called twice.
    #[test]
    fn execute_pipeline_retry_policy_works() {
        RETRY_COUNTER.store(0, Ordering::SeqCst);
        let node = json!({
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {
                        "id": "retry-step",
                        "type": "action",
                        "op":"call",
                        "fn":"x.fail_once",
                        "on_error":"retry",
                        "max_retries": 1,
                        "into":"r"
                    },
                    {"op":"return", "from":"r"}
                ]
            }
        });
        let program = parse_program(&node).expect("parse");
        let mut hosts = HostFunctions::new();
        hosts.register_call("x.fail_once", host_fail_once);
        let operators = Operators::new();
        let out = execute_program_with_hosts(&program, &operators, &hosts).expect("execute");
        assert_eq!(out.value, json!("ok"));
        assert_eq!(RETRY_COUNTER.load(Ordering::SeqCst), 2);
    }

    /// Two steps sharing an `idempotency_key` invoke the host function once;
    /// the second step receives the cached result.
    #[test]
    fn execute_pipeline_idempotency_key_skips_duplicate_step() {
        IDEMP_COUNTER.store(0, Ordering::SeqCst);
        let node = json!({
            "state": {"client": {}, "server": {}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"call", "fn":"x.counter", "idempotency_key":"job-42", "into":"a"},
                    {"op":"call", "fn":"x.counter", "idempotency_key":"job-42", "into":"b"},
                    {"op":"return", "from":"b"}
                ]
            }
        });
        let program = parse_program(&node).expect("parse");
        let mut hosts = HostFunctions::new();
        hosts.register_call("x.counter", host_counter);
        let operators = Operators::new();
        let out = execute_program_with_hosts(&program, &operators, &hosts).expect("execute");
        assert_eq!(out.value, json!(1));
        assert_eq!(IDEMP_COUNTER.load(Ordering::SeqCst), 1);
    }

    /// A `set` step whose arg is a plain object evaluates each field value as
    /// an expression, here resolving `var` lookups against server state.
    #[test]
    fn set_step_evaluates_nested_object_expressions() {
        let node = json!({
            "state": {"client": {}, "server": {"input": {"title": "Hello", "slug": "hello"}}},
            "program": {
                "type": "pipeline",
                "steps": [
                    {"op":"set","into":"row","args":[
                        {"title":["var","server.input.title"],"slug":["var","server.input.slug"]}
                    ]},
                    {"op":"return","from":"row"}
                ]
            }
        });
        let program = parse_program(&node).expect("parse");
        let operators = Operators::new();
        let out = execute_program(&program, &operators).expect("execute");
        assert_eq!(out.value, json!({"title":"Hello","slug":"hello"}));
    }
}