1use logicpearl_core::{resolve_manifest_member_path, LogicPearlError, Result};
11use logicpearl_ir::LogicPearlGateIr;
12use logicpearl_plugin::{
13 run_plugin_batch_with_policy_and_metadata, run_plugin_with_policy_and_metadata,
14 PluginExecutionPolicy, PluginExecutionResult, PluginManifest, PluginRequest, PluginResponse,
15 PluginStage,
16};
17use serde::{Deserialize, Serialize};
18use serde_json::Map;
19use serde_json::Value;
20use std::collections::{BTreeSet, HashMap};
21use std::fs;
22use std::path::{Path, PathBuf};
23
/// Declarative description of a pipeline, as serialized to and parsed from JSON.
///
/// `validate` accepts a definition only when `pipeline_version` is `"1.0"`,
/// `pipeline_id` and `entrypoint` are non-blank, and both `stages` and `output`
/// are non-empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PipelineDefinition {
    /// Pipeline format version; `validate` rejects anything other than `"1.0"`.
    pub pipeline_version: String,
    /// Pipeline identifier; also reported as `artifact_id` in execution results.
    pub pipeline_id: String,
    /// Entry point name; only checked for being non-blank in this file section.
    pub entrypoint: String,
    /// Stages, validated and executed in declaration order.
    pub stages: Vec<PipelineStage>,
    /// Output field name -> value expression (`$.path` into the root input,
    /// `@stage.export` reference, or a literal JSON value).
    pub output: HashMap<String, Value>,
}
32
/// One stage of a pipeline: either a pearl artifact or a plugin invocation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PipelineStage {
    /// Stage identifier; must be non-blank and unique within the pipeline.
    pub id: String,
    /// Selects whether the stage runs a pearl or one of the plugin stage types.
    pub kind: PipelineStageKind,
    /// Bundle-relative path of the pearl artifact; required for `Pearl`, forbidden for plugin kinds.
    pub artifact: Option<String>,
    /// Bundle-relative path of the plugin manifest; required for plugin kinds, forbidden for `Pearl`.
    pub plugin_manifest: Option<String>,
    /// Field name -> value expression; resolved into the stage payload object when `payload` is absent.
    #[serde(default)]
    pub input: HashMap<String, Value>,
    /// Explicit payload expression; when present it is used instead of `input`.
    #[serde(default)]
    pub payload: Option<Value>,
    /// Options expression resolved and handed to the plugin payload builder.
    #[serde(default)]
    pub options: Option<Value>,
    /// Export name -> value expression, resolved against the stage's raw result.
    #[serde(default)]
    pub export: HashMap<String, Value>,
    /// Optional condition; the stage is skipped when it resolves to a falsy value.
    #[serde(default)]
    pub when: Option<Value>,
    /// Reference-checked during validation; no runtime use is visible in this file section.
    #[serde(default)]
    pub foreach: Option<Value>,
}
52
/// Kind of executable behind a pipeline stage.
///
/// Serialized in `snake_case` (for example `"pearl"`). Every variant except
/// `Pearl` maps to a `PluginStage` via `plugin_stage_for_kind`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStageKind {
    /// Evaluates a pearl gate artifact.
    Pearl,
    /// Runs a plugin whose manifest declares the observer stage.
    ObserverPlugin,
    /// Runs a plugin whose manifest declares the trace-source stage.
    TraceSourcePlugin,
    /// Runs a plugin whose manifest declares the enricher stage.
    EnricherPlugin,
    /// Runs a plugin whose manifest declares the verify stage.
    VerifyPlugin,
}
62
/// Summary returned by `PipelineDefinition::validate` for a definition that passed all checks.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ValidatedPipeline {
    /// Copied from the definition.
    pub pipeline_id: String,
    /// Copied from the definition.
    pub pipeline_version: String,
    /// Copied from the definition.
    pub entrypoint: String,
    /// Number of stages in the definition.
    pub stage_count: usize,
    /// Per-stage summaries in declaration order.
    pub stages: Vec<ValidatedStage>,
    /// Names of the pipeline-level output fields.
    pub exports: Vec<String>,
}
72
/// Per-stage summary produced during pipeline validation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ValidatedStage {
    /// Stage identifier.
    pub id: String,
    /// Stage kind.
    pub kind: PipelineStageKind,
    /// Artifact path resolved against the base directory, rendered as a string; `None` when unset.
    pub artifact: Option<String>,
    /// Plugin manifest path resolved against the base directory, rendered as a string; `None` when unset.
    pub plugin_manifest: Option<String>,
    /// Export names declared by the stage, in sorted order.
    pub exports: Vec<String>,
}
81
/// Result of running a prepared pipeline against one root input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PipelineExecution {
    /// Filled from `logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION`.
    pub schema_version: String,
    /// Filled from `logicpearl_runtime::LOGICPEARL_ENGINE_VERSION`.
    pub engine_version: String,
    /// Set to the pipeline id by the runners in this file.
    pub artifact_id: String,
    /// Value of `logicpearl_runtime::artifact_hash` over the pipeline definition.
    pub artifact_hash: String,
    /// Set to `"pipeline"` by the runners in this file.
    pub decision_kind: String,
    /// Identifier of the pipeline that produced this result.
    pub pipeline_id: String,
    /// The runners in this file always set this to `true`; failures surface as `Err` instead.
    pub ok: bool,
    /// Resolved pipeline output fields.
    pub output: HashMap<String, Value>,
    /// One record per stage, in declaration order (skipped stages included).
    pub stages: Vec<StageExecution>,
}
94
/// Record of a single stage within a pipeline execution.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StageExecution {
    /// Stage identifier.
    pub id: String,
    /// Stage kind.
    pub kind: PipelineStageKind,
    /// The runners in this file always set this to `true`, for skipped stages too.
    pub ok: bool,
    /// `true` when the stage's `when` condition resolved to a falsy value.
    pub skipped: bool,
    /// Resolved export values; empty for skipped stages.
    pub exports: HashMap<String, Value>,
    /// Raw stage result the exports were resolved against; `Value::Null` for skipped stages.
    pub raw_result: Value,
}
104
/// Output of `compose_pipeline`: a generated pipeline skeleton plus notes for the author.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ComposePlan {
    /// Generated definition; its stage inputs use `$.TODO_*` placeholder paths.
    pub pipeline: PipelineDefinition,
    /// One human-readable note per generated stage describing the placeholders to replace.
    pub notes: Vec<String>,
}
110
/// A validated pipeline whose artifacts and plugin manifests have already been loaded.
///
/// Built by `PipelineDefinition::prepare_with_plugin_policy`.
#[derive(Debug, Clone)]
pub struct PreparedPipeline {
    /// Copy of the source definition; supplies the output mapping and hash input.
    definition: PipelineDefinition,
    /// Stages paired with their loaded executables, in declaration order.
    stages: Vec<PreparedStage>,
    /// Policy passed to every plugin invocation.
    plugin_policy: PluginExecutionPolicy,
}
117
/// A stage definition paired with the executable loaded for it.
#[derive(Debug, Clone)]
struct PreparedStage {
    /// Copy of the stage definition (inputs, exports, condition, ...).
    stage: PipelineStage,
    /// Loaded pearl gate or plugin manifest used to run the stage.
    executable: PreparedStageExecutable,
}
123
/// Loaded executable behind a prepared stage.
#[derive(Debug, Clone)]
enum PreparedStageExecutable {
    /// Gate IR loaded from the stage's artifact path.
    Pearl(LogicPearlGateIr),
    /// Plugin manifest loaded from the stage's manifest path, plus the plugin
    /// stage derived from the pipeline stage kind.
    Plugin {
        manifest: PluginManifest,
        stage: PluginStage,
    },
}
132
133impl PipelineDefinition {
134 pub fn from_json_str(input: &str) -> Result<Self> {
135 let pipeline: Self = serde_json::from_str(input)?;
136 Ok(pipeline)
137 }
138
139 pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
140 let content = fs::read_to_string(path)?;
141 Self::from_json_str(&content)
142 }
143
144 pub fn validate(&self, base_dir: impl AsRef<Path>) -> Result<ValidatedPipeline> {
145 if self.pipeline_version != "1.0" {
146 return Err(LogicPearlError::message(format!(
147 "unsupported pipeline_version: {}",
148 self.pipeline_version
149 )));
150 }
151 if self.pipeline_id.trim().is_empty() {
152 return Err(LogicPearlError::message("pipeline_id must be non-empty"));
153 }
154 if self.entrypoint.trim().is_empty() {
155 return Err(LogicPearlError::message("entrypoint must be non-empty"));
156 }
157 if self.stages.is_empty() {
158 return Err(LogicPearlError::message(
159 "pipeline must define at least one stage",
160 ));
161 }
162 if self.output.is_empty() {
163 return Err(LogicPearlError::message(
164 "pipeline output must define at least one field",
165 ));
166 }
167
168 let base_dir = base_dir.as_ref();
169 let mut stage_ids = BTreeSet::new();
170 let mut visible_exports: HashMap<String, BTreeSet<String>> = HashMap::new();
171 let mut validated_stages = Vec::with_capacity(self.stages.len());
172
173 for stage in &self.stages {
174 if stage.id.trim().is_empty() {
175 return Err(LogicPearlError::message("stage id must be non-empty"));
176 }
177 if !stage_ids.insert(stage.id.clone()) {
178 return Err(LogicPearlError::message(format!(
179 "duplicate stage id: {}",
180 stage.id
181 )));
182 }
183 stage.validate(base_dir, &visible_exports)?;
184 let export_names = stage.export.keys().cloned().collect::<BTreeSet<_>>();
185 visible_exports.insert(stage.id.clone(), export_names.clone());
186 validated_stages.push(ValidatedStage {
187 id: stage.id.clone(),
188 kind: stage.kind.clone(),
189 artifact: stage
190 .artifact
191 .as_ref()
192 .map(|value| {
193 resolve_relative_path(base_dir, value)
194 .map(|path| path.display().to_string())
195 })
196 .transpose()?,
197 plugin_manifest: stage
198 .plugin_manifest
199 .as_ref()
200 .map(|value| {
201 resolve_relative_path(base_dir, value)
202 .map(|path| path.display().to_string())
203 })
204 .transpose()?,
205 exports: export_names.into_iter().collect(),
206 });
207 }
208
209 for (field, value) in &self.output {
210 if field.trim().is_empty() {
211 return Err(LogicPearlError::message(
212 "pipeline output keys must be non-empty",
213 ));
214 }
215 validate_value_reference(value, &visible_exports)?;
216 }
217
218 let exports = self.output.keys().cloned().collect();
219 Ok(ValidatedPipeline {
220 pipeline_id: self.pipeline_id.clone(),
221 pipeline_version: self.pipeline_version.clone(),
222 entrypoint: self.entrypoint.clone(),
223 stage_count: self.stages.len(),
224 stages: validated_stages,
225 exports,
226 })
227 }
228
229 pub fn inspect(&self, base_dir: impl AsRef<Path>) -> Result<ValidatedPipeline> {
230 self.validate(base_dir)
231 }
232
233 pub fn run(&self, base_dir: impl AsRef<Path>, root_input: &Value) -> Result<PipelineExecution> {
234 self.run_with_plugin_policy(base_dir, root_input, PluginExecutionPolicy::default())
235 }
236
237 pub fn run_with_plugin_policy(
238 &self,
239 base_dir: impl AsRef<Path>,
240 root_input: &Value,
241 plugin_policy: PluginExecutionPolicy,
242 ) -> Result<PipelineExecution> {
243 self.prepare_with_plugin_policy(base_dir, plugin_policy)?
244 .run(root_input)
245 }
246
247 pub fn write_pretty(&self, path: impl AsRef<Path>) -> Result<()> {
248 fs::write(path, serde_json::to_string_pretty(self)? + "\n")?;
249 Ok(())
250 }
251
252 pub fn prepare(&self, base_dir: impl AsRef<Path>) -> Result<PreparedPipeline> {
253 self.prepare_with_plugin_policy(base_dir, PluginExecutionPolicy::default())
254 }
255
256 pub fn prepare_with_plugin_policy(
257 &self,
258 base_dir: impl AsRef<Path>,
259 plugin_policy: PluginExecutionPolicy,
260 ) -> Result<PreparedPipeline> {
261 self.validate(&base_dir)?;
262 let base_dir = base_dir.as_ref();
263 let mut prepared_stages = Vec::with_capacity(self.stages.len());
264 for stage in &self.stages {
265 let executable = match stage.kind {
266 PipelineStageKind::Pearl => {
267 let artifact_path = resolve_relative_path(
268 base_dir,
269 stage.artifact.as_ref().expect("validated pearl artifact"),
270 )?;
271 PreparedStageExecutable::Pearl(LogicPearlGateIr::from_path(&artifact_path)?)
272 }
273 PipelineStageKind::ObserverPlugin
274 | PipelineStageKind::TraceSourcePlugin
275 | PipelineStageKind::EnricherPlugin
276 | PipelineStageKind::VerifyPlugin => {
277 let manifest_path = resolve_relative_path(
278 base_dir,
279 stage
280 .plugin_manifest
281 .as_ref()
282 .expect("validated plugin manifest"),
283 )?;
284 let manifest = PluginManifest::from_path(&manifest_path)?;
285 let plugin_stage = plugin_stage_for_kind(&stage.kind).ok_or_else(|| {
286 LogicPearlError::message(format!(
287 "stage {} does not map to a plugin stage",
288 stage.id
289 ))
290 })?;
291 PreparedStageExecutable::Plugin {
292 manifest,
293 stage: plugin_stage,
294 }
295 }
296 };
297 prepared_stages.push(PreparedStage {
298 stage: stage.clone(),
299 executable,
300 });
301 }
302 Ok(PreparedPipeline {
303 definition: self.clone(),
304 stages: prepared_stages,
305 plugin_policy,
306 })
307 }
308}
309
310impl PreparedPipeline {
311 pub fn run(&self, root_input: &Value) -> Result<PipelineExecution> {
312 let mut stage_exports: HashMap<String, HashMap<String, Value>> = HashMap::new();
313 let mut stages = Vec::with_capacity(self.stages.len());
314
315 for prepared_stage in &self.stages {
316 let stage = &prepared_stage.stage;
317 let should_run = match &stage.when {
318 Some(condition) => truthy(&resolve_stage_input_value(
319 condition,
320 root_input,
321 &stage_exports,
322 )?),
323 None => true,
324 };
325
326 if !should_run {
327 stages.push(StageExecution {
328 id: stage.id.clone(),
329 kind: stage.kind.clone(),
330 ok: true,
331 skipped: true,
332 exports: HashMap::new(),
333 raw_result: Value::Null,
334 });
335 stage_exports.insert(stage.id.clone(), HashMap::new());
336 continue;
337 }
338
339 let raw_result = run_prepared_stage(
340 prepared_stage,
341 root_input,
342 &stage_exports,
343 &self.plugin_policy,
344 )?;
345
346 let exports = build_stage_exports(&stage.export, &raw_result)?;
347 stage_exports.insert(stage.id.clone(), exports.clone());
348 stages.push(StageExecution {
349 id: stage.id.clone(),
350 kind: stage.kind.clone(),
351 ok: true,
352 skipped: false,
353 exports,
354 raw_result,
355 });
356 }
357
358 let mut output = HashMap::new();
359 for (key, value) in &self.definition.output {
360 output.insert(
361 key.clone(),
362 resolve_pipeline_output_value(value, root_input, &stage_exports)?,
363 );
364 }
365
366 Ok(PipelineExecution {
367 schema_version: logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION.to_string(),
368 engine_version: logicpearl_runtime::LOGICPEARL_ENGINE_VERSION.to_string(),
369 artifact_id: self.definition.pipeline_id.clone(),
370 artifact_hash: logicpearl_runtime::artifact_hash(&self.definition),
371 decision_kind: "pipeline".to_string(),
372 pipeline_id: self.definition.pipeline_id.clone(),
373 ok: true,
374 output,
375 stages,
376 })
377 }
378
379 pub fn run_batch(&self, root_inputs: &[Value]) -> Result<Vec<PipelineExecution>> {
380 let mut stage_exports: Vec<HashMap<String, HashMap<String, Value>>> =
381 vec![HashMap::new(); root_inputs.len()];
382 let mut case_stages: Vec<Vec<StageExecution>> = (0..root_inputs.len())
383 .map(|_| Vec::with_capacity(self.stages.len()))
384 .collect();
385
386 for prepared_stage in &self.stages {
387 let stage = &prepared_stage.stage;
388 let should_run: Vec<bool> = root_inputs
389 .iter()
390 .zip(stage_exports.iter())
391 .map(|(root_input, exports)| -> Result<bool> {
392 Ok(match &stage.when {
393 Some(condition) => {
394 truthy(&resolve_stage_input_value(condition, root_input, exports)?)
395 }
396 None => true,
397 })
398 })
399 .collect::<Result<Vec<_>>>()?;
400
401 let runnable_indexes: Vec<usize> = should_run
402 .iter()
403 .enumerate()
404 .filter_map(|(index, should)| should.then_some(index))
405 .collect();
406
407 let raw_results = run_prepared_stage_batch(
408 prepared_stage,
409 root_inputs,
410 &stage_exports,
411 &runnable_indexes,
412 &self.plugin_policy,
413 )?;
414 let mut raw_iter = raw_results.into_iter();
415
416 for index in 0..root_inputs.len() {
417 if !should_run[index] {
418 case_stages[index].push(StageExecution {
419 id: stage.id.clone(),
420 kind: stage.kind.clone(),
421 ok: true,
422 skipped: true,
423 exports: HashMap::new(),
424 raw_result: Value::Null,
425 });
426 stage_exports[index].insert(stage.id.clone(), HashMap::new());
427 continue;
428 }
429
430 let raw_result = raw_iter.next().ok_or_else(|| {
431 LogicPearlError::message(format!(
432 "prepared stage batch for {} returned fewer results than expected",
433 stage.id
434 ))
435 })?;
436 let exports = build_stage_exports(&stage.export, &raw_result)?;
437 stage_exports[index].insert(stage.id.clone(), exports.clone());
438 case_stages[index].push(StageExecution {
439 id: stage.id.clone(),
440 kind: stage.kind.clone(),
441 ok: true,
442 skipped: false,
443 exports,
444 raw_result,
445 });
446 }
447 }
448
449 root_inputs
450 .iter()
451 .enumerate()
452 .map(|(index, root_input)| {
453 let mut output = HashMap::new();
454 for (key, value) in &self.definition.output {
455 output.insert(
456 key.clone(),
457 resolve_pipeline_output_value(value, root_input, &stage_exports[index])?,
458 );
459 }
460 Ok(PipelineExecution {
461 schema_version: logicpearl_runtime::PIPELINE_RESULT_SCHEMA_VERSION.to_string(),
462 engine_version: logicpearl_runtime::LOGICPEARL_ENGINE_VERSION.to_string(),
463 artifact_id: self.definition.pipeline_id.clone(),
464 artifact_hash: logicpearl_runtime::artifact_hash(&self.definition),
465 decision_kind: "pipeline".to_string(),
466 pipeline_id: self.definition.pipeline_id.clone(),
467 ok: true,
468 output,
469 stages: case_stages[index].clone(),
470 })
471 })
472 .collect()
473 }
474}
475
476pub fn compose_pipeline(
477 pipeline_id: impl Into<String>,
478 artifact_paths: &[PathBuf],
479 base_dir: impl AsRef<Path>,
480) -> Result<ComposePlan> {
481 if artifact_paths.is_empty() {
482 return Err(LogicPearlError::message(
483 "compose requires at least one pearl artifact path",
484 ));
485 }
486
487 let pipeline_id = pipeline_id.into();
488 let base_dir = base_dir.as_ref();
489 let mut stages = Vec::with_capacity(artifact_paths.len());
490 let mut notes = Vec::new();
491
492 for (index, artifact_path) in artifact_paths.iter().enumerate() {
493 let gate = LogicPearlGateIr::from_path(artifact_path)?;
494 let stage_id = sanitize_stage_id(&gate.gate_id, index);
495 let artifact = manifest_member_path_for_base(base_dir, artifact_path)?;
496
497 let mut input = HashMap::new();
498 for feature in &gate.input_schema.features {
499 input.insert(
500 feature.id.clone(),
501 Value::String(format!("$.TODO_{}", feature.id)),
502 );
503 }
504
505 let mut export = HashMap::new();
506 export.insert(
507 "bitmask".to_string(),
508 Value::String("$.bitmask".to_string()),
509 );
510 export.insert("allow".to_string(), Value::String("$.allow".to_string()));
511
512 notes.push(format!(
513 "stage `{}` maps {} input feature(s) from placeholder root paths; replace `$.TODO_*` with real paths or `@stage.export` references",
514 stage_id,
515 gate.input_schema.features.len()
516 ));
517
518 stages.push(PipelineStage {
519 id: stage_id,
520 kind: PipelineStageKind::Pearl,
521 artifact: Some(artifact),
522 plugin_manifest: None,
523 input,
524 payload: None,
525 options: None,
526 export,
527 when: None,
528 foreach: None,
529 });
530 }
531
532 let mut output = HashMap::new();
533 let final_stage = stages
534 .last()
535 .ok_or_else(|| LogicPearlError::message("compose produced no stages"))?;
536 output.insert(
537 "bitmask".to_string(),
538 Value::String(format!("@{}.bitmask", final_stage.id)),
539 );
540 output.insert(
541 "allow".to_string(),
542 Value::String(format!("@{}.allow", final_stage.id)),
543 );
544
545 Ok(ComposePlan {
546 pipeline: PipelineDefinition {
547 pipeline_version: "1.0".to_string(),
548 pipeline_id,
549 entrypoint: "input".to_string(),
550 stages,
551 output,
552 },
553 notes,
554 })
555}
556
557impl PipelineStage {
558 fn validate(
559 &self,
560 base_dir: &Path,
561 visible_exports: &HashMap<String, BTreeSet<String>>,
562 ) -> Result<()> {
563 match self.kind {
564 PipelineStageKind::Pearl => {
565 let artifact = self.artifact.as_ref().ok_or_else(|| {
566 LogicPearlError::message(format!(
567 "stage {} of kind pearl requires artifact",
568 self.id
569 ))
570 })?;
571 if self.plugin_manifest.is_some() {
572 return Err(LogicPearlError::message(format!(
573 "stage {} of kind pearl must not set plugin_manifest",
574 self.id
575 )));
576 }
577 let artifact_path = resolve_relative_path(base_dir, artifact)?;
578 if !artifact_path.exists() {
579 return Err(LogicPearlError::message(format!(
580 "stage {} artifact does not exist: {}",
581 self.id,
582 artifact_path.display()
583 )));
584 }
585 LogicPearlGateIr::from_path(&artifact_path)?;
586 }
587 PipelineStageKind::ObserverPlugin
588 | PipelineStageKind::TraceSourcePlugin
589 | PipelineStageKind::EnricherPlugin
590 | PipelineStageKind::VerifyPlugin => {
591 let manifest = self.plugin_manifest.as_ref().ok_or_else(|| {
592 LogicPearlError::message(format!(
593 "stage {} of kind {:?} requires plugin_manifest",
594 self.id, self.kind
595 ))
596 })?;
597 if self.artifact.is_some() {
598 return Err(LogicPearlError::message(format!(
599 "stage {} plugin stage must not set artifact",
600 self.id
601 )));
602 }
603 let manifest_path = resolve_relative_path(base_dir, manifest)?;
604 if !manifest_path.exists() {
605 return Err(LogicPearlError::message(format!(
606 "stage {} plugin manifest does not exist: {}",
607 self.id,
608 manifest_path.display()
609 )));
610 }
611 let manifest = PluginManifest::from_path(&manifest_path)?;
612 let expected_stage = plugin_stage_for_kind(&self.kind).ok_or_else(|| {
613 LogicPearlError::message(format!(
614 "stage {} does not map to a plugin stage",
615 self.id
616 ))
617 })?;
618 if manifest.stage != expected_stage {
619 return Err(LogicPearlError::message(format!(
620 "stage {} expects plugin stage {:?}, found {:?}",
621 self.id, expected_stage, manifest.stage
622 )));
623 }
624 }
625 }
626
627 for (field, value) in &self.input {
628 if field.trim().is_empty() {
629 return Err(LogicPearlError::message(format!(
630 "stage {} input keys must be non-empty",
631 self.id
632 )));
633 }
634 validate_value_reference(value, visible_exports)?;
635 }
636 if let Some(value) = &self.payload {
637 validate_value_reference(value, visible_exports)?;
638 }
639 if let Some(value) = &self.options {
640 validate_value_reference(value, visible_exports)?;
641 }
642 for (field, value) in &self.export {
643 if field.trim().is_empty() {
644 return Err(LogicPearlError::message(format!(
645 "stage {} export keys must be non-empty",
646 self.id
647 )));
648 }
649 validate_value_reference(value, visible_exports)?;
650 }
651 if let Some(value) = &self.when {
652 validate_value_reference(value, visible_exports)?;
653 }
654 if let Some(value) = &self.foreach {
655 validate_value_reference(value, visible_exports)?;
656 }
657 Ok(())
658 }
659}
660
/// Resolves a stage artifact or plugin-manifest path against `base_dir` by
/// delegating to `resolve_manifest_member_path`.
///
/// NOTE(review): the tests in this file expect paths that escape `base_dir`
/// to be rejected; that check presumably lives in the delegate — confirm.
fn resolve_relative_path(base_dir: &Path, value: &str) -> Result<PathBuf> {
    resolve_manifest_member_path(base_dir, value)
}
664
665fn plugin_stage_for_kind(kind: &PipelineStageKind) -> Option<PluginStage> {
666 match kind {
667 PipelineStageKind::ObserverPlugin => Some(PluginStage::Observer),
668 PipelineStageKind::TraceSourcePlugin => Some(PluginStage::TraceSource),
669 PipelineStageKind::EnricherPlugin => Some(PluginStage::Enricher),
670 PipelineStageKind::VerifyPlugin => Some(PluginStage::Verify),
671 PipelineStageKind::Pearl => None,
672 }
673}
674
675fn run_prepared_stage(
676 prepared_stage: &PreparedStage,
677 root_input: &Value,
678 stage_exports: &HashMap<String, HashMap<String, Value>>,
679 plugin_policy: &PluginExecutionPolicy,
680) -> Result<Value> {
681 let stage = &prepared_stage.stage;
682 match &prepared_stage.executable {
683 PreparedStageExecutable::Pearl(gate) => {
684 let features = build_stage_input_object(stage, root_input, stage_exports)?;
685 serde_json::to_value(logicpearl_runtime::evaluate_gate_with_explanation(
686 gate, &features,
687 )?)
688 .map_err(Into::into)
689 }
690 PreparedStageExecutable::Plugin {
691 manifest,
692 stage: plugin_stage,
693 } => {
694 let payload = logicpearl_plugin::build_canonical_payload(
695 plugin_stage,
696 build_stage_payload_value(stage, root_input, stage_exports)?,
697 build_stage_options_value(stage, root_input, stage_exports)?,
698 );
699 let execution = run_plugin_with_policy_and_metadata(
700 manifest,
701 &PluginRequest {
702 protocol_version: "1".to_string(),
703 stage: plugin_stage.clone(),
704 payload,
705 },
706 plugin_policy,
707 )?;
708 plugin_execution_to_value(execution)
709 }
710 }
711}
712
713fn run_prepared_stage_batch(
714 prepared_stage: &PreparedStage,
715 root_inputs: &[Value],
716 stage_exports: &[HashMap<String, HashMap<String, Value>>],
717 runnable_indexes: &[usize],
718 plugin_policy: &PluginExecutionPolicy,
719) -> Result<Vec<Value>> {
720 if runnable_indexes.is_empty() {
721 return Ok(Vec::new());
722 }
723
724 let stage = &prepared_stage.stage;
725 match &prepared_stage.executable {
726 PreparedStageExecutable::Pearl(gate) => runnable_indexes
727 .iter()
728 .map(|index| {
729 let features =
730 build_stage_input_object(stage, &root_inputs[*index], &stage_exports[*index])?;
731 serde_json::to_value(logicpearl_runtime::evaluate_gate_with_explanation(
732 gate, &features,
733 )?)
734 .map_err(Into::into)
735 })
736 .collect(),
737 PreparedStageExecutable::Plugin {
738 manifest,
739 stage: plugin_stage,
740 } => {
741 let payloads: Vec<Value> = runnable_indexes
742 .iter()
743 .map(|index| {
744 Ok(logicpearl_plugin::build_canonical_payload(
745 plugin_stage,
746 build_stage_payload_value(
747 stage,
748 &root_inputs[*index],
749 &stage_exports[*index],
750 )?,
751 build_stage_options_value(
752 stage,
753 &root_inputs[*index],
754 &stage_exports[*index],
755 )?,
756 ))
757 })
758 .collect::<Result<Vec<_>>>()?;
759 let execution = run_plugin_batch_with_policy_and_metadata(
760 manifest,
761 plugin_stage.clone(),
762 &payloads,
763 plugin_policy,
764 )?;
765 if execution.runs.len() != execution.responses.len() {
766 return Err(LogicPearlError::message(format!(
767 "plugin {} returned {} execution records for {} responses",
768 manifest.name,
769 execution.runs.len(),
770 execution.responses.len()
771 )));
772 }
773 execution
774 .responses
775 .into_iter()
776 .zip(execution.runs)
777 .map(|(response, run)| plugin_response_with_run_to_value(response, &run))
778 .collect()
779 }
780 }
781}
782
/// Converts a single plugin execution into the stage's raw JSON result by
/// combining its response with its run metadata.
fn plugin_execution_to_value(execution: PluginExecutionResult) -> Result<Value> {
    plugin_response_with_run_to_value(execution.response, &execution.run)
}
786
787fn plugin_response_with_run_to_value(
788 response: PluginResponse,
789 run: &logicpearl_plugin::PluginRunMetadata,
790) -> Result<Value> {
791 let mut map = Map::new();
792 map.insert("ok".to_string(), Value::Bool(response.ok));
793 if !response.warnings.is_empty() {
794 map.insert(
795 "warnings".to_string(),
796 Value::Array(response.warnings.into_iter().map(Value::String).collect()),
797 );
798 }
799 if let Some(error) = response.error {
800 map.insert(
801 "error".to_string(),
802 serde_json::to_value(error).map_err(LogicPearlError::from)?,
803 );
804 }
805 for (key, value) in response.extra {
806 map.insert(key, value);
807 }
808 map.insert(
809 "plugin_run".to_string(),
810 serde_json::to_value(run).map_err(LogicPearlError::from)?,
811 );
812 Ok(Value::Object(map))
813}
814
815fn validate_value_reference(
816 value: &Value,
817 visible_exports: &HashMap<String, BTreeSet<String>>,
818) -> Result<()> {
819 match value {
820 Value::String(reference) => validate_reference(reference, visible_exports),
821 Value::Array(items) => {
822 for item in items {
823 validate_value_reference(item, visible_exports)?;
824 }
825 Ok(())
826 }
827 Value::Object(map) => {
828 for item in map.values() {
829 validate_value_reference(item, visible_exports)?;
830 }
831 Ok(())
832 }
833 _ => Ok(()),
834 }
835}
836
837fn validate_reference(
838 reference: &str,
839 visible_exports: &HashMap<String, BTreeSet<String>>,
840) -> Result<()> {
841 if reference.starts_with("$.") {
842 if reference.len() < 3 {
843 return Err(LogicPearlError::message(format!(
844 "invalid root reference: {reference}"
845 )));
846 }
847 return Ok(());
848 }
849 if let Some(rest) = reference.strip_prefix('@') {
850 let mut parts = rest.split('.');
851 let stage_id = parts.next().ok_or_else(|| {
852 LogicPearlError::message(format!("invalid stage reference: {reference}"))
853 })?;
854 let export_name = parts.next().ok_or_else(|| {
855 LogicPearlError::message(format!("invalid stage reference: {reference}"))
856 })?;
857 if parts.next().is_some() {
858 return Err(LogicPearlError::message(format!(
859 "invalid stage reference: {reference}"
860 )));
861 }
862 let exports = visible_exports.get(stage_id).ok_or_else(|| {
863 LogicPearlError::message(format!(
864 "reference uses unknown or future stage {stage_id}: {reference}"
865 ))
866 })?;
867 if !exports.contains(export_name) {
868 return Err(LogicPearlError::message(format!(
869 "reference uses unknown export {export_name} from stage {stage_id}"
870 )));
871 }
872 return Ok(());
873 }
874 Ok(())
875}
876
877fn build_stage_input_object(
878 stage: &PipelineStage,
879 root_input: &Value,
880 stage_exports: &HashMap<String, HashMap<String, Value>>,
881) -> Result<HashMap<String, Value>> {
882 let payload = build_stage_payload_value(stage, root_input, stage_exports)?;
883 let object = payload.as_object().ok_or_else(|| {
884 LogicPearlError::message(format!(
885 "stage {} expected an object payload for pearl input",
886 stage.id
887 ))
888 })?;
889 let mut resolved = HashMap::new();
890 for (key, value) in object {
891 resolved.insert(key.clone(), value.clone());
892 }
893 Ok(resolved)
894}
895
896fn build_stage_payload_value(
897 stage: &PipelineStage,
898 root_input: &Value,
899 stage_exports: &HashMap<String, HashMap<String, Value>>,
900) -> Result<Value> {
901 match &stage.payload {
902 Some(payload) => resolve_stage_input_value(payload, root_input, stage_exports),
903 None => Ok(Value::Object(Map::from_iter(
904 stage
905 .input
906 .iter()
907 .map(|(key, value)| {
908 Ok((
909 key.clone(),
910 resolve_stage_input_value(value, root_input, stage_exports)?,
911 ))
912 })
913 .collect::<Result<Vec<_>>>()?,
914 ))),
915 }
916}
917
918fn build_stage_options_value(
919 stage: &PipelineStage,
920 root_input: &Value,
921 stage_exports: &HashMap<String, HashMap<String, Value>>,
922) -> Result<Option<Value>> {
923 stage
924 .options
925 .as_ref()
926 .map(|value| resolve_stage_input_value(value, root_input, stage_exports))
927 .transpose()
928}
929
930fn build_stage_exports(
931 export_map: &HashMap<String, Value>,
932 raw_result: &Value,
933) -> Result<HashMap<String, Value>> {
934 let mut resolved = HashMap::new();
935 for (key, value) in export_map {
936 resolved.insert(key.clone(), resolve_stage_output_value(value, raw_result)?);
937 }
938 Ok(resolved)
939}
940
/// Resolves an expression used as stage input (`input`, `payload`, `options`, `when`).
///
/// `$.` paths read from `root_input`; `@stage.export` references read from
/// the exports collected so far.
fn resolve_stage_input_value(
    value: &Value,
    root_input: &Value,
    stage_exports: &HashMap<String, HashMap<String, Value>>,
) -> Result<Value> {
    resolve_value(value, root_input, None, stage_exports)
}
948
/// Resolves a stage export expression.
///
/// `$.` paths read from the stage's own result. No stage exports are made
/// available here (an empty map is passed), so any `@stage.export` reference
/// inside an export expression fails to resolve.
fn resolve_stage_output_value(value: &Value, stage_result: &Value) -> Result<Value> {
    resolve_value(value, stage_result, Some(stage_result), &HashMap::new())
}
952
/// Resolves an expression from the pipeline's `output` map.
///
/// `$.` paths read from `root_input`; `@stage.export` references read from
/// the exports of all executed stages.
fn resolve_pipeline_output_value(
    value: &Value,
    root_input: &Value,
    stage_exports: &HashMap<String, HashMap<String, Value>>,
) -> Result<Value> {
    resolve_value(value, root_input, None, stage_exports)
}
960
961fn resolve_value(
962 value: &Value,
963 dollar_scope: &Value,
964 local_scope: Option<&Value>,
965 stage_exports: &HashMap<String, HashMap<String, Value>>,
966) -> Result<Value> {
967 match value {
968 Value::String(reference) if reference.starts_with("$.") => {
969 lookup_json_path(local_scope.unwrap_or(dollar_scope), reference)
970 }
971 Value::String(reference) if reference.starts_with('@') => {
972 let mut parts = reference[1..].split('.');
973 let stage_id = parts.next().ok_or_else(|| {
974 LogicPearlError::message(format!("invalid stage reference: {reference}"))
975 })?;
976 let export_name = parts.next().ok_or_else(|| {
977 LogicPearlError::message(format!("invalid stage reference: {reference}"))
978 })?;
979 if parts.next().is_some() {
980 return Err(LogicPearlError::message(format!(
981 "invalid stage reference: {reference}"
982 )));
983 }
984 let exports = stage_exports.get(stage_id).ok_or_else(|| {
985 LogicPearlError::message(format!("unknown stage reference: {reference}"))
986 })?;
987 exports.get(export_name).cloned().ok_or_else(|| {
988 LogicPearlError::message(format!("unknown export reference: {reference}"))
989 })
990 }
991 Value::Array(items) => {
992 let mut resolved = Vec::with_capacity(items.len());
993 for item in items {
994 resolved.push(resolve_value(
995 item,
996 dollar_scope,
997 local_scope,
998 stage_exports,
999 )?);
1000 }
1001 Ok(Value::Array(resolved))
1002 }
1003 Value::Object(map) => {
1004 let mut resolved = Map::new();
1005 for (key, item) in map {
1006 resolved.insert(
1007 key.clone(),
1008 resolve_value(item, dollar_scope, local_scope, stage_exports)?,
1009 );
1010 }
1011 Ok(Value::Object(resolved))
1012 }
1013 _ => Ok(value.clone()),
1014 }
1015}
1016
1017fn lookup_json_path(scope: &Value, reference: &str) -> Result<Value> {
1018 let mut current = scope;
1019 for segment in reference.trim_start_matches("$.").split('.') {
1020 if segment.is_empty() {
1021 continue;
1022 }
1023 current = current
1024 .as_object()
1025 .and_then(|object| object.get(segment))
1026 .ok_or_else(|| LogicPearlError::message(format!("path not found: {reference}")))?;
1027 }
1028 Ok(current.clone())
1029}
1030
1031fn truthy(value: &Value) -> bool {
1032 match value {
1033 Value::Null => false,
1034 Value::Bool(flag) => *flag,
1035 Value::Number(number) => {
1036 if let Some(int) = number.as_i64() {
1037 int != 0
1038 } else if let Some(float) = number.as_f64() {
1039 float != 0.0
1040 } else {
1041 false
1042 }
1043 }
1044 Value::String(text) => !text.is_empty(),
1045 Value::Array(items) => !items.is_empty(),
1046 Value::Object(map) => !map.is_empty(),
1047 }
1048}
1049
/// Turns an arbitrary identifier into a stage id.
///
/// ASCII letters and digits are kept (lower-cased), every other character
/// becomes `_`, and leading/trailing underscores are removed. When nothing
/// remains, the fallback `stage_<index + 1>` is used.
fn sanitize_stage_id(value: &str, index: usize) -> String {
    let mapped: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() {
                ch.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect();

    match mapped.trim_matches('_') {
        "" => format!("stage_{}", index + 1),
        trimmed => trimmed.to_string(),
    }
}
1066
1067fn manifest_member_path_for_base(base_dir: &Path, path: &Path) -> Result<String> {
1068 let relative = if let Ok(relative) = path.strip_prefix(base_dir) {
1069 relative.to_path_buf()
1070 } else {
1071 let canonical_base = fs::canonicalize(base_dir).map_err(|error| {
1072 LogicPearlError::message(format!(
1073 "failed to canonicalize pipeline bundle directory {}: {error}",
1074 base_dir.display()
1075 ))
1076 })?;
1077 let canonical_path = fs::canonicalize(path).map_err(|error| {
1078 LogicPearlError::message(format!(
1079 "failed to canonicalize pipeline artifact {}: {error}",
1080 path.display()
1081 ))
1082 })?;
1083 canonical_path
1084 .strip_prefix(&canonical_base)
1085 .map(Path::to_path_buf)
1086 .map_err(|_| {
1087 LogicPearlError::message(format!(
1088 "compose artifact must be inside the pipeline bundle directory: {}",
1089 path.display()
1090 ))
1091 })?
1092 };
1093
1094 let rendered = relative.display().to_string();
1095 resolve_manifest_member_path(base_dir, &rendered)?;
1096 Ok(rendered)
1097}
1098
#[cfg(test)]
mod tests {
    use super::{compose_pipeline, PipelineDefinition, PipelineStageKind};
    use serde_json::json;
    use std::path::{Path, PathBuf};

    /// Returns the directory two levels above this crate's manifest directory, which the
    /// tests use as the base directory for fixture and plugin paths.
    fn repo_root() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR"))
            .ancestors()
            .nth(2)
            .expect("crate should live under workspace/crates/logicpearl-pipeline")
            .to_path_buf()
    }

    /// Parses a pipeline definition from JSON text, panicking when it does not parse.
    fn parse_pipeline(definition: &str) -> PipelineDefinition {
        PipelineDefinition::from_json_str(definition).expect("pipeline parses")
    }

    /// Validates `definition` against the repository root and returns the rendered error.
    ///
    /// Panics with `expectation` when validation unexpectedly succeeds.
    fn validation_error(definition: &str, expectation: &str) -> String {
        parse_pipeline(definition)
            .validate(repo_root())
            .expect_err(expectation)
            .to_string()
    }

    #[test]
    fn validates_basic_pipeline() {
        let definition = parse_pipeline(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "authz",
                        "kind": "pearl",
                        "artifact": "fixtures/ir/valid/auth-demo-v1.json",
                        "input": {
                            "member_age": "$.member.age"
                        },
                        "export": {
                            "bitmask": "$.bitmask"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@authz.bitmask"
                }
            }"#,
        );
        let report = definition
            .validate(repo_root())
            .expect("pipeline validates");
        assert_eq!(report.pipeline_id, "demo");
        assert_eq!(report.stage_count, 1);
        assert_eq!(report.stages[0].kind, PipelineStageKind::Pearl);
    }

    #[test]
    fn rejects_pipeline_stage_paths_that_escape_base_dir() {
        let message = validation_error(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "authz",
                        "kind": "pearl",
                        "artifact": "../fixtures/ir/valid/auth-demo-v1.json",
                        "input": {
                            "member_age": "$.member.age"
                        },
                        "export": {
                            "bitmask": "$.bitmask"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@authz.bitmask"
                }
            }"#,
            "escaping stage paths should fail",
        );
        assert!(message.contains("escapes bundle directory"));
    }

    #[test]
    fn rejects_absolute_pipeline_stage_paths() {
        let message = validation_error(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "authz",
                        "kind": "pearl",
                        "artifact": "/tmp/auth-demo-v1.json",
                        "input": {
                            "member_age": "$.member.age"
                        },
                        "export": {
                            "bitmask": "$.bitmask"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@authz.bitmask"
                }
            }"#,
            "absolute stage paths should fail",
        );
        assert!(message.contains("must be relative"));
    }

    #[test]
    fn rejects_future_stage_reference() {
        let message = validation_error(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "authz",
                        "kind": "pearl",
                        "artifact": "fixtures/ir/valid/auth-demo-v1.json",
                        "input": {
                            "member_age": "@later.bitmask"
                        },
                        "export": {
                            "bitmask": "$.bitmask"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@authz.bitmask"
                }
            }"#,
            "validation should fail",
        );
        assert!(message.contains("unknown or future stage"));
    }

    #[test]
    fn runs_basic_pearl_pipeline() {
        let definition = parse_pipeline(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "authz",
                        "kind": "pearl",
                        "artifact": "fixtures/ir/valid/auth-demo-v1.json",
                        "input": {
                            "action": "$.request.action",
                            "resource_archived": "$.request.resource_archived",
                            "user_role": "$.user.role",
                            "failed_attempts": "$.user.failed_attempts"
                        },
                        "export": {
                            "bitmask": "$.bitmask",
                            "allow": "$.allow"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@authz.bitmask",
                    "allow": "@authz.allow"
                }
            }"#,
        );
        let request = json!({
            "request": {
                "action": "delete",
                "resource_archived": true
            },
            "user": {
                "role": "viewer",
                "failed_attempts": 99
            }
        });
        let execution = definition
            .run(repo_root(), &request)
            .expect("pipeline runs");
        for (key, expected) in [("bitmask", json!(7)), ("allow", json!(false))] {
            assert_eq!(execution.output.get(key), Some(&expected), "output `{key}`");
        }
    }

    #[test]
    fn runs_observer_then_pearl_pipeline() {
        let definition = parse_pipeline(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "observer_demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "observer",
                        "kind": "observer_plugin",
                        "plugin_manifest": "examples/plugins/python_observer/manifest.json",
                        "input": {
                            "age": "$.age",
                            "member": "$.member",
                            "country": "$.country"
                        },
                        "export": {
                            "age": "$.features.age",
                            "is_member": "$.features.is_member"
                        }
                    },
                    {
                        "id": "gate",
                        "kind": "pearl",
                        "artifact": "fixtures/ir/valid/membership-demo-v1.json",
                        "input": {
                            "age": "@observer.age",
                            "is_member": "@observer.is_member"
                        },
                        "export": {
                            "bitmask": "$.bitmask",
                            "allow": "$.allow"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@gate.bitmask",
                    "allow": "@gate.allow"
                }
            }"#,
        );
        let request = json!({
            "age": 34,
            "member": true,
            "country": "US"
        });
        let execution = definition
            .run(repo_root(), &request)
            .expect("pipeline runs");
        for (key, expected) in [("bitmask", json!(0)), ("allow", json!(true))] {
            assert_eq!(execution.output.get(key), Some(&expected), "output `{key}`");
        }
    }

    #[test]
    fn runs_observer_pearl_verify_pipeline() {
        let definition = parse_pipeline(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "observer_verify_demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "observer",
                        "kind": "observer_plugin",
                        "plugin_manifest": "examples/plugins/python_observer/manifest.json",
                        "input": {
                            "age": "$.age",
                            "member": "$.member",
                            "country": "$.country"
                        },
                        "export": {
                            "age": "$.features.age",
                            "is_member": "$.features.is_member"
                        }
                    },
                    {
                        "id": "gate",
                        "kind": "pearl",
                        "artifact": "fixtures/ir/valid/membership-demo-v1.json",
                        "input": {
                            "age": "@observer.age",
                            "is_member": "@observer.is_member"
                        },
                        "export": {
                            "bitmask": "$.bitmask",
                            "allow": "$.allow"
                        }
                    },
                    {
                        "id": "audit",
                        "kind": "verify_plugin",
                        "plugin_manifest": "examples/plugins/python_pipeline_verify/manifest.json",
                        "input": {
                            "bitmask": "@gate.bitmask",
                            "allow": "@gate.allow"
                        },
                        "export": {
                            "audit_status": "$.audit_status",
                            "consistent": "$.summary.consistent"
                        }
                    }
                ],
                "output": {
                    "bitmask": "@gate.bitmask",
                    "allow": "@gate.allow",
                    "audit_status": "@audit.audit_status",
                    "consistent": "@audit.consistent"
                }
            }"#,
        );
        let request = json!({
            "age": 34,
            "member": true,
            "country": "US"
        });
        let execution = definition
            .run(repo_root(), &request)
            .expect("pipeline runs");
        let expectations = [
            ("bitmask", json!(0)),
            ("allow", json!(true)),
            ("audit_status", json!("clean_pass")),
            ("consistent", json!(true)),
        ];
        for (key, expected) in expectations {
            assert_eq!(execution.output.get(key), Some(&expected), "output `{key}`");
        }
    }

    #[test]
    fn runs_trace_source_plugin_pipeline() {
        let definition = parse_pipeline(
            r#"{
                "pipeline_version": "1.0",
                "pipeline_id": "trace_source_demo",
                "entrypoint": "input",
                "stages": [
                    {
                        "id": "trace_source",
                        "kind": "trace_source_plugin",
                        "plugin_manifest": "examples/plugins/python_trace_source/manifest.json",
                        "payload": "$.source",
                        "options": {
                            "label_column": "$.label_column"
                        },
                        "export": {
                            "decision_traces": "$.decision_traces"
                        }
                    }
                ],
                "output": {
                    "decision_traces": "@trace_source.decision_traces"
                }
            }"#,
        );
        // The plugin receives the CSV location as a plain string payload.
        let traces_csv = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../examples/getting_started/decision_traces.csv")
            .display()
            .to_string();
        let request = json!({
            "source": traces_csv,
            "label_column": "allowed"
        });
        let execution = definition
            .run(repo_root(), &request)
            .expect("pipeline runs");
        let rows = execution
            .output
            .get("decision_traces")
            .and_then(|value| value.as_array())
            .expect("pipeline should export decision traces");
        assert!(!rows.is_empty());
        let first_row = &rows[0];
        for field in ["features", "allowed"] {
            assert!(first_row.get(field).is_some(), "missing field `{field}`");
        }
    }

    #[test]
    fn composes_starter_pipeline_from_artifacts() {
        let workspace = repo_root();
        let artifacts = vec![workspace.join("fixtures/ir/valid/auth-demo-v1.json")];
        let plan = compose_pipeline("starter", &artifacts, &workspace).expect("compose works");

        let composed = &plan.pipeline;
        assert_eq!(composed.pipeline_id, "starter");
        assert_eq!(composed.stages.len(), 1);

        let stage = &composed.stages[0];
        assert_eq!(stage.id, "auth_demo_v1");
        assert_eq!(
            stage.artifact.as_deref(),
            Some("fixtures/ir/valid/auth-demo-v1.json")
        );
        assert!(stage.input.contains_key("action"));
        assert_eq!(
            composed.output.get("allow"),
            Some(&json!("@auth_demo_v1.allow"))
        );
    }
}