Skip to main content

logicpearl_pipeline/
lib.rs

1// SPDX-License-Identifier: MIT
2//! Pipeline composition and execution for "string of pearls" workflows.
3//!
4//! Pipelines wire together pearl artifacts, plugin stages, stage inputs,
5//! exports, and final outputs. This crate parses pipeline definitions,
6//! prepares manifest-relative paths with the shared path policy, executes
7//! trusted local plugins when requested, and preserves each stage's raw
8//! runtime result for audit.
9
10use logicpearl_core::{resolve_manifest_member_path, LogicPearlError, Result};
11use logicpearl_ir::LogicPearlGateIr;
12use logicpearl_plugin::{
13    run_plugin_batch_with_policy_and_metadata, run_plugin_with_policy_and_metadata,
14    PluginExecutionPolicy, PluginExecutionResult, PluginManifest, PluginRequest, PluginResponse,
15    PluginStage,
16};
17use serde::{Deserialize, Serialize};
18use serde_json::Map;
19use serde_json::Value;
20use std::collections::{BTreeSet, HashMap};
21use std::fs;
22use std::path::{Path, PathBuf};
23
/// Declarative description of a pipeline, as read from or written to JSON.
///
/// `validate` enforces the constraints noted on each field below.
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25pub struct PipelineDefinition {
    /// Definition format version; `validate` accepts only `"1.0"`.
26    pub pipeline_version: String,
    /// Pipeline identifier; must be non-empty after trimming. It is reported as
    /// both `artifact_id` and `pipeline_id` on the execution result.
27    pub pipeline_id: String,
    /// Entrypoint name; must be non-empty after trimming. `compose_pipeline`
    /// sets it to `"input"`.
    // NOTE(review): how the entrypoint is consumed at run time is not visible here.
28    pub entrypoint: String,
    /// Stages in execution order; at least one is required. A stage may only
    /// reference exports of stages that appear before it.
29    pub stages: Vec<PipelineStage>,
    /// Final output fields; at least one is required. String values may be
    /// `$.`-prefixed root references or `@stage.export` references.
30    pub output: HashMap<String, Value>,
31}
32
/// One stage of a pipeline: either a pearl artifact or a plugin invocation.
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct PipelineStage {
    /// Stage identifier; must be non-empty and unique within the pipeline.
35    pub id: String,
    /// Selects whether the stage runs a pearl or one of the plugin stage types.
36    pub kind: PipelineStageKind,
    /// Manifest-relative path to the pearl artifact. Required for `pearl`
    /// stages and rejected for plugin stages.
37    pub artifact: Option<String>,
    /// Manifest-relative path to the plugin manifest. Required for plugin
    /// stages and rejected for `pearl` stages.
38    pub plugin_manifest: Option<String>,
    /// Named stage inputs; keys must be non-empty and string values are
    /// validated as references.
39    #[serde(default)]
40    pub input: HashMap<String, Value>,
    /// Optional payload template; string values inside it are validated as references.
    // NOTE(review): presumably overrides `input` when building the plugin payload — confirm in `build_stage_payload_value`.
41    #[serde(default)]
42    pub payload: Option<Value>,
    /// Optional options template; string values inside it are validated as references.
43    #[serde(default)]
44    pub options: Option<Value>,
    /// Values this stage exposes to later stages, keyed by export name.
    /// `compose_pipeline` generates entries such as `"bitmask": "$.bitmask"`.
45    #[serde(default)]
46    pub export: HashMap<String, Value>,
    /// Optional condition; when it resolves to a non-truthy value the stage is
    /// skipped and exposes no exports.
47    #[serde(default)]
48    pub when: Option<Value>,
    /// Optional iteration expression. In the code visible here it is only
    /// validated for references.
    // NOTE(review): runtime semantics of `foreach` are not visible in this chunk.
49    #[serde(default)]
50    pub foreach: Option<Value>,
51}
52
/// Kind of work a pipeline stage performs.
///
/// Serialized in `snake_case` (for example `observer_plugin`).
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum PipelineStageKind {
    /// Evaluates a pearl gate artifact; has no plugin stage counterpart.
56    Pearl,
    /// Plugin stage mapped to `PluginStage::Observer`.
57    ObserverPlugin,
    /// Plugin stage mapped to `PluginStage::TraceSource`.
58    TraceSourcePlugin,
    /// Plugin stage mapped to `PluginStage::Enricher`.
59    EnricherPlugin,
    /// Plugin stage mapped to `PluginStage::Verify`.
60    VerifyPlugin,
61}
62
/// Summary produced by `PipelineDefinition::validate` for a well-formed pipeline.
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64pub struct ValidatedPipeline {
    /// Copied from the definition.
65    pub pipeline_id: String,
    /// Copied from the definition.
66    pub pipeline_version: String,
    /// Copied from the definition.
67    pub entrypoint: String,
    /// Number of stages in the definition.
68    pub stage_count: usize,
    /// Per-stage summaries, in definition order.
69    pub stages: Vec<ValidatedStage>,
    /// Keys of the pipeline's `output` map.
70    pub exports: Vec<String>,
71}
72
/// Summary of a single validated stage.
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
74pub struct ValidatedStage {
    /// Stage identifier.
75    pub id: String,
    /// Stage kind.
76    pub kind: PipelineStageKind,
    /// Resolved artifact path rendered for display, when the stage declares one.
77    pub artifact: Option<String>,
    /// Resolved plugin manifest path rendered for display, when the stage declares one.
78    pub plugin_manifest: Option<String>,
    /// Names of the stage's exports, sorted.
79    pub exports: Vec<String>,
80}
81
/// Result of running a pipeline against one root input.
82#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
83pub struct PipelineExecution {
    /// Set from `logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION`.
84    pub schema_version: String,
    /// Set from `logicpearl_runtime::LOGICPEARL_ENGINE_VERSION`.
85    pub engine_version: String,
    /// The pipeline id (same value as `pipeline_id`).
86    pub artifact_id: String,
    /// Hash of the pipeline definition, from `logicpearl_runtime::artifact_hash`.
87    pub artifact_hash: String,
    /// Always `"pipeline"` for results built in this file.
88    pub decision_kind: String,
    /// Identifier of the pipeline that produced this result.
89    pub pipeline_id: String,
    /// Set to `true` for every result built in this file; failures are
    /// reported as `Err` instead.
90    pub ok: bool,
    /// Resolved values of the definition's `output` fields.
91    pub output: HashMap<String, Value>,
    /// Per-stage records, in execution order, including skipped stages.
92    pub stages: Vec<StageExecution>,
93}
94
/// Record of a single stage within a pipeline execution.
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96pub struct StageExecution {
    /// Stage identifier.
97    pub id: String,
    /// Stage kind.
98    pub kind: PipelineStageKind,
    /// Set to `true` for every record built in this file.
99    pub ok: bool,
    /// `true` when the stage's `when` condition was not truthy and it did not run.
100    pub skipped: bool,
    /// Values exported by the stage; empty when the stage was skipped.
101    pub exports: HashMap<String, Value>,
    /// Unmodified runtime result of the stage; `Value::Null` when skipped.
102    pub raw_result: Value,
103}
104
/// Output of `compose_pipeline`: a starter definition plus guidance for finishing it.
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106pub struct ComposePlan {
    /// Generated pipeline whose stage inputs are `$.TODO_*` placeholders.
107    pub pipeline: PipelineDefinition,
    /// One human-readable note per generated stage describing what to replace.
108    pub notes: Vec<String>,
109}
110
/// A validated pipeline with every stage's artifact or plugin manifest already
/// loaded, ready to run against one or many inputs.
111#[derive(Debug, Clone)]
112pub struct PreparedPipeline {
    // Copy of the definition this pipeline was prepared from.
113    definition: PipelineDefinition,
    // Stages in execution order, each paired with its loaded executable.
114    stages: Vec<PreparedStage>,
    // Policy handed to every plugin invocation.
115    plugin_policy: PluginExecutionPolicy,
116}
117
/// A stage definition paired with the executable loaded for it.
118#[derive(Debug, Clone)]
119struct PreparedStage {
    // Original stage definition (inputs, exports, conditions).
120    stage: PipelineStage,
    // Loaded pearl gate or plugin manifest for this stage.
121    executable: PreparedStageExecutable,
122}
123
/// What a prepared stage actually runs.
124#[derive(Debug, Clone)]
125enum PreparedStageExecutable {
    // Pearl gate loaded from the stage's artifact path.
126    Pearl(LogicPearlGateIr),
    // Plugin loaded from the stage's manifest path.
127    Plugin {
        // Parsed plugin manifest.
128        manifest: PluginManifest,
        // Plugin stage derived from the pipeline stage kind.
129        stage: PluginStage,
130    },
131}
132
133impl PipelineDefinition {
134    pub fn from_json_str(input: &str) -> Result<Self> {
135        let pipeline: Self = serde_json::from_str(input)?;
136        Ok(pipeline)
137    }
138
139    pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
140        let content = fs::read_to_string(path)?;
141        Self::from_json_str(&content)
142    }
143
144    pub fn validate(&self, base_dir: impl AsRef<Path>) -> Result<ValidatedPipeline> {
145        if self.pipeline_version != "1.0" {
146            return Err(LogicPearlError::message(format!(
147                "unsupported pipeline_version: {}",
148                self.pipeline_version
149            )));
150        }
151        if self.pipeline_id.trim().is_empty() {
152            return Err(LogicPearlError::message("pipeline_id must be non-empty"));
153        }
154        if self.entrypoint.trim().is_empty() {
155            return Err(LogicPearlError::message("entrypoint must be non-empty"));
156        }
157        if self.stages.is_empty() {
158            return Err(LogicPearlError::message(
159                "pipeline must define at least one stage",
160            ));
161        }
162        if self.output.is_empty() {
163            return Err(LogicPearlError::message(
164                "pipeline output must define at least one field",
165            ));
166        }
167
168        let base_dir = base_dir.as_ref();
169        let mut stage_ids = BTreeSet::new();
170        let mut visible_exports: HashMap<String, BTreeSet<String>> = HashMap::new();
171        let mut validated_stages = Vec::with_capacity(self.stages.len());
172
173        for stage in &self.stages {
174            if stage.id.trim().is_empty() {
175                return Err(LogicPearlError::message("stage id must be non-empty"));
176            }
177            if !stage_ids.insert(stage.id.clone()) {
178                return Err(LogicPearlError::message(format!(
179                    "duplicate stage id: {}",
180                    stage.id
181                )));
182            }
183            stage.validate(base_dir, &visible_exports)?;
184            let export_names = stage.export.keys().cloned().collect::<BTreeSet<_>>();
185            visible_exports.insert(stage.id.clone(), export_names.clone());
186            validated_stages.push(ValidatedStage {
187                id: stage.id.clone(),
188                kind: stage.kind.clone(),
189                artifact: stage
190                    .artifact
191                    .as_ref()
192                    .map(|value| {
193                        resolve_relative_path(base_dir, value)
194                            .map(|path| path.display().to_string())
195                    })
196                    .transpose()?,
197                plugin_manifest: stage
198                    .plugin_manifest
199                    .as_ref()
200                    .map(|value| {
201                        resolve_relative_path(base_dir, value)
202                            .map(|path| path.display().to_string())
203                    })
204                    .transpose()?,
205                exports: export_names.into_iter().collect(),
206            });
207        }
208
209        for (field, value) in &self.output {
210            if field.trim().is_empty() {
211                return Err(LogicPearlError::message(
212                    "pipeline output keys must be non-empty",
213                ));
214            }
215            validate_value_reference(value, &visible_exports)?;
216        }
217
218        let exports = self.output.keys().cloned().collect();
219        Ok(ValidatedPipeline {
220            pipeline_id: self.pipeline_id.clone(),
221            pipeline_version: self.pipeline_version.clone(),
222            entrypoint: self.entrypoint.clone(),
223            stage_count: self.stages.len(),
224            stages: validated_stages,
225            exports,
226        })
227    }
228
229    pub fn inspect(&self, base_dir: impl AsRef<Path>) -> Result<ValidatedPipeline> {
230        self.validate(base_dir)
231    }
232
233    pub fn run(&self, base_dir: impl AsRef<Path>, root_input: &Value) -> Result<PipelineExecution> {
234        self.run_with_plugin_policy(base_dir, root_input, PluginExecutionPolicy::default())
235    }
236
237    pub fn run_with_plugin_policy(
238        &self,
239        base_dir: impl AsRef<Path>,
240        root_input: &Value,
241        plugin_policy: PluginExecutionPolicy,
242    ) -> Result<PipelineExecution> {
243        self.prepare_with_plugin_policy(base_dir, plugin_policy)?
244            .run(root_input)
245    }
246
247    pub fn write_pretty(&self, path: impl AsRef<Path>) -> Result<()> {
248        fs::write(path, serde_json::to_string_pretty(self)? + "\n")?;
249        Ok(())
250    }
251
252    pub fn prepare(&self, base_dir: impl AsRef<Path>) -> Result<PreparedPipeline> {
253        self.prepare_with_plugin_policy(base_dir, PluginExecutionPolicy::default())
254    }
255
256    pub fn prepare_with_plugin_policy(
257        &self,
258        base_dir: impl AsRef<Path>,
259        plugin_policy: PluginExecutionPolicy,
260    ) -> Result<PreparedPipeline> {
261        self.validate(&base_dir)?;
262        let base_dir = base_dir.as_ref();
263        let mut prepared_stages = Vec::with_capacity(self.stages.len());
264        for stage in &self.stages {
265            let executable = match stage.kind {
266                PipelineStageKind::Pearl => {
267                    let artifact_path = resolve_relative_path(
268                        base_dir,
269                        stage.artifact.as_ref().expect("validated pearl artifact"),
270                    )?;
271                    PreparedStageExecutable::Pearl(LogicPearlGateIr::from_path(&artifact_path)?)
272                }
273                PipelineStageKind::ObserverPlugin
274                | PipelineStageKind::TraceSourcePlugin
275                | PipelineStageKind::EnricherPlugin
276                | PipelineStageKind::VerifyPlugin => {
277                    let manifest_path = resolve_relative_path(
278                        base_dir,
279                        stage
280                            .plugin_manifest
281                            .as_ref()
282                            .expect("validated plugin manifest"),
283                    )?;
284                    let manifest = PluginManifest::from_path(&manifest_path)?;
285                    let plugin_stage = plugin_stage_for_kind(&stage.kind).ok_or_else(|| {
286                        LogicPearlError::message(format!(
287                            "stage {} does not map to a plugin stage",
288                            stage.id
289                        ))
290                    })?;
291                    PreparedStageExecutable::Plugin {
292                        manifest,
293                        stage: plugin_stage,
294                    }
295                }
296            };
297            prepared_stages.push(PreparedStage {
298                stage: stage.clone(),
299                executable,
300            });
301        }
302        Ok(PreparedPipeline {
303            definition: self.clone(),
304            stages: prepared_stages,
305            plugin_policy,
306        })
307    }
308}
309
310impl PreparedPipeline {
311    pub fn run(&self, root_input: &Value) -> Result<PipelineExecution> {
312        let mut stage_exports: HashMap<String, HashMap<String, Value>> = HashMap::new();
313        let mut stages = Vec::with_capacity(self.stages.len());
314
315        for prepared_stage in &self.stages {
316            let stage = &prepared_stage.stage;
317            let should_run = match &stage.when {
318                Some(condition) => truthy(&resolve_stage_input_value(
319                    condition,
320                    root_input,
321                    &stage_exports,
322                )?),
323                None => true,
324            };
325
326            if !should_run {
327                stages.push(StageExecution {
328                    id: stage.id.clone(),
329                    kind: stage.kind.clone(),
330                    ok: true,
331                    skipped: true,
332                    exports: HashMap::new(),
333                    raw_result: Value::Null,
334                });
335                stage_exports.insert(stage.id.clone(), HashMap::new());
336                continue;
337            }
338
339            let raw_result = run_prepared_stage(
340                prepared_stage,
341                root_input,
342                &stage_exports,
343                &self.plugin_policy,
344            )?;
345
346            let exports = build_stage_exports(&stage.export, &raw_result)?;
347            stage_exports.insert(stage.id.clone(), exports.clone());
348            stages.push(StageExecution {
349                id: stage.id.clone(),
350                kind: stage.kind.clone(),
351                ok: true,
352                skipped: false,
353                exports,
354                raw_result,
355            });
356        }
357
358        let mut output = HashMap::new();
359        for (key, value) in &self.definition.output {
360            output.insert(
361                key.clone(),
362                resolve_pipeline_output_value(value, root_input, &stage_exports)?,
363            );
364        }
365
366        Ok(PipelineExecution {
367            schema_version: logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION.to_string(),
368            engine_version: logicpearl_runtime::LOGICPEARL_ENGINE_VERSION.to_string(),
369            artifact_id: self.definition.pipeline_id.clone(),
370            artifact_hash: logicpearl_runtime::artifact_hash(&self.definition),
371            decision_kind: "pipeline".to_string(),
372            pipeline_id: self.definition.pipeline_id.clone(),
373            ok: true,
374            output,
375            stages,
376        })
377    }
378
379    pub fn run_batch(&self, root_inputs: &[Value]) -> Result<Vec<PipelineExecution>> {
380        let mut stage_exports: Vec<HashMap<String, HashMap<String, Value>>> =
381            vec![HashMap::new(); root_inputs.len()];
382        let mut case_stages: Vec<Vec<StageExecution>> = (0..root_inputs.len())
383            .map(|_| Vec::with_capacity(self.stages.len()))
384            .collect();
385
386        for prepared_stage in &self.stages {
387            let stage = &prepared_stage.stage;
388            let should_run: Vec<bool> = root_inputs
389                .iter()
390                .zip(stage_exports.iter())
391                .map(|(root_input, exports)| -> Result<bool> {
392                    Ok(match &stage.when {
393                        Some(condition) => {
394                            truthy(&resolve_stage_input_value(condition, root_input, exports)?)
395                        }
396                        None => true,
397                    })
398                })
399                .collect::<Result<Vec<_>>>()?;
400
401            let runnable_indexes: Vec<usize> = should_run
402                .iter()
403                .enumerate()
404                .filter_map(|(index, should)| should.then_some(index))
405                .collect();
406
407            let raw_results = run_prepared_stage_batch(
408                prepared_stage,
409                root_inputs,
410                &stage_exports,
411                &runnable_indexes,
412                &self.plugin_policy,
413            )?;
414            let mut raw_iter = raw_results.into_iter();
415
416            for index in 0..root_inputs.len() {
417                if !should_run[index] {
418                    case_stages[index].push(StageExecution {
419                        id: stage.id.clone(),
420                        kind: stage.kind.clone(),
421                        ok: true,
422                        skipped: true,
423                        exports: HashMap::new(),
424                        raw_result: Value::Null,
425                    });
426                    stage_exports[index].insert(stage.id.clone(), HashMap::new());
427                    continue;
428                }
429
430                let raw_result = raw_iter.next().ok_or_else(|| {
431                    LogicPearlError::message(format!(
432                        "prepared stage batch for {} returned fewer results than expected",
433                        stage.id
434                    ))
435                })?;
436                let exports = build_stage_exports(&stage.export, &raw_result)?;
437                stage_exports[index].insert(stage.id.clone(), exports.clone());
438                case_stages[index].push(StageExecution {
439                    id: stage.id.clone(),
440                    kind: stage.kind.clone(),
441                    ok: true,
442                    skipped: false,
443                    exports,
444                    raw_result,
445                });
446            }
447        }
448
449        root_inputs
450            .iter()
451            .enumerate()
452            .map(|(index, root_input)| {
453                let mut output = HashMap::new();
454                for (key, value) in &self.definition.output {
455                    output.insert(
456                        key.clone(),
457                        resolve_pipeline_output_value(value, root_input, &stage_exports[index])?,
458                    );
459                }
460                Ok(PipelineExecution {
461                    schema_version: logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION.to_string(),
462                    engine_version: logicpearl_runtime::LOGICPEARL_ENGINE_VERSION.to_string(),
463                    artifact_id: self.definition.pipeline_id.clone(),
464                    artifact_hash: logicpearl_runtime::artifact_hash(&self.definition),
465                    decision_kind: "pipeline".to_string(),
466                    pipeline_id: self.definition.pipeline_id.clone(),
467                    ok: true,
468                    output,
469                    stages: case_stages[index].clone(),
470                })
471            })
472            .collect()
473    }
474}
475
476pub fn compose_pipeline(
477    pipeline_id: impl Into<String>,
478    artifact_paths: &[PathBuf],
479    base_dir: impl AsRef<Path>,
480) -> Result<ComposePlan> {
481    if artifact_paths.is_empty() {
482        return Err(LogicPearlError::message(
483            "compose requires at least one pearl artifact path",
484        ));
485    }
486
487    let pipeline_id = pipeline_id.into();
488    let base_dir = base_dir.as_ref();
489    let mut stages = Vec::with_capacity(artifact_paths.len());
490    let mut notes = Vec::new();
491
492    for (index, artifact_path) in artifact_paths.iter().enumerate() {
493        let gate = LogicPearlGateIr::from_path(artifact_path)?;
494        let stage_id = sanitize_stage_id(&gate.gate_id, index);
495        let artifact = manifest_member_path_for_base(base_dir, artifact_path)?;
496
497        let mut input = HashMap::new();
498        for feature in &gate.input_schema.features {
499            input.insert(
500                feature.id.clone(),
501                Value::String(format!("$.TODO_{}", feature.id)),
502            );
503        }
504
505        let mut export = HashMap::new();
506        export.insert(
507            "bitmask".to_string(),
508            Value::String("$.bitmask".to_string()),
509        );
510        export.insert("allow".to_string(), Value::String("$.allow".to_string()));
511
512        notes.push(format!(
513            "stage `{}` maps {} input feature(s) from placeholder root paths; replace `$.TODO_*` with real paths or `@stage.export` references",
514            stage_id,
515            gate.input_schema.features.len()
516        ));
517
518        stages.push(PipelineStage {
519            id: stage_id,
520            kind: PipelineStageKind::Pearl,
521            artifact: Some(artifact),
522            plugin_manifest: None,
523            input,
524            payload: None,
525            options: None,
526            export,
527            when: None,
528            foreach: None,
529        });
530    }
531
532    let mut output = HashMap::new();
533    let final_stage = stages
534        .last()
535        .ok_or_else(|| LogicPearlError::message("compose produced no stages"))?;
536    output.insert(
537        "bitmask".to_string(),
538        Value::String(format!("@{}.bitmask", final_stage.id)),
539    );
540    output.insert(
541        "allow".to_string(),
542        Value::String(format!("@{}.allow", final_stage.id)),
543    );
544
545    Ok(ComposePlan {
546        pipeline: PipelineDefinition {
547            pipeline_version: "1.0".to_string(),
548            pipeline_id,
549            entrypoint: "input".to_string(),
550            stages,
551            output,
552        },
553        notes,
554    })
555}
556
557impl PipelineStage {
558    fn validate(
559        &self,
560        base_dir: &Path,
561        visible_exports: &HashMap<String, BTreeSet<String>>,
562    ) -> Result<()> {
563        match self.kind {
564            PipelineStageKind::Pearl => {
565                let artifact = self.artifact.as_ref().ok_or_else(|| {
566                    LogicPearlError::message(format!(
567                        "stage {} of kind pearl requires artifact",
568                        self.id
569                    ))
570                })?;
571                if self.plugin_manifest.is_some() {
572                    return Err(LogicPearlError::message(format!(
573                        "stage {} of kind pearl must not set plugin_manifest",
574                        self.id
575                    )));
576                }
577                let artifact_path = resolve_relative_path(base_dir, artifact)?;
578                if !artifact_path.exists() {
579                    return Err(LogicPearlError::message(format!(
580                        "stage {} artifact does not exist: {}",
581                        self.id,
582                        artifact_path.display()
583                    )));
584                }
585                LogicPearlGateIr::from_path(&artifact_path)?;
586            }
587            PipelineStageKind::ObserverPlugin
588            | PipelineStageKind::TraceSourcePlugin
589            | PipelineStageKind::EnricherPlugin
590            | PipelineStageKind::VerifyPlugin => {
591                let manifest = self.plugin_manifest.as_ref().ok_or_else(|| {
592                    LogicPearlError::message(format!(
593                        "stage {} of kind {:?} requires plugin_manifest",
594                        self.id, self.kind
595                    ))
596                })?;
597                if self.artifact.is_some() {
598                    return Err(LogicPearlError::message(format!(
599                        "stage {} plugin stage must not set artifact",
600                        self.id
601                    )));
602                }
603                let manifest_path = resolve_relative_path(base_dir, manifest)?;
604                if !manifest_path.exists() {
605                    return Err(LogicPearlError::message(format!(
606                        "stage {} plugin manifest does not exist: {}",
607                        self.id,
608                        manifest_path.display()
609                    )));
610                }
611                let manifest = PluginManifest::from_path(&manifest_path)?;
612                let expected_stage = plugin_stage_for_kind(&self.kind).ok_or_else(|| {
613                    LogicPearlError::message(format!(
614                        "stage {} does not map to a plugin stage",
615                        self.id
616                    ))
617                })?;
618                if manifest.stage != expected_stage {
619                    return Err(LogicPearlError::message(format!(
620                        "stage {} expects plugin stage {:?}, found {:?}",
621                        self.id, expected_stage, manifest.stage
622                    )));
623                }
624            }
625        }
626
627        for (field, value) in &self.input {
628            if field.trim().is_empty() {
629                return Err(LogicPearlError::message(format!(
630                    "stage {} input keys must be non-empty",
631                    self.id
632                )));
633            }
634            validate_value_reference(value, visible_exports)?;
635        }
636        if let Some(value) = &self.payload {
637            validate_value_reference(value, visible_exports)?;
638        }
639        if let Some(value) = &self.options {
640            validate_value_reference(value, visible_exports)?;
641        }
642        for (field, value) in &self.export {
643            if field.trim().is_empty() {
644                return Err(LogicPearlError::message(format!(
645                    "stage {} export keys must be non-empty",
646                    self.id
647                )));
648            }
649            validate_value_reference(value, visible_exports)?;
650        }
651        if let Some(value) = &self.when {
652            validate_value_reference(value, visible_exports)?;
653        }
654        if let Some(value) = &self.foreach {
655            validate_value_reference(value, visible_exports)?;
656        }
657        Ok(())
658    }
659}
660
661fn resolve_relative_path(base_dir: &Path, value: &str) -> Result<PathBuf> {
662    resolve_manifest_member_path(base_dir, value)
663}
664
665fn plugin_stage_for_kind(kind: &PipelineStageKind) -> Option<PluginStage> {
666    match kind {
667        PipelineStageKind::ObserverPlugin => Some(PluginStage::Observer),
668        PipelineStageKind::TraceSourcePlugin => Some(PluginStage::TraceSource),
669        PipelineStageKind::EnricherPlugin => Some(PluginStage::Enricher),
670        PipelineStageKind::VerifyPlugin => Some(PluginStage::Verify),
671        PipelineStageKind::Pearl => None,
672    }
673}
674
675fn run_prepared_stage(
676    prepared_stage: &PreparedStage,
677    root_input: &Value,
678    stage_exports: &HashMap<String, HashMap<String, Value>>,
679    plugin_policy: &PluginExecutionPolicy,
680) -> Result<Value> {
681    let stage = &prepared_stage.stage;
682    match &prepared_stage.executable {
683        PreparedStageExecutable::Pearl(gate) => {
684            let features = build_stage_input_object(stage, root_input, stage_exports)?;
685            serde_json::to_value(logicpearl_runtime::evaluate_gate_with_explanation(
686                gate, &features,
687            )?)
688            .map_err(Into::into)
689        }
690        PreparedStageExecutable::Plugin {
691            manifest,
692            stage: plugin_stage,
693        } => {
694            let payload = logicpearl_plugin::build_canonical_payload(
695                plugin_stage,
696                build_stage_payload_value(stage, root_input, stage_exports)?,
697                build_stage_options_value(stage, root_input, stage_exports)?,
698            );
699            let execution = run_plugin_with_policy_and_metadata(
700                manifest,
701                &PluginRequest {
702                    protocol_version: "1".to_string(),
703                    stage: plugin_stage.clone(),
704                    payload,
705                },
706                plugin_policy,
707            )?;
708            plugin_execution_to_value(execution)
709        }
710    }
711}
712
713fn run_prepared_stage_batch(
714    prepared_stage: &PreparedStage,
715    root_inputs: &[Value],
716    stage_exports: &[HashMap<String, HashMap<String, Value>>],
717    runnable_indexes: &[usize],
718    plugin_policy: &PluginExecutionPolicy,
719) -> Result<Vec<Value>> {
720    if runnable_indexes.is_empty() {
721        return Ok(Vec::new());
722    }
723
724    let stage = &prepared_stage.stage;
725    match &prepared_stage.executable {
726        PreparedStageExecutable::Pearl(gate) => runnable_indexes
727            .iter()
728            .map(|index| {
729                let features =
730                    build_stage_input_object(stage, &root_inputs[*index], &stage_exports[*index])?;
731                serde_json::to_value(logicpearl_runtime::evaluate_gate_with_explanation(
732                    gate, &features,
733                )?)
734                .map_err(Into::into)
735            })
736            .collect(),
737        PreparedStageExecutable::Plugin {
738            manifest,
739            stage: plugin_stage,
740        } => {
741            let payloads: Vec<Value> = runnable_indexes
742                .iter()
743                .map(|index| {
744                    Ok(logicpearl_plugin::build_canonical_payload(
745                        plugin_stage,
746                        build_stage_payload_value(
747                            stage,
748                            &root_inputs[*index],
749                            &stage_exports[*index],
750                        )?,
751                        build_stage_options_value(
752                            stage,
753                            &root_inputs[*index],
754                            &stage_exports[*index],
755                        )?,
756                    ))
757                })
758                .collect::<Result<Vec<_>>>()?;
759            let execution = run_plugin_batch_with_policy_and_metadata(
760                manifest,
761                plugin_stage.clone(),
762                &payloads,
763                plugin_policy,
764            )?;
765            if execution.runs.len() != execution.responses.len() {
766                return Err(LogicPearlError::message(format!(
767                    "plugin {} returned {} execution records for {} responses",
768                    manifest.name,
769                    execution.runs.len(),
770                    execution.responses.len()
771                )));
772            }
773            execution
774                .responses
775                .into_iter()
776                .zip(execution.runs)
777                .map(|(response, run)| plugin_response_with_run_to_value(response, &run))
778                .collect()
779        }
780    }
781}
782
783fn plugin_execution_to_value(execution: PluginExecutionResult) -> Result<Value> {
784    plugin_response_with_run_to_value(execution.response, &execution.run)
785}
786
787fn plugin_response_with_run_to_value(
788    response: PluginResponse,
789    run: &logicpearl_plugin::PluginRunMetadata,
790) -> Result<Value> {
791    let mut map = Map::new();
792    map.insert("ok".to_string(), Value::Bool(response.ok));
793    if !response.warnings.is_empty() {
794        map.insert(
795            "warnings".to_string(),
796            Value::Array(response.warnings.into_iter().map(Value::String).collect()),
797        );
798    }
799    if let Some(error) = response.error {
800        map.insert(
801            "error".to_string(),
802            serde_json::to_value(error).map_err(LogicPearlError::from)?,
803        );
804    }
805    for (key, value) in response.extra {
806        map.insert(key, value);
807    }
808    map.insert(
809        "plugin_run".to_string(),
810        serde_json::to_value(run).map_err(LogicPearlError::from)?,
811    );
812    Ok(Value::Object(map))
813}
814
815fn validate_value_reference(
816    value: &Value,
817    visible_exports: &HashMap<String, BTreeSet<String>>,
818) -> Result<()> {
819    match value {
820        Value::String(reference) => validate_reference(reference, visible_exports),
821        Value::Array(items) => {
822            for item in items {
823                validate_value_reference(item, visible_exports)?;
824            }
825            Ok(())
826        }
827        Value::Object(map) => {
828            for item in map.values() {
829                validate_value_reference(item, visible_exports)?;
830            }
831            Ok(())
832        }
833        _ => Ok(()),
834    }
835}
836
837fn validate_reference(
838    reference: &str,
839    visible_exports: &HashMap<String, BTreeSet<String>>,
840) -> Result<()> {
841    if reference.starts_with("$.") {
842        if reference.len() < 3 {
843            return Err(LogicPearlError::message(format!(
844                "invalid root reference: {reference}"
845            )));
846        }
847        return Ok(());
848    }
849    if let Some(rest) = reference.strip_prefix('@') {
850        let mut parts = rest.split('.');
851        let stage_id = parts.next().ok_or_else(|| {
852            LogicPearlError::message(format!("invalid stage reference: {reference}"))
853        })?;
854        let export_name = parts.next().ok_or_else(|| {
855            LogicPearlError::message(format!("invalid stage reference: {reference}"))
856        })?;
857        if parts.next().is_some() {
858            return Err(LogicPearlError::message(format!(
859                "invalid stage reference: {reference}"
860            )));
861        }
862        let exports = visible_exports.get(stage_id).ok_or_else(|| {
863            LogicPearlError::message(format!(
864                "reference uses unknown or future stage {stage_id}: {reference}"
865            ))
866        })?;
867        if !exports.contains(export_name) {
868            return Err(LogicPearlError::message(format!(
869                "reference uses unknown export {export_name} from stage {stage_id}"
870            )));
871        }
872        return Ok(());
873    }
874    Ok(())
875}
876
877fn build_stage_input_object(
878    stage: &PipelineStage,
879    root_input: &Value,
880    stage_exports: &HashMap<String, HashMap<String, Value>>,
881) -> Result<HashMap<String, Value>> {
882    let payload = build_stage_payload_value(stage, root_input, stage_exports)?;
883    let object = payload.as_object().ok_or_else(|| {
884        LogicPearlError::message(format!(
885            "stage {} expected an object payload for pearl input",
886            stage.id
887        ))
888    })?;
889    let mut resolved = HashMap::new();
890    for (key, value) in object {
891        resolved.insert(key.clone(), value.clone());
892    }
893    Ok(resolved)
894}
895
896fn build_stage_payload_value(
897    stage: &PipelineStage,
898    root_input: &Value,
899    stage_exports: &HashMap<String, HashMap<String, Value>>,
900) -> Result<Value> {
901    match &stage.payload {
902        Some(payload) => resolve_stage_input_value(payload, root_input, stage_exports),
903        None => Ok(Value::Object(Map::from_iter(
904            stage
905                .input
906                .iter()
907                .map(|(key, value)| {
908                    Ok((
909                        key.clone(),
910                        resolve_stage_input_value(value, root_input, stage_exports)?,
911                    ))
912                })
913                .collect::<Result<Vec<_>>>()?,
914        ))),
915    }
916}
917
918fn build_stage_options_value(
919    stage: &PipelineStage,
920    root_input: &Value,
921    stage_exports: &HashMap<String, HashMap<String, Value>>,
922) -> Result<Option<Value>> {
923    stage
924        .options
925        .as_ref()
926        .map(|value| resolve_stage_input_value(value, root_input, stage_exports))
927        .transpose()
928}
929
930fn build_stage_exports(
931    export_map: &HashMap<String, Value>,
932    raw_result: &Value,
933) -> Result<HashMap<String, Value>> {
934    let mut resolved = HashMap::new();
935    for (key, value) in export_map {
936        resolved.insert(key.clone(), resolve_stage_output_value(value, raw_result)?);
937    }
938    Ok(resolved)
939}
940
941fn resolve_stage_input_value(
942    value: &Value,
943    root_input: &Value,
944    stage_exports: &HashMap<String, HashMap<String, Value>>,
945) -> Result<Value> {
946    resolve_value(value, root_input, None, stage_exports)
947}
948
949fn resolve_stage_output_value(value: &Value, stage_result: &Value) -> Result<Value> {
950    resolve_value(value, stage_result, Some(stage_result), &HashMap::new())
951}
952
953fn resolve_pipeline_output_value(
954    value: &Value,
955    root_input: &Value,
956    stage_exports: &HashMap<String, HashMap<String, Value>>,
957) -> Result<Value> {
958    resolve_value(value, root_input, None, stage_exports)
959}
960
961fn resolve_value(
962    value: &Value,
963    dollar_scope: &Value,
964    local_scope: Option<&Value>,
965    stage_exports: &HashMap<String, HashMap<String, Value>>,
966) -> Result<Value> {
967    match value {
968        Value::String(reference) if reference.starts_with("$.") => {
969            lookup_json_path(local_scope.unwrap_or(dollar_scope), reference)
970        }
971        Value::String(reference) if reference.starts_with('@') => {
972            let mut parts = reference[1..].split('.');
973            let stage_id = parts.next().ok_or_else(|| {
974                LogicPearlError::message(format!("invalid stage reference: {reference}"))
975            })?;
976            let export_name = parts.next().ok_or_else(|| {
977                LogicPearlError::message(format!("invalid stage reference: {reference}"))
978            })?;
979            if parts.next().is_some() {
980                return Err(LogicPearlError::message(format!(
981                    "invalid stage reference: {reference}"
982                )));
983            }
984            let exports = stage_exports.get(stage_id).ok_or_else(|| {
985                LogicPearlError::message(format!("unknown stage reference: {reference}"))
986            })?;
987            exports.get(export_name).cloned().ok_or_else(|| {
988                LogicPearlError::message(format!("unknown export reference: {reference}"))
989            })
990        }
991        Value::Array(items) => {
992            let mut resolved = Vec::with_capacity(items.len());
993            for item in items {
994                resolved.push(resolve_value(
995                    item,
996                    dollar_scope,
997                    local_scope,
998                    stage_exports,
999                )?);
1000            }
1001            Ok(Value::Array(resolved))
1002        }
1003        Value::Object(map) => {
1004            let mut resolved = Map::new();
1005            for (key, item) in map {
1006                resolved.insert(
1007                    key.clone(),
1008                    resolve_value(item, dollar_scope, local_scope, stage_exports)?,
1009                );
1010            }
1011            Ok(Value::Object(resolved))
1012        }
1013        _ => Ok(value.clone()),
1014    }
1015}
1016
1017fn lookup_json_path(scope: &Value, reference: &str) -> Result<Value> {
1018    let mut current = scope;
1019    for segment in reference.trim_start_matches("$.").split('.') {
1020        if segment.is_empty() {
1021            continue;
1022        }
1023        current = current
1024            .as_object()
1025            .and_then(|object| object.get(segment))
1026            .ok_or_else(|| LogicPearlError::message(format!("path not found: {reference}")))?;
1027    }
1028    Ok(current.clone())
1029}
1030
1031fn truthy(value: &Value) -> bool {
1032    match value {
1033        Value::Null => false,
1034        Value::Bool(flag) => *flag,
1035        Value::Number(number) => {
1036            if let Some(int) = number.as_i64() {
1037                int != 0
1038            } else if let Some(float) = number.as_f64() {
1039                float != 0.0
1040            } else {
1041                false
1042            }
1043        }
1044        Value::String(text) => !text.is_empty(),
1045        Value::Array(items) => !items.is_empty(),
1046        Value::Object(map) => !map.is_empty(),
1047    }
1048}
1049
/// Turns an arbitrary label into a stage identifier.
///
/// ASCII letters and digits are kept (lowercased) and every other character
/// becomes `_`; leading and trailing underscores are then removed. When
/// nothing is left, the fallback `stage_<index + 1>` is returned.
fn sanitize_stage_id(value: &str, index: usize) -> String {
    let normalized: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() {
                ch.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect();

    match normalized.trim_matches('_') {
        "" => format!("stage_{}", index + 1),
        trimmed => trimmed.to_string(),
    }
}
1066
1067fn manifest_member_path_for_base(base_dir: &Path, path: &Path) -> Result<String> {
1068    let relative = if let Ok(relative) = path.strip_prefix(base_dir) {
1069        relative.to_path_buf()
1070    } else {
1071        let canonical_base = fs::canonicalize(base_dir).map_err(|error| {
1072            LogicPearlError::message(format!(
1073                "failed to canonicalize pipeline bundle directory {}: {error}",
1074                base_dir.display()
1075            ))
1076        })?;
1077        let canonical_path = fs::canonicalize(path).map_err(|error| {
1078            LogicPearlError::message(format!(
1079                "failed to canonicalize pipeline artifact {}: {error}",
1080                path.display()
1081            ))
1082        })?;
1083        canonical_path
1084            .strip_prefix(&canonical_base)
1085            .map(Path::to_path_buf)
1086            .map_err(|_| {
1087                LogicPearlError::message(format!(
1088                    "compose artifact must be inside the pipeline bundle directory: {}",
1089                    path.display()
1090                ))
1091            })?
1092    };
1093
1094    let rendered = relative.display().to_string();
1095    resolve_manifest_member_path(base_dir, &rendered)?;
1096    Ok(rendered)
1097}
1098
1099#[cfg(test)]
1100mod tests {
1101    use super::{compose_pipeline, PipelineDefinition, PipelineStageKind};
1102    use serde_json::json;
1103    use std::path::{Path, PathBuf};
1104
1105    fn repo_root() -> PathBuf {
1106        Path::new(env!("CARGO_MANIFEST_DIR"))
1107            .parent()
1108            .and_then(|path| path.parent())
1109            .expect("crate should live under workspace/crates/logicpearl-pipeline")
1110            .to_path_buf()
1111    }
1112
1113    #[test]
1114    fn validates_basic_pipeline() {
1115        let pipeline = PipelineDefinition::from_json_str(
1116            r#"{
1117              "pipeline_version": "1.0",
1118              "pipeline_id": "demo",
1119              "entrypoint": "input",
1120              "stages": [
1121                {
1122                  "id": "authz",
1123                  "kind": "pearl",
1124                  "artifact": "fixtures/ir/valid/auth-demo-v1.json",
1125                  "input": {
1126                    "member_age": "$.member.age"
1127                  },
1128                  "export": {
1129                    "bitmask": "$.bitmask"
1130                  }
1131                }
1132              ],
1133              "output": {
1134                "bitmask": "@authz.bitmask"
1135              }
1136            }"#,
1137        )
1138        .expect("pipeline parses");
1139        let base_dir = repo_root();
1140        let validated = pipeline.validate(base_dir).expect("pipeline validates");
1141        assert_eq!(validated.pipeline_id, "demo");
1142        assert_eq!(validated.stage_count, 1);
1143        assert_eq!(validated.stages[0].kind, PipelineStageKind::Pearl);
1144    }
1145
1146    #[test]
1147    fn rejects_pipeline_stage_paths_that_escape_base_dir() {
1148        let pipeline = PipelineDefinition::from_json_str(
1149            r#"{
1150              "pipeline_version": "1.0",
1151              "pipeline_id": "demo",
1152              "entrypoint": "input",
1153              "stages": [
1154                {
1155                  "id": "authz",
1156                  "kind": "pearl",
1157                  "artifact": "../fixtures/ir/valid/auth-demo-v1.json",
1158                  "input": {
1159                    "member_age": "$.member.age"
1160                  },
1161                  "export": {
1162                    "bitmask": "$.bitmask"
1163                  }
1164                }
1165              ],
1166              "output": {
1167                "bitmask": "@authz.bitmask"
1168              }
1169            }"#,
1170        )
1171        .expect("pipeline parses");
1172        let err = pipeline
1173            .validate(repo_root())
1174            .expect_err("escaping stage paths should fail");
1175        assert!(err.to_string().contains("escapes bundle directory"));
1176    }
1177
1178    #[test]
1179    fn rejects_absolute_pipeline_stage_paths() {
1180        let pipeline = PipelineDefinition::from_json_str(
1181            r#"{
1182              "pipeline_version": "1.0",
1183              "pipeline_id": "demo",
1184              "entrypoint": "input",
1185              "stages": [
1186                {
1187                  "id": "authz",
1188                  "kind": "pearl",
1189                  "artifact": "/tmp/auth-demo-v1.json",
1190                  "input": {
1191                    "member_age": "$.member.age"
1192                  },
1193                  "export": {
1194                    "bitmask": "$.bitmask"
1195                  }
1196                }
1197              ],
1198              "output": {
1199                "bitmask": "@authz.bitmask"
1200              }
1201            }"#,
1202        )
1203        .expect("pipeline parses");
1204        let err = pipeline
1205            .validate(repo_root())
1206            .expect_err("absolute stage paths should fail");
1207        assert!(err.to_string().contains("must be relative"));
1208    }
1209
1210    #[test]
1211    fn rejects_future_stage_reference() {
1212        let pipeline = PipelineDefinition::from_json_str(
1213            r#"{
1214              "pipeline_version": "1.0",
1215              "pipeline_id": "demo",
1216              "entrypoint": "input",
1217              "stages": [
1218                {
1219                  "id": "authz",
1220                  "kind": "pearl",
1221                  "artifact": "fixtures/ir/valid/auth-demo-v1.json",
1222                  "input": {
1223                    "member_age": "@later.bitmask"
1224                  },
1225                  "export": {
1226                    "bitmask": "$.bitmask"
1227                  }
1228                }
1229              ],
1230              "output": {
1231                "bitmask": "@authz.bitmask"
1232              }
1233            }"#,
1234        )
1235        .expect("pipeline parses");
1236        let base_dir = repo_root();
1237        let err = pipeline
1238            .validate(base_dir)
1239            .expect_err("validation should fail");
1240        assert!(err.to_string().contains("unknown or future stage"));
1241    }
1242
1243    #[test]
1244    fn runs_basic_pearl_pipeline() {
1245        let pipeline = PipelineDefinition::from_json_str(
1246            r#"{
1247              "pipeline_version": "1.0",
1248              "pipeline_id": "demo",
1249              "entrypoint": "input",
1250              "stages": [
1251                {
1252                  "id": "authz",
1253                  "kind": "pearl",
1254                  "artifact": "fixtures/ir/valid/auth-demo-v1.json",
1255                  "input": {
1256                    "action": "$.request.action",
1257                    "resource_archived": "$.request.resource_archived",
1258                    "user_role": "$.user.role",
1259                    "failed_attempts": "$.user.failed_attempts"
1260                  },
1261                  "export": {
1262                    "bitmask": "$.bitmask",
1263                    "allow": "$.allow"
1264                  }
1265                }
1266              ],
1267              "output": {
1268                "bitmask": "@authz.bitmask",
1269                "allow": "@authz.allow"
1270              }
1271            }"#,
1272        )
1273        .expect("pipeline parses");
1274        let base_dir = repo_root();
1275        let input = json!({
1276            "request": {
1277                "action": "delete",
1278                "resource_archived": true
1279            },
1280            "user": {
1281                "role": "viewer",
1282                "failed_attempts": 99
1283            }
1284        });
1285        let execution = pipeline.run(base_dir, &input).expect("pipeline runs");
1286        assert_eq!(execution.output.get("bitmask"), Some(&json!(7)));
1287        assert_eq!(execution.output.get("allow"), Some(&json!(false)));
1288    }
1289
1290    #[test]
1291    fn runs_observer_then_pearl_pipeline() {
1292        let pipeline = PipelineDefinition::from_json_str(
1293            r#"{
1294              "pipeline_version": "1.0",
1295              "pipeline_id": "observer_demo",
1296              "entrypoint": "input",
1297              "stages": [
1298                {
1299                  "id": "observer",
1300                  "kind": "observer_plugin",
1301                  "plugin_manifest": "examples/plugins/python_observer/manifest.json",
1302                  "input": {
1303                    "age": "$.age",
1304                    "member": "$.member",
1305                    "country": "$.country"
1306                  },
1307                  "export": {
1308                    "age": "$.features.age",
1309                    "is_member": "$.features.is_member"
1310                  }
1311                },
1312                {
1313                  "id": "gate",
1314                  "kind": "pearl",
1315                  "artifact": "fixtures/ir/valid/membership-demo-v1.json",
1316                  "input": {
1317                    "age": "@observer.age",
1318                    "is_member": "@observer.is_member"
1319                  },
1320                  "export": {
1321                    "bitmask": "$.bitmask",
1322                    "allow": "$.allow"
1323                  }
1324                }
1325              ],
1326              "output": {
1327                "bitmask": "@gate.bitmask",
1328                "allow": "@gate.allow"
1329              }
1330            }"#,
1331        )
1332        .expect("pipeline parses");
1333        let base_dir = repo_root();
1334        let input = json!({
1335            "age": 34,
1336            "member": true,
1337            "country": "US"
1338        });
1339        let execution = pipeline.run(base_dir, &input).expect("pipeline runs");
1340        assert_eq!(execution.output.get("bitmask"), Some(&json!(0)));
1341        assert_eq!(execution.output.get("allow"), Some(&json!(true)));
1342    }
1343
1344    #[test]
1345    fn runs_observer_pearl_verify_pipeline() {
1346        let pipeline = PipelineDefinition::from_json_str(
1347            r#"{
1348              "pipeline_version": "1.0",
1349              "pipeline_id": "observer_verify_demo",
1350              "entrypoint": "input",
1351              "stages": [
1352                {
1353                  "id": "observer",
1354                  "kind": "observer_plugin",
1355                  "plugin_manifest": "examples/plugins/python_observer/manifest.json",
1356                  "input": {
1357                    "age": "$.age",
1358                    "member": "$.member",
1359                    "country": "$.country"
1360                  },
1361                  "export": {
1362                    "age": "$.features.age",
1363                    "is_member": "$.features.is_member"
1364                  }
1365                },
1366                {
1367                  "id": "gate",
1368                  "kind": "pearl",
1369                  "artifact": "fixtures/ir/valid/membership-demo-v1.json",
1370                  "input": {
1371                    "age": "@observer.age",
1372                    "is_member": "@observer.is_member"
1373                  },
1374                  "export": {
1375                    "bitmask": "$.bitmask",
1376                    "allow": "$.allow"
1377                  }
1378                },
1379                {
1380                  "id": "audit",
1381                  "kind": "verify_plugin",
1382                  "plugin_manifest": "examples/plugins/python_pipeline_verify/manifest.json",
1383                  "input": {
1384                    "bitmask": "@gate.bitmask",
1385                    "allow": "@gate.allow"
1386                  },
1387                  "export": {
1388                    "audit_status": "$.audit_status",
1389                    "consistent": "$.summary.consistent"
1390                  }
1391                }
1392              ],
1393              "output": {
1394                "bitmask": "@gate.bitmask",
1395                "allow": "@gate.allow",
1396                "audit_status": "@audit.audit_status",
1397                "consistent": "@audit.consistent"
1398              }
1399            }"#,
1400        )
1401        .expect("pipeline parses");
1402        let base_dir = repo_root();
1403        let input = json!({
1404            "age": 34,
1405            "member": true,
1406            "country": "US"
1407        });
1408        let execution = pipeline.run(base_dir, &input).expect("pipeline runs");
1409        assert_eq!(execution.output.get("bitmask"), Some(&json!(0)));
1410        assert_eq!(execution.output.get("allow"), Some(&json!(true)));
1411        assert_eq!(
1412            execution.output.get("audit_status"),
1413            Some(&json!("clean_pass"))
1414        );
1415        assert_eq!(execution.output.get("consistent"), Some(&json!(true)));
1416    }
1417
1418    #[test]
1419    fn runs_trace_source_plugin_pipeline() {
1420        let pipeline = PipelineDefinition::from_json_str(
1421            r#"{
1422              "pipeline_version": "1.0",
1423              "pipeline_id": "trace_source_demo",
1424              "entrypoint": "input",
1425              "stages": [
1426                {
1427                  "id": "trace_source",
1428                  "kind": "trace_source_plugin",
1429                  "plugin_manifest": "examples/plugins/python_trace_source/manifest.json",
1430                  "payload": "$.source",
1431                  "options": {
1432                    "label_column": "$.label_column"
1433                  },
1434                  "export": {
1435                    "decision_traces": "$.decision_traces"
1436                  }
1437                }
1438              ],
1439              "output": {
1440                "decision_traces": "@trace_source.decision_traces"
1441              }
1442            }"#,
1443        )
1444        .expect("pipeline parses");
1445        let base_dir = repo_root();
1446        let input = json!({
1447            "source": Path::new(env!("CARGO_MANIFEST_DIR"))
1448                .join("../../examples/getting_started/decision_traces.csv")
1449                .display()
1450                .to_string(),
1451            "label_column": "allowed"
1452        });
1453        let execution = pipeline.run(base_dir, &input).expect("pipeline runs");
1454        let rows = execution
1455            .output
1456            .get("decision_traces")
1457            .and_then(|value| value.as_array())
1458            .expect("pipeline should export decision traces");
1459        assert!(!rows.is_empty());
1460        assert!(rows[0].get("features").is_some());
1461        assert!(rows[0].get("allowed").is_some());
1462    }
1463
1464    #[test]
1465    fn composes_starter_pipeline_from_artifacts() {
1466        let base_dir = repo_root();
1467        let artifact_paths = vec![base_dir.join("fixtures/ir/valid/auth-demo-v1.json")];
1468        let plan = compose_pipeline("starter", &artifact_paths, &base_dir).expect("compose works");
1469        assert_eq!(plan.pipeline.pipeline_id, "starter");
1470        assert_eq!(plan.pipeline.stages.len(), 1);
1471        assert_eq!(plan.pipeline.stages[0].id, "auth_demo_v1");
1472        assert_eq!(
1473            plan.pipeline.stages[0].artifact.as_deref(),
1474            Some("fixtures/ir/valid/auth-demo-v1.json")
1475        );
1476        assert!(plan.pipeline.stages[0].input.contains_key("action"));
1477        assert_eq!(
1478            plan.pipeline.output.get("allow"),
1479            Some(&json!("@auth_demo_v1.allow"))
1480        );
1481    }
1482}