1use logicpearl_core::{LogicPearlError, Result};
11use serde::{Deserialize, Serialize};
12use serde_json::{Map, Value};
13use sha2::{Digest, Sha256};
14use std::io::{Read, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, ExitStatus, Stdio};
17use std::thread;
18use std::time::{Duration, Instant};
19use time::{format_description::well_known::Rfc3339, OffsetDateTime};
20
/// Maximum number of bytes captured from a plugin's stdout (64 MiB).
const MAX_PLUGIN_STDOUT_BYTES: usize = 64 * 1024 * 1024;
/// Maximum number of bytes captured from a plugin's stderr (8 MiB).
const MAX_PLUGIN_STDERR_BYTES: usize = 8 * 1024 * 1024;
/// Timeout, in milliseconds, that `PluginExecutionPolicy::default()` applies
/// when a manifest does not declare its own `timeout_ms`.
pub const DEFAULT_PLUGIN_TIMEOUT_MS: u64 = 30_000;
/// Number of additional spawn attempts made when spawning the plugin fails
/// because the executable file is busy (`ETXTBSY` on Unix).
const PLUGIN_SPAWN_TEXT_BUSY_RETRIES: usize = 5;
/// Base backoff between spawn retries, in milliseconds; it is multiplied by
/// the 1-based attempt number.
const PLUGIN_SPAWN_TEXT_BUSY_BACKOFF_MS: u64 = 20;
/// JSON Schema validation keywords accepted in manifest-declared schemas.
// NOTE(review): the consumer of this list is not visible in this part of the
// file; presumably `validate_declared_schema` — confirm.
const SUPPORTED_SCHEMA_VALIDATION_KEYWORDS: &[&str] = &[
    "additionalProperties",
    "const",
    "enum",
    "items",
    "properties",
    "required",
    "type",
];
/// JSON Schema annotation keywords accepted in manifest-declared schemas.
// NOTE(review): consumer not visible here either — confirm alongside the
// validation keyword list.
const SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS: &[&str] = &["$id", "$schema", "description", "title"];
36
37#[cfg(unix)]
38use libc::{getpgid, getpgrp, kill, SIGKILL, SIGTERM};
39
/// Stage that a plugin implements and that a request targets.
///
/// Serialized in `snake_case` (for example `trace_source`). A request is only
/// dispatched to a plugin whose manifest declares the same stage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PluginStage {
    Observer,
    TraceSource,
    Enricher,
    Verify,
    Render,
}
50
/// Declarative description of a plugin, normally loaded from a JSON manifest
/// file via [`PluginManifest::from_path`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginManifest {
    /// Human-readable plugin name; must be non-empty.
    pub name: String,
    /// Optional stable identifier; run metadata falls back to `name` when absent.
    #[serde(default)]
    pub plugin_id: Option<String>,
    /// Optional plugin version; must be non-empty when present.
    #[serde(default)]
    pub plugin_version: Option<String>,
    /// Plugin protocol version; only `"1"` passes validation.
    pub protocol_version: String,
    /// Stage this plugin implements.
    pub stage: PluginStage,
    /// Command line to execute: the first segment is the program, the rest
    /// are its arguments. Must contain at least one segment.
    pub entrypoint: Vec<String>,
    /// Optional implementation-language label.
    pub language: Option<String>,
    /// Optional capability names declared by the plugin (for example
    /// `batch_requests`).
    pub capabilities: Option<Vec<String>>,
    /// Optional timeout in milliseconds; `0` requests no timeout and is only
    /// honoured when the execution policy allows it.
    pub timeout_ms: Option<u64>,
    /// Optional schema for the request `input`.
    #[serde(default)]
    pub input_schema: Option<Value>,
    /// Optional schema for the request `options`.
    #[serde(default)]
    pub options_schema: Option<Value>,
    /// Optional schema for the plugin output.
    #[serde(default)]
    pub output_schema: Option<Value>,
    /// Directory containing the manifest file; populated by `from_path`,
    /// never (de)serialized.
    #[serde(skip)]
    pub manifest_dir: Option<PathBuf>,
    /// Path the manifest was loaded from; populated by `from_path`, never
    /// (de)serialized.
    #[serde(skip)]
    pub manifest_path: Option<PathBuf>,
}
76
/// Host-side policy that decides how much a plugin manifest is trusted when
/// its entrypoint is resolved and executed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginExecutionPolicy {
    /// Timeout in milliseconds applied when the manifest declares none.
    pub default_timeout_ms: u64,
    /// Whether a timeout of `0` (no timeout) is permitted.
    pub allow_no_timeout: bool,
    /// Whether absolute paths are permitted in the entrypoint program and arguments.
    pub allow_absolute_entrypoint: bool,
    /// Whether a bare program name may be passed through unresolved for `PATH` lookup.
    pub allow_path_lookup: bool,
}
85
impl Default for PluginExecutionPolicy {
    /// Returns the restrictive policy: the default timeout applies and
    /// no-timeout execution, absolute entrypoints and `PATH` lookup are all
    /// disabled.
    fn default() -> Self {
        Self {
            default_timeout_ms: DEFAULT_PLUGIN_TIMEOUT_MS,
            allow_no_timeout: false,
            allow_absolute_entrypoint: false,
            allow_path_lookup: false,
        }
    }
}
96
97impl PluginExecutionPolicy {
98 #[must_use]
99 pub fn trusted_local() -> Self {
100 Self {
101 allow_no_timeout: true,
102 allow_absolute_entrypoint: true,
103 allow_path_lookup: true,
104 ..Self::default()
105 }
106 }
107
108 #[must_use]
109 pub fn with_default_timeout_ms(mut self, timeout_ms: u64) -> Self {
110 self.default_timeout_ms = timeout_ms;
111 self
112 }
113
114 #[must_use]
115 pub fn with_allow_no_timeout(mut self, allow: bool) -> Self {
116 self.allow_no_timeout = allow;
117 self
118 }
119
120 #[must_use]
121 pub fn with_allow_absolute_entrypoint(mut self, allow: bool) -> Self {
122 self.allow_absolute_entrypoint = allow;
123 self
124 }
125
126 #[must_use]
127 pub fn with_allow_path_lookup(mut self, allow: bool) -> Self {
128 self.allow_path_lookup = allow;
129 self
130 }
131}
132
/// Single-payload request serialized as JSON and written to the plugin's stdin.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginRequest {
    /// Protocol version of the request.
    pub protocol_version: String,
    /// Stage being requested; must match the manifest's stage.
    pub stage: PluginStage,
    /// Request payload.
    pub payload: Value,
}
139
/// Multi-payload request sent in one invocation to plugins that declare the
/// `batch_requests` capability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchRequest {
    /// Protocol version of the request.
    pub protocol_version: String,
    /// Stage being requested.
    pub stage: PluginStage,
    /// Payloads to process; the plugin must answer with one response per payload.
    pub payloads: Vec<Value>,
}
146
/// Structured error reported by a plugin in its response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginErrorPayload {
    /// Error code chosen by the plugin.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Optional extra details; defaults to JSON `null` when omitted.
    #[serde(default)]
    pub details: Value,
}
154
/// Response parsed from a plugin's stdout for a single payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginResponse {
    /// Whether the plugin reported success.
    pub ok: bool,
    /// Warnings reported by the plugin; empty when omitted.
    #[serde(default)]
    pub warnings: Vec<String>,
    /// Structured error, if the plugin provided one.
    #[serde(default)]
    pub error: Option<PluginErrorPayload>,
    /// All remaining top-level response fields, captured via `serde(flatten)`.
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}
165
/// Envelope parsed from a plugin's stdout for a batch request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchResponse {
    /// Whether the batch as a whole succeeded.
    pub ok: bool,
    /// Batch-level warnings; empty when omitted.
    #[serde(default)]
    pub warnings: Vec<String>,
    /// Batch-level structured error, if any.
    #[serde(default)]
    pub error: Option<PluginErrorPayload>,
    /// Per-payload responses; expected to match the number of payloads sent.
    #[serde(default)]
    pub responses: Vec<PluginResponse>,
}
176
/// Parsed response of one plugin invocation together with its provenance metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginExecutionResult {
    /// Response parsed from the plugin's stdout.
    pub response: PluginResponse,
    /// Provenance metadata describing the invocation.
    pub run: PluginRunMetadata,
}
182
/// Responses of a batch execution together with provenance metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchExecutionResult {
    /// One response per payload, in payload order.
    pub responses: Vec<PluginResponse>,
    /// One metadata record per response. For a true batch invocation every
    /// entry is a copy of the single process run.
    #[serde(default)]
    pub runs: Vec<PluginRunMetadata>,
    /// Representative run: the batch process run, the last per-payload run
    /// when the plugin was invoked once per payload, or synthetic metadata
    /// when there were no payloads.
    pub run: PluginRunMetadata,
}
190
/// Provenance record describing a single plugin invocation.
///
/// Hash fields are strings of the form `sha256:<hex>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginRunMetadata {
    /// Schema identifier of this record.
    pub schema_version: String,
    /// Hash derived from the identifying fields of this run.
    pub plugin_run_id: String,
    /// Manifest `plugin_id`, or the plugin name when the manifest has none.
    pub plugin_id: String,
    /// Manifest `plugin_version`, if declared.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_version: Option<String>,
    /// Manifest `name`.
    pub plugin_name: String,
    /// Manifest stage.
    pub stage: PluginStage,
    /// Protocol version taken from the request, falling back to the manifest.
    pub protocol_version: String,
    /// Display form of the manifest path, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub manifest_path: Option<String>,
    /// Hash of the manifest file contents, when the file could be read.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub manifest_hash: Option<String>,
    /// Hash of the serialized `entrypoint` metadata.
    pub entrypoint_hash: String,
    /// Declared and resolved entrypoint, with file hashes.
    pub entrypoint: PluginEntrypointMetadata,
    /// Hash of the serialized request.
    pub request_hash: String,
    /// Hash of `payload.input` when the request contains it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_hash: Option<String>,
    /// Hash of the raw stdout bytes.
    pub output_hash: String,
    /// Timeout settings in effect for the run.
    pub timeout_policy: PluginTimeoutPolicyMetadata,
    /// Execution-policy flags in effect for the run.
    pub execution_policy: PluginExecutionPolicyMetadata,
    /// Declared, allowed and enforced capabilities.
    pub capabilities: PluginCapabilityMetadata,
    /// Description of network/filesystem access enforcement.
    pub access: PluginAccessMetadata,
    /// Hashes and sizes of the captured stdout/stderr.
    pub stdio: PluginStdioMetadata,
    /// RFC 3339 UTC timestamp taken just before the process was spawned.
    pub started_at: String,
    /// RFC 3339 UTC timestamp taken after the process exited.
    pub completed_at: String,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
}
220
/// Entrypoint as declared in the manifest and as resolved for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginEntrypointMetadata {
    /// Entrypoint segments exactly as declared in the manifest.
    pub declared: Vec<String>,
    /// Program followed by arguments after resolution.
    pub resolved: Vec<String>,
    /// Content hashes of the resolved segments that name existing files.
    #[serde(default)]
    pub hashes: Vec<PluginEntrypointSegmentHash>,
}
228
/// Content hash of one resolved entrypoint segment that refers to a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginEntrypointSegmentHash {
    /// Position of the segment within the resolved entrypoint.
    pub index: usize,
    /// The resolved segment, used as the file path.
    pub path: String,
    /// `sha256:<hex>` hash of the file contents.
    pub hash: String,
}
235
/// Timeout inputs and the resulting effective timeout for a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginTimeoutPolicyMetadata {
    /// `timeout_ms` declared in the manifest, if any.
    pub manifest_timeout_ms: Option<u64>,
    /// Default timeout from the execution policy.
    pub default_timeout_ms: u64,
    /// Timeout actually applied; `None` means the run had no timeout.
    pub effective_timeout_ms: Option<u64>,
    /// Whether the execution policy allowed running without a timeout.
    pub allow_no_timeout: bool,
}
243
/// Snapshot of the execution-policy flags that applied to a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginExecutionPolicyMetadata {
    /// Whether absolute entrypoint paths were allowed.
    pub allow_absolute_entrypoint: bool,
    /// Whether `PATH` lookup of the program was allowed.
    pub allow_path_lookup: bool,
    /// Whether running without a timeout was allowed.
    pub allow_no_timeout: bool,
}
250
/// Capabilities associated with a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginCapabilityMetadata {
    /// Capabilities declared by the manifest.
    #[serde(default)]
    pub declared: Vec<String>,
    /// Capabilities the host allowed; currently recorded as a copy of `declared`.
    #[serde(default)]
    pub allowed: Vec<String>,
    /// Capabilities actually exercised by the request (for example
    /// `batch_requests` for a batch request).
    #[serde(default)]
    pub enforced: Vec<String>,
}
260
/// Free-form description of the access restrictions applied to a run.
///
/// The runner in this file records `not_enforced`, `process_default` and
/// `none` respectively.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginAccessMetadata {
    /// Network access enforcement label.
    pub network: String,
    /// Filesystem access enforcement label.
    pub filesystem: String,
    /// Overall enforcement mechanism label.
    pub enforcement: String,
}
267
/// Hashes and sizes of the output streams captured from a plugin process.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginStdioMetadata {
    /// `sha256:<hex>` hash of the captured stdout bytes.
    pub stdout_hash: String,
    /// Number of captured stdout bytes.
    pub stdout_bytes: usize,
    /// Redacted summary of stdout; absent when stdout was empty.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stdout_summary: Option<String>,
    /// `sha256:<hex>` hash of the captured stderr bytes.
    pub stderr_hash: String,
    /// Number of captured stderr bytes.
    pub stderr_bytes: usize,
    /// Redacted summary of stderr; absent when stderr was empty.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stderr_summary: Option<String>,
}
279
280pub fn build_canonical_payload(
282 _stage: &PluginStage,
283 input: Value,
284 options: Option<Value>,
285) -> Value {
286 let mut payload = Map::new();
287 payload.insert("input".to_string(), input);
288
289 if let Some(options) = options {
290 payload.insert("options".to_string(), options);
291 }
292
293 Value::Object(payload)
294}
295
296impl PluginManifest {
297 pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
298 let path = path.as_ref();
299 let content = std::fs::read_to_string(path)?;
300 let mut manifest: Self = serde_json::from_str(&content)?;
301 manifest.manifest_dir = path.parent().map(Path::to_path_buf);
302 manifest.manifest_path = Some(path.to_path_buf());
303 manifest.validate()?;
304 Ok(manifest)
305 }
306
307 pub fn validate(&self) -> Result<()> {
308 if self.name.trim().is_empty() {
309 return Err(LogicPearlError::message(
310 "plugin manifest name must be non-empty",
311 ));
312 }
313 if self
314 .plugin_id
315 .as_ref()
316 .is_some_and(|value| value.trim().is_empty())
317 {
318 return Err(LogicPearlError::message(
319 "plugin manifest plugin_id must be non-empty when present",
320 ));
321 }
322 if self
323 .plugin_version
324 .as_ref()
325 .is_some_and(|value| value.trim().is_empty())
326 {
327 return Err(LogicPearlError::message(
328 "plugin manifest plugin_version must be non-empty when present",
329 ));
330 }
331 if self.protocol_version != "1" {
332 return Err(LogicPearlError::message(format!(
333 "unsupported plugin protocol_version: {}",
334 self.protocol_version
335 )));
336 }
337 if self.entrypoint.is_empty() {
338 return Err(LogicPearlError::message(
339 "plugin manifest entrypoint must contain at least one command segment",
340 ));
341 }
342 validate_declared_schema("input_schema", self.input_schema.as_ref())?;
343 validate_declared_schema("options_schema", self.options_schema.as_ref())?;
344 validate_declared_schema("output_schema", self.output_schema.as_ref())?;
345 Ok(())
346 }
347
348 pub fn supports_capability(&self, capability: &str) -> bool {
349 self.capabilities
350 .as_ref()
351 .map(|caps| caps.iter().any(|item| item == capability))
352 .unwrap_or(false)
353 }
354}
355
356pub fn run_plugin(manifest: &PluginManifest, request: &PluginRequest) -> Result<PluginResponse> {
358 run_plugin_with_policy(manifest, request, &PluginExecutionPolicy::default())
359}
360
361pub fn run_plugin_with_policy(
363 manifest: &PluginManifest,
364 request: &PluginRequest,
365 policy: &PluginExecutionPolicy,
366) -> Result<PluginResponse> {
367 Ok(run_plugin_with_policy_and_metadata(manifest, request, policy)?.response)
368}
369
370pub fn run_plugin_with_policy_and_metadata(
372 manifest: &PluginManifest,
373 request: &PluginRequest,
374 policy: &PluginExecutionPolicy,
375) -> Result<PluginExecutionResult> {
376 if manifest.stage != request.stage {
377 return Err(LogicPearlError::message(format!(
378 "plugin stage mismatch: manifest is {:?}, request is {:?}",
379 manifest.stage, request.stage
380 )));
381 }
382 validate_plugin_request_contract(manifest, request)?;
383
384 let raw = run_plugin_raw(manifest, request, policy)?;
385 let response = parse_plugin_response(manifest, &raw.stdout)?;
386 Ok(PluginExecutionResult {
387 response,
388 run: raw.metadata,
389 })
390}
391
392pub fn run_plugin_batch(
394 manifest: &PluginManifest,
395 stage: PluginStage,
396 payloads: &[Value],
397) -> Result<Vec<PluginResponse>> {
398 run_plugin_batch_with_policy(manifest, stage, payloads, &PluginExecutionPolicy::default())
399}
400
401pub fn run_plugin_batch_with_policy(
403 manifest: &PluginManifest,
404 stage: PluginStage,
405 payloads: &[Value],
406 policy: &PluginExecutionPolicy,
407) -> Result<Vec<PluginResponse>> {
408 if payloads.is_empty() {
409 return Ok(Vec::new());
410 }
411 Ok(run_plugin_batch_with_policy_and_metadata(manifest, stage, payloads, policy)?.responses)
412}
413
414pub fn run_plugin_batch_with_policy_and_metadata(
416 manifest: &PluginManifest,
417 stage: PluginStage,
418 payloads: &[Value],
419 policy: &PluginExecutionPolicy,
420) -> Result<PluginBatchExecutionResult> {
421 if manifest.stage != stage {
422 return Err(LogicPearlError::message(format!(
423 "plugin stage mismatch: manifest is {:?}, request is {:?}",
424 manifest.stage, stage
425 )));
426 }
427 if payloads.is_empty() {
428 return Ok(PluginBatchExecutionResult {
429 responses: Vec::new(),
430 runs: Vec::new(),
431 run: empty_plugin_batch_metadata(manifest, &stage, policy)?,
432 });
433 }
434 if !manifest.supports_capability("batch_requests") {
435 let mut responses = Vec::with_capacity(payloads.len());
436 let mut runs = Vec::with_capacity(payloads.len());
437 let mut last_run = None;
438 for payload in payloads {
439 validate_plugin_payload_contract(manifest, &stage, payload)?;
440 let execution = run_plugin_with_policy_and_metadata(
441 manifest,
442 &PluginRequest {
443 protocol_version: "1".to_string(),
444 stage: stage.clone(),
445 payload: payload.clone(),
446 },
447 policy,
448 )?;
449 let run = execution.run;
450 last_run = Some(run.clone());
451 runs.push(run);
452 responses.push(execution.response);
453 }
454 return Ok(PluginBatchExecutionResult {
455 responses,
456 runs,
457 run: last_run.unwrap_or(empty_plugin_batch_metadata(manifest, &stage, policy)?),
458 });
459 }
460 for payload in payloads {
461 validate_plugin_payload_contract(manifest, &stage, payload)?;
462 }
463
464 let raw = run_plugin_raw(
465 manifest,
466 &PluginBatchRequest {
467 protocol_version: "1".to_string(),
468 stage: stage.clone(),
469 payloads: payloads.to_vec(),
470 },
471 policy,
472 )?;
473 let batch: PluginBatchResponse = serde_json::from_str(&raw.stdout).map_err(|err| {
474 LogicPearlError::message(format!(
475 "plugin {} returned invalid batch JSON: {}",
476 manifest.name, err
477 ))
478 })?;
479 if !batch.ok {
480 if let Some(error) = &batch.error {
481 return Err(LogicPearlError::message(format!(
482 "plugin {} failed [{}]: {}",
483 manifest.name, error.code, error.message
484 )));
485 }
486 return Err(LogicPearlError::message(format!(
487 "plugin {} returned ok=false without structured batch error",
488 manifest.name
489 )));
490 }
491 if batch.responses.len() != payloads.len() {
492 return Err(LogicPearlError::message(format!(
493 "plugin {} returned {} batch responses for {} payloads",
494 manifest.name,
495 batch.responses.len(),
496 payloads.len()
497 )));
498 }
499 for response in &batch.responses {
500 validate_ok_plugin_response(manifest, response)?;
501 }
502 let runs = vec![raw.metadata.clone(); batch.responses.len()];
503 Ok(PluginBatchExecutionResult {
504 responses: batch.responses,
505 runs,
506 run: raw.metadata,
507 })
508}
509
510fn empty_plugin_batch_metadata(
511 manifest: &PluginManifest,
512 stage: &PluginStage,
513 policy: &PluginExecutionPolicy,
514) -> Result<PluginRunMetadata> {
515 let request = PluginBatchRequest {
516 protocol_version: "1".to_string(),
517 stage: stage.clone(),
518 payloads: Vec::new(),
519 };
520 let request_value = serde_json::to_value(&request).map_err(LogicPearlError::from)?;
521 build_plugin_run_metadata(PluginRunMetadataInputs {
522 manifest,
523 policy,
524 resolved_entrypoint: &resolve_entrypoint(manifest, policy)?,
525 request_value: &request_value,
526 stdout: Vec::new(),
527 stderr: Vec::new(),
528 effective_timeout_ms: effective_timeout_ms(manifest, policy)?,
529 started_at: now_utc_rfc3339(),
530 completed_at: now_utc_rfc3339(),
531 duration_ms: 0,
532 })
533}
534
/// Outcome of a successful plugin process run before response parsing.
struct RawPluginRun {
    /// Captured stdout, validated as UTF-8.
    stdout: String,
    /// Provenance metadata for the run.
    metadata: PluginRunMetadata,
}
539
540fn run_plugin_raw<T: Serialize>(
541 manifest: &PluginManifest,
542 request: &T,
543 policy: &PluginExecutionPolicy,
544) -> Result<RawPluginRun> {
545 let entrypoint = resolve_entrypoint(manifest, policy)?;
546 let mut command = Command::new(&entrypoint.program);
547 command.args(&entrypoint.args);
548 #[cfg(unix)]
549 {
550 use std::os::unix::process::CommandExt;
551 command.process_group(0);
552 }
553 command
554 .stdin(Stdio::piped())
555 .stdout(Stdio::piped())
556 .stderr(Stdio::piped());
557
558 let request_value = serde_json::to_value(request).map_err(LogicPearlError::from)?;
559 let timeout_ms = effective_timeout_ms(manifest, policy)?;
560 let started_at = now_utc_rfc3339();
561 let started = Instant::now();
562 let mut child = spawn_plugin_process(&mut command)?;
563 let stdout_handle = spawn_pipe_reader(
564 child
565 .stdout
566 .take()
567 .ok_or_else(|| LogicPearlError::message("failed to open plugin stdout"))?,
568 MAX_PLUGIN_STDOUT_BYTES,
569 );
570 let stderr_handle = spawn_pipe_reader(
571 child
572 .stderr
573 .take()
574 .ok_or_else(|| LogicPearlError::message("failed to open plugin stderr"))?,
575 MAX_PLUGIN_STDERR_BYTES,
576 );
577
578 let mut stdin = child
579 .stdin
580 .take()
581 .ok_or_else(|| LogicPearlError::message("failed to open plugin stdin"))?;
582 let payload = serde_json::to_vec(request)?;
583 stdin.write_all(&payload)?;
584 stdin.write_all(b"\n")?;
585 drop(stdin);
586
587 let (status, timed_out) = wait_for_plugin_exit(timeout_ms, &mut child)?;
588 let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
589 let completed_at = now_utc_rfc3339();
590 let stdout = join_pipe_reader(manifest, "stdout", stdout_handle)?;
591 let stderr = join_pipe_reader(manifest, "stderr", stderr_handle)?;
592 let stderr_display = String::from_utf8_lossy(&stderr).trim().to_string();
593
594 if timed_out {
595 let timeout_display = timeout_ms.unwrap_or_default();
596 return Err(LogicPearlError::message(format!(
597 "plugin {} exceeded timeout_ms={} and was terminated{}",
598 manifest.name,
599 timeout_display,
600 if stderr_display.is_empty() {
601 String::new()
602 } else {
603 format!(": {stderr_display}")
604 }
605 )));
606 }
607
608 if !status.success() {
609 return Err(LogicPearlError::message(format!(
610 "plugin {} exited with status {}{}",
611 manifest.name,
612 status,
613 if stderr_display.is_empty() {
614 String::new()
615 } else {
616 format!(": {stderr_display}")
617 }
618 )));
619 }
620
621 let metadata = build_plugin_run_metadata(PluginRunMetadataInputs {
622 manifest,
623 policy,
624 resolved_entrypoint: &entrypoint,
625 request_value: &request_value,
626 stdout: stdout.clone(),
627 stderr,
628 effective_timeout_ms: timeout_ms,
629 started_at,
630 completed_at,
631 duration_ms,
632 })?;
633 let stdout = String::from_utf8(stdout).map_err(|err| {
634 LogicPearlError::message(format!(
635 "plugin {} returned invalid UTF-8: {}",
636 manifest.name, err
637 ))
638 })?;
639 Ok(RawPluginRun { stdout, metadata })
640}
641
/// Everything `build_plugin_run_metadata` needs to describe one run.
struct PluginRunMetadataInputs<'a> {
    /// Manifest of the plugin that was run.
    manifest: &'a PluginManifest,
    /// Execution policy that applied to the run.
    policy: &'a PluginExecutionPolicy,
    /// Entrypoint after resolution.
    resolved_entrypoint: &'a ResolvedPluginEntrypoint,
    /// The request serialized as a JSON value.
    request_value: &'a Value,
    /// Raw captured stdout bytes.
    stdout: Vec<u8>,
    /// Raw captured stderr bytes.
    stderr: Vec<u8>,
    /// Timeout that applied, or `None` when the run had no timeout.
    effective_timeout_ms: Option<u64>,
    /// RFC 3339 start timestamp.
    started_at: String,
    /// RFC 3339 completion timestamp.
    completed_at: String,
    /// Wall-clock duration in milliseconds.
    duration_ms: u64,
}
654
/// Assembles the provenance record for one plugin run.
///
/// Hashes the entrypoint metadata, the request, the request's
/// `payload.input` (when present), stdout and stderr, then derives
/// `plugin_run_id` from the finished record.
///
/// # Errors
/// Fails only when a value cannot be serialized for hashing.
fn build_plugin_run_metadata(inputs: PluginRunMetadataInputs<'_>) -> Result<PluginRunMetadata> {
    let manifest = inputs.manifest;
    let policy = inputs.policy;
    let entrypoint = build_entrypoint_metadata(manifest, inputs.resolved_entrypoint);
    let entrypoint_hash = hash_serializable(&entrypoint)?;
    let request_hash = hash_serializable(inputs.request_value)?;
    // Only single-payload requests carry `payload.input`; batch requests
    // leave `input_hash` unset.
    let input_hash = inputs
        .request_value
        .get("payload")
        .and_then(|payload| payload.get("input"))
        .map(hash_serializable)
        .transpose()?;
    let output_hash = sha256_prefixed(&inputs.stdout);
    // Best effort: an unreadable manifest file simply yields no hash.
    let manifest_hash = manifest
        .manifest_path
        .as_ref()
        .and_then(|path| sha256_prefixed_file(path).ok());
    let plugin_id = manifest
        .plugin_id
        .clone()
        .unwrap_or_else(|| manifest.name.clone());
    // Prefer the protocol version actually sent in the request.
    let protocol_version = inputs
        .request_value
        .get("protocol_version")
        .and_then(Value::as_str)
        .unwrap_or(&manifest.protocol_version)
        .to_string();
    let declared_capabilities = manifest.capabilities.clone().unwrap_or_default();
    let enforced_capabilities = enforced_capabilities_for_request(manifest, inputs.request_value);
    // stdout is hashed once; the same digest serves as output and stdout hash.
    let stdout_hash = output_hash.clone();
    let stderr_hash = sha256_prefixed(&inputs.stderr);
    let mut metadata = PluginRunMetadata {
        schema_version: "logicpearl.plugin_run_provenance.v1".to_string(),
        // Filled in below, once every field it depends on is known.
        plugin_run_id: String::new(),
        plugin_id,
        plugin_version: manifest.plugin_version.clone(),
        plugin_name: manifest.name.clone(),
        stage: manifest.stage.clone(),
        protocol_version,
        manifest_path: manifest
            .manifest_path
            .as_ref()
            .map(|path| path.display().to_string()),
        manifest_hash,
        entrypoint_hash,
        entrypoint,
        request_hash,
        input_hash,
        output_hash,
        timeout_policy: PluginTimeoutPolicyMetadata {
            manifest_timeout_ms: manifest.timeout_ms,
            default_timeout_ms: policy.default_timeout_ms,
            effective_timeout_ms: inputs.effective_timeout_ms,
            allow_no_timeout: policy.allow_no_timeout,
        },
        execution_policy: PluginExecutionPolicyMetadata {
            allow_absolute_entrypoint: policy.allow_absolute_entrypoint,
            allow_path_lookup: policy.allow_path_lookup,
            allow_no_timeout: policy.allow_no_timeout,
        },
        capabilities: PluginCapabilityMetadata {
            // Every declared capability is recorded as allowed.
            declared: declared_capabilities.clone(),
            allowed: declared_capabilities,
            enforced: enforced_capabilities,
        },
        // The runner applies no network or filesystem restrictions.
        access: PluginAccessMetadata {
            network: "not_enforced".to_string(),
            filesystem: "process_default".to_string(),
            enforcement: "none".to_string(),
        },
        stdio: PluginStdioMetadata {
            stdout_hash,
            stdout_bytes: inputs.stdout.len(),
            // Summaries expose only a digest, never the stream contents.
            stdout_summary: (!inputs.stdout.is_empty())
                .then(|| redacted_hash_summary(&inputs.stdout)),
            stderr_hash,
            stderr_bytes: inputs.stderr.len(),
            stderr_summary: (!inputs.stderr.is_empty())
                .then(|| redacted_hash_summary(&inputs.stderr)),
        },
        started_at: inputs.started_at,
        completed_at: inputs.completed_at,
        duration_ms: inputs.duration_ms,
    };
    metadata.plugin_run_id = build_plugin_run_id(&metadata)?;
    Ok(metadata)
}
742
743fn build_entrypoint_metadata(
744 manifest: &PluginManifest,
745 resolved_entrypoint: &ResolvedPluginEntrypoint,
746) -> PluginEntrypointMetadata {
747 let resolved = resolved_entrypoint.segments();
748 let hashes = resolved
749 .iter()
750 .enumerate()
751 .filter_map(|(index, segment)| {
752 let path = Path::new(segment);
753 if !path.is_file() {
754 return None;
755 }
756 sha256_prefixed_file(path)
757 .ok()
758 .map(|hash| PluginEntrypointSegmentHash {
759 index,
760 path: segment.clone(),
761 hash,
762 })
763 })
764 .collect();
765
766 PluginEntrypointMetadata {
767 declared: manifest.entrypoint.clone(),
768 resolved,
769 hashes,
770 }
771}
772
773fn enforced_capabilities_for_request(
774 manifest: &PluginManifest,
775 request_value: &Value,
776) -> Vec<String> {
777 if request_value.get("payloads").is_some() && manifest.supports_capability("batch_requests") {
778 vec!["batch_requests".to_string()]
779 } else {
780 Vec::new()
781 }
782}
783
784fn build_plugin_run_id(metadata: &PluginRunMetadata) -> Result<String> {
785 hash_serializable(&serde_json::json!({
786 "schema_version": metadata.schema_version,
787 "plugin_id": metadata.plugin_id,
788 "plugin_version": metadata.plugin_version,
789 "plugin_name": metadata.plugin_name,
790 "stage": metadata.stage,
791 "protocol_version": metadata.protocol_version,
792 "manifest_hash": metadata.manifest_hash,
793 "entrypoint_hash": metadata.entrypoint_hash,
794 "request_hash": metadata.request_hash,
795 "output_hash": metadata.output_hash,
796 "started_at": metadata.started_at,
797 "completed_at": metadata.completed_at,
798 }))
799}
800
801fn hash_serializable<T: Serialize>(value: &T) -> Result<String> {
802 serde_json::to_vec(value)
803 .map(|bytes| sha256_prefixed(&bytes))
804 .map_err(LogicPearlError::from)
805}
806
807fn sha256_prefixed_file(path: &Path) -> std::io::Result<String> {
808 std::fs::read(path).map(|bytes| sha256_prefixed(&bytes))
809}
810
811fn sha256_prefixed(bytes: &[u8]) -> String {
812 let mut digest = Sha256::new();
813 digest.update(bytes);
814 format!("sha256:{}", hex::encode(digest.finalize()))
815}
816
817fn redacted_hash_summary(bytes: &[u8]) -> String {
818 format!("<redacted:{}>", sha256_prefixed(bytes))
819}
820
821fn now_utc_rfc3339() -> String {
822 OffsetDateTime::now_utc()
823 .format(&Rfc3339)
824 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
825}
826
827pub fn manifest_contract_summary(manifest: &PluginManifest) -> Value {
829 serde_json::json!({
830 "input_schema": manifest.input_schema,
831 "options_schema": manifest.options_schema,
832 "output_schema": manifest.output_schema,
833 })
834}
835
/// Entrypoint after resolution: the program to execute and its arguments.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedPluginEntrypoint {
    /// Program passed to `Command::new`.
    program: String,
    /// Arguments passed to the program, in order.
    args: Vec<String>,
}

impl ResolvedPluginEntrypoint {
    /// Returns the program followed by its arguments as one owned list.
    fn segments(&self) -> Vec<String> {
        let mut segments = Vec::with_capacity(self.args.len() + 1);
        segments.push(self.program.clone());
        segments.extend(self.args.iter().cloned());
        segments
    }
}
849
850fn resolve_entrypoint(
851 manifest: &PluginManifest,
852 policy: &PluginExecutionPolicy,
853) -> Result<ResolvedPluginEntrypoint> {
854 let program = manifest
855 .entrypoint
856 .first()
857 .ok_or_else(|| LogicPearlError::message("plugin entrypoint is empty"))?;
858 if program.trim().is_empty() {
859 return Err(LogicPearlError::message(
860 "plugin entrypoint program must be non-empty",
861 ));
862 }
863
864 let resolved_program = resolve_entrypoint_program(manifest, policy, program)?;
865 let args = manifest
866 .entrypoint
867 .iter()
868 .skip(1)
869 .map(|segment| resolve_entrypoint_arg(manifest, policy, segment))
870 .collect::<Result<Vec<_>>>()?;
871
872 Ok(ResolvedPluginEntrypoint {
873 program: resolved_program,
874 args,
875 })
876}
877
878fn resolve_entrypoint_program(
879 manifest: &PluginManifest,
880 policy: &PluginExecutionPolicy,
881 program: &str,
882) -> Result<String> {
883 let program_path = Path::new(program);
884 if program_path.is_absolute() {
885 if !policy.allow_absolute_entrypoint {
886 return Err(LogicPearlError::message(format!(
887 "plugin {} entrypoint uses absolute program path {}; rerun with an execution policy that allows absolute entrypoints only for trusted manifests",
888 manifest.name, program
889 )));
890 }
891 return Ok(program.to_string());
892 }
893
894 if let Some(manifest_relative) = manifest_relative_existing_path(manifest, program) {
895 return Ok(manifest_relative);
896 }
897
898 if has_path_separator(program) {
899 return Err(LogicPearlError::message(format!(
900 "plugin {} entrypoint path was not found relative to the manifest: {}",
901 manifest.name, program
902 )));
903 }
904
905 if policy.allow_path_lookup {
906 return Ok(program.to_string());
907 }
908
909 if is_allowed_manifest_script_interpreter(program) && has_manifest_local_script_arg(manifest) {
910 return Ok(program.to_string());
911 }
912
913 Err(LogicPearlError::message(format!(
914 "plugin {} entrypoint program {program:?} is not manifest-relative and PATH lookup is disabled; use a manifest-relative script path or enable PATH lookup only for trusted manifests",
915 manifest.name
916 )))
917}
918
919fn resolve_entrypoint_arg(
920 manifest: &PluginManifest,
921 policy: &PluginExecutionPolicy,
922 segment: &str,
923) -> Result<String> {
924 if segment.trim().is_empty() {
925 return Err(LogicPearlError::message(format!(
926 "plugin {} entrypoint arguments must be non-empty",
927 manifest.name
928 )));
929 }
930
931 let path = Path::new(segment);
932 if path.is_absolute() && !policy.allow_absolute_entrypoint {
933 return Err(LogicPearlError::message(format!(
934 "plugin {} entrypoint argument uses absolute path {}; enable absolute entrypoints only for trusted manifests",
935 manifest.name, segment
936 )));
937 }
938 if path.is_absolute() {
939 return Ok(segment.to_string());
940 }
941
942 if let Some(manifest_relative) = manifest_relative_existing_path(manifest, segment) {
943 return Ok(manifest_relative);
944 }
945
946 if has_path_separator(segment) {
947 return Err(LogicPearlError::message(format!(
948 "plugin {} entrypoint argument path was not found relative to the manifest: {}",
949 manifest.name, segment
950 )));
951 }
952
953 Ok(segment.to_string())
954}
955
956fn manifest_relative_existing_path(manifest: &PluginManifest, segment: &str) -> Option<String> {
957 manifest.manifest_dir.as_ref().and_then(|dir| {
958 let candidate = dir.join(segment);
959 candidate.exists().then(|| candidate.display().to_string())
960 })
961}
962
/// Returns `true` when `segment` contains a forward slash or a backslash.
fn has_path_separator(segment: &str) -> bool {
    segment
        .chars()
        .any(|character| matches!(character, '/' | '\\'))
}
966
967fn has_manifest_local_script_arg(manifest: &PluginManifest) -> bool {
968 manifest
969 .entrypoint
970 .iter()
971 .skip(1)
972 .any(|segment| manifest_relative_existing_path(manifest, segment).is_some())
973}
974
/// Returns `true` when `program` names one of the script interpreters that
/// may be looked up on `PATH` to run a manifest-local script.
///
/// Only the final path component is considered. The comparison is
/// case-insensitive and ignores a single trailing `.exe`, so `python3`,
/// `Python3` and `PYTHON3.EXE` are all accepted, while `python.exe.exe` is not.
fn is_allowed_manifest_script_interpreter(program: &str) -> bool {
    let file_name = Path::new(program)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(program);
    // Lowercase before stripping so the `.exe` suffix matches in any case,
    // and strip it at most once.
    let lowered = file_name.to_ascii_lowercase();
    let normalized = lowered.strip_suffix(".exe").unwrap_or(&lowered);
    matches!(
        normalized,
        "bash" | "bun" | "deno" | "node" | "perl" | "php" | "python" | "python3" | "ruby" | "sh"
    )
}
987
988fn effective_timeout_ms(
989 manifest: &PluginManifest,
990 policy: &PluginExecutionPolicy,
991) -> Result<Option<u64>> {
992 match manifest.timeout_ms {
993 Some(0) if policy.allow_no_timeout => Ok(None),
994 Some(0) => Err(LogicPearlError::message(format!(
995 "plugin {} declares timeout_ms=0, which disables the plugin timeout; enable no-timeout execution only for trusted manifests",
996 manifest.name
997 ))),
998 Some(timeout_ms) => Ok(Some(timeout_ms)),
999 None if policy.default_timeout_ms == 0 && policy.allow_no_timeout => Ok(None),
1000 None if policy.default_timeout_ms == 0 => Err(LogicPearlError::message(format!(
1001 "plugin {} has no timeout and the execution policy default is disabled; enable no-timeout execution only for trusted manifests",
1002 manifest.name
1003 ))),
1004 None => Ok(Some(policy.default_timeout_ms)),
1005 }
1006}
1007
1008fn spawn_plugin_process(command: &mut Command) -> std::io::Result<Child> {
1009 for attempt in 0..=PLUGIN_SPAWN_TEXT_BUSY_RETRIES {
1010 match command.spawn() {
1011 Ok(child) => return Ok(child),
1012 Err(error)
1013 if is_executable_file_busy(&error) && attempt < PLUGIN_SPAWN_TEXT_BUSY_RETRIES =>
1014 {
1015 let backoff =
1016 PLUGIN_SPAWN_TEXT_BUSY_BACKOFF_MS * u64::try_from(attempt + 1).unwrap_or(1);
1017 thread::sleep(Duration::from_millis(backoff));
1018 }
1019 Err(error) => return Err(error),
1020 }
1021 }
1022
1023 unreachable!("spawn loop returns after the final attempt")
1024}
1025
1026#[cfg(unix)]
1027fn is_executable_file_busy(error: &std::io::Error) -> bool {
1028 error.raw_os_error() == Some(libc::ETXTBSY)
1029}
1030
1031#[cfg(not(unix))]
1032fn is_executable_file_busy(_error: &std::io::Error) -> bool {
1033 false
1034}
1035
/// Starts a background thread that reads `reader` to the end, capturing at
/// most `max_bytes`.
///
/// Thin wrapper over `spawn_limited_pipe_reader`; the returned handle yields
/// the captured bytes or the read error.
fn spawn_pipe_reader<R: Read + Send + 'static>(
    reader: R,
    max_bytes: usize,
) -> thread::JoinHandle<std::io::Result<Vec<u8>>> {
    spawn_limited_pipe_reader(reader, max_bytes)
}
1042
1043fn spawn_limited_pipe_reader<R: Read + Send + 'static>(
1044 mut reader: R,
1045 max_bytes: usize,
1046) -> thread::JoinHandle<std::io::Result<Vec<u8>>> {
1047 thread::spawn(move || read_limited(&mut reader, max_bytes))
1048}
1049
/// Reads `reader` to end of stream, returning its contents if they fit in
/// `max_bytes`.
///
/// The stream is always drained to EOF, even after the limit is exceeded, so
/// a writer on the other end of a pipe is never blocked by a full pipe.
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when the stream holds more than
/// `max_bytes`, or the underlying error when a read fails.
fn read_limited(reader: &mut impl Read, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut chunk = [0_u8; 8192];
    let mut exceeded_limit = false;
    loop {
        let read = match reader.read(&mut chunk) {
            Ok(read) => read,
            // An interrupted read transferred nothing and is safe to retry.
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => return Err(error),
        };
        if read == 0 {
            if exceeded_limit {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("plugin output exceeded {max_bytes} bytes"),
                ));
            }
            return Ok(buffer);
        }
        if exceeded_limit {
            // Keep draining so the writer can finish; the data is discarded.
            continue;
        }
        let remaining = max_bytes.saturating_sub(buffer.len());
        if read > remaining {
            exceeded_limit = true;
            // The captured bytes will never be returned; free them now
            // instead of holding them while the stream is drained.
            buffer = Vec::new();
        } else {
            buffer.extend_from_slice(&chunk[..read]);
        }
    }
}
1077
1078fn join_pipe_reader(
1079 manifest: &PluginManifest,
1080 stream: &str,
1081 handle: thread::JoinHandle<std::io::Result<Vec<u8>>>,
1082) -> Result<Vec<u8>> {
1083 let result = handle.join().map_err(|_| {
1084 LogicPearlError::message(format!(
1085 "plugin {} {} reader thread panicked",
1086 manifest.name, stream
1087 ))
1088 })?;
1089 result.map_err(|err| {
1090 LogicPearlError::message(format!(
1091 "failed to read plugin {} {}: {}",
1092 manifest.name, stream, err
1093 ))
1094 })
1095}
1096
1097fn wait_for_plugin_exit(timeout_ms: Option<u64>, child: &mut Child) -> Result<(ExitStatus, bool)> {
1098 const MAX_POLL_INTERVAL: Duration = Duration::from_millis(200);
1099
1100 if let Some(timeout_ms) = timeout_ms {
1101 let timeout = Duration::from_millis(timeout_ms);
1102 let started_at = Instant::now();
1103 let mut poll_interval = Duration::from_millis(10);
1104 loop {
1105 if let Some(status) = child.try_wait()? {
1106 return Ok((status, false));
1107 }
1108 if started_at.elapsed() >= timeout {
1109 terminate_plugin_process(child);
1110 let status = child.wait()?;
1111 return Ok((status, true));
1112 }
1113 thread::sleep(poll_interval);
1114 poll_interval = (poll_interval * 2).min(MAX_POLL_INTERVAL);
1115 }
1116 }
1117
1118 Ok((child.wait()?, false))
1119}
1120
1121#[cfg(unix)]
1122fn terminate_plugin_process(child: &mut Child) {
1123 let child_process_group = safe_child_process_group(child);
1124 if let Some(process_group) = child_process_group {
1125 signal_process_group(process_group, SIGTERM);
1126 } else {
1127 let _ = child.kill();
1128 }
1129 thread::sleep(Duration::from_millis(50));
1130 if child.try_wait().ok().flatten().is_none() {
1131 if let Some(process_group) = child_process_group {
1132 signal_process_group(process_group, SIGKILL);
1133 }
1134 let _ = child.kill();
1135 }
1136}
1137
1138#[cfg(unix)]
1139fn safe_child_process_group(child: &Child) -> Option<i32> {
1140 let pid = i32::try_from(child.id()).ok()?;
1141 let process_group = unsafe { getpgid(pid) };
1143 if process_group <= 0 {
1144 return None;
1145 }
1146 let parent_process_group = unsafe { getpgrp() };
1148 if process_group == parent_process_group {
1149 return None;
1150 }
1151 Some(process_group)
1152}
1153
1154#[cfg(unix)]
1155fn signal_process_group(process_group: i32, signal: i32) {
1156 let _ = unsafe { kill(-process_group, signal) };
1158}
1159
/// Non-Unix termination: kills the child process directly.
///
/// The outcome of `Child::kill` is ignored, making this a best-effort call.
#[cfg(not(unix))]
fn terminate_plugin_process(child: &mut Child) {
    let _kill_outcome = child.kill();
}
1164
1165fn parse_plugin_response(manifest: &PluginManifest, stdout: &str) -> Result<PluginResponse> {
1166 let response: PluginResponse = serde_json::from_str(stdout).map_err(|err| {
1167 LogicPearlError::message(format!(
1168 "plugin {} returned invalid JSON: {}",
1169 manifest.name, err
1170 ))
1171 })?;
1172 validate_ok_plugin_response(manifest, &response)?;
1173 Ok(response)
1174}
1175
1176fn validate_ok_plugin_response(manifest: &PluginManifest, response: &PluginResponse) -> Result<()> {
1177 if !response.ok {
1178 if let Some(error) = &response.error {
1179 return Err(LogicPearlError::message(format!(
1180 "plugin {} failed [{}]: {}",
1181 manifest.name, error.code, error.message
1182 )));
1183 }
1184 return Err(LogicPearlError::message(format!(
1185 "plugin {} returned ok=false without structured error",
1186 manifest.name
1187 )));
1188 }
1189 if let Some(schema) = &manifest.output_schema {
1190 let response_value = serde_json::to_value(response).map_err(LogicPearlError::from)?;
1191 validate_value_against_declared_schema(
1192 "output_schema",
1193 schema,
1194 &response_value,
1195 "$response",
1196 )?;
1197 }
1198 Ok(())
1199}
1200
1201fn validate_plugin_request_contract(
1202 manifest: &PluginManifest,
1203 request: &PluginRequest,
1204) -> Result<()> {
1205 validate_plugin_payload_contract(manifest, &request.stage, &request.payload)
1206}
1207
1208fn validate_plugin_payload_contract(
1209 manifest: &PluginManifest,
1210 stage: &PluginStage,
1211 payload: &Value,
1212) -> Result<()> {
1213 if let Some(schema) = &manifest.input_schema {
1214 let input = extract_payload_input(stage, payload).ok_or_else(|| {
1215 LogicPearlError::message(format!(
1216 "plugin {} manifest declares input_schema but request payload is missing payload.input",
1217 manifest.name
1218 ))
1219 })?;
1220 validate_value_against_declared_schema("input_schema", schema, input, "$payload.input")?;
1221 }
1222 if let Some(schema) = &manifest.options_schema {
1223 let null = Value::Null;
1224 let options = extract_payload_options(payload).unwrap_or(&null);
1225 validate_value_against_declared_schema(
1226 "options_schema",
1227 schema,
1228 options,
1229 "$payload.options",
1230 )?;
1231 }
1232 Ok(())
1233}
1234
1235fn extract_payload_input<'a>(_stage: &PluginStage, payload: &'a Value) -> Option<&'a Value> {
1236 payload.as_object().and_then(|object| object.get("input"))
1237}
1238
1239fn extract_payload_options(payload: &Value) -> Option<&Value> {
1240 payload.as_object().and_then(|object| object.get("options"))
1241}
1242
1243fn validate_declared_schema(label: &str, schema: Option<&Value>) -> Result<()> {
1244 if let Some(schema) = schema {
1245 validate_schema_document(label, schema, format!("${label}"))?;
1246 }
1247 Ok(())
1248}
1249
1250fn validate_schema_document(label: &str, schema: &Value, path: String) -> Result<()> {
1251 let Some(object) = schema.as_object() else {
1252 return Err(LogicPearlError::message(format!(
1253 "{label} must be a JSON object at {path}"
1254 )));
1255 };
1256 validate_schema_keywords(label, object, &path)?;
1257 validate_schema_annotations(label, object, &path)?;
1258 if let Some(value) = object.get("type") {
1259 validate_schema_type_decl(label, value, &path)?;
1260 }
1261 if let Some(value) = object.get("properties") {
1262 let properties = value.as_object().ok_or_else(|| {
1263 LogicPearlError::message(format!(
1264 "{label} properties must be an object at {path}.properties"
1265 ))
1266 })?;
1267 for (key, child) in properties {
1268 validate_schema_document(label, child, format!("{path}.properties.{key}"))?;
1269 }
1270 }
1271 if let Some(value) = object.get("required") {
1272 let required = value.as_array().ok_or_else(|| {
1273 LogicPearlError::message(format!(
1274 "{label} required must be an array at {path}.required"
1275 ))
1276 })?;
1277 for item in required {
1278 if item
1279 .as_str()
1280 .filter(|value| !value.trim().is_empty())
1281 .is_none()
1282 {
1283 return Err(LogicPearlError::message(format!(
1284 "{label} required entries must be non-empty strings at {path}.required"
1285 )));
1286 }
1287 }
1288 }
1289 if let Some(value) = object.get("items") {
1290 validate_schema_document(label, value, format!("{path}.items"))?;
1291 }
1292 if let Some(value) = object.get("additionalProperties") {
1293 match value {
1294 Value::Bool(_) => {}
1295 Value::Object(_) => {
1296 validate_schema_document(label, value, format!("{path}.additionalProperties"))?
1297 }
1298 _ => {
1299 return Err(LogicPearlError::message(format!(
1300 "{label} additionalProperties must be a boolean or object at {path}.additionalProperties"
1301 )));
1302 }
1303 }
1304 }
1305 if let Some(value) = object.get("enum") {
1306 if value.as_array().filter(|items| !items.is_empty()).is_none() {
1307 return Err(LogicPearlError::message(format!(
1308 "{label} enum must be a non-empty array at {path}.enum"
1309 )));
1310 }
1311 }
1312 Ok(())
1313}
1314
1315fn validate_schema_keywords(label: &str, object: &Map<String, Value>, path: &str) -> Result<()> {
1316 for key in object.keys() {
1317 if is_supported_schema_keyword(key) {
1318 continue;
1319 }
1320 return Err(LogicPearlError::message(format!(
1321 "{label} uses unsupported LogicPearl schema subset keyword {key:?} at {path}; supported validation keywords are: {}; supported annotation keywords are: {}. Use a plugin smoke test or an external JSON Schema validator for full JSON Schema constraints.",
1322 SUPPORTED_SCHEMA_VALIDATION_KEYWORDS.join(", "),
1323 SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS.join(", ")
1324 )));
1325 }
1326 Ok(())
1327}
1328
1329fn is_supported_schema_keyword(key: &str) -> bool {
1330 SUPPORTED_SCHEMA_VALIDATION_KEYWORDS.contains(&key)
1331 || SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS.contains(&key)
1332}
1333
1334fn validate_schema_annotations(label: &str, object: &Map<String, Value>, path: &str) -> Result<()> {
1335 for key in SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS {
1336 if let Some(value) = object.get(*key) {
1337 if !value.is_string() {
1338 return Err(LogicPearlError::message(format!(
1339 "{label} annotation keyword {key:?} must be a string at {path}.{key}"
1340 )));
1341 }
1342 }
1343 }
1344 Ok(())
1345}
1346
1347fn validate_schema_type_decl(label: &str, value: &Value, path: &str) -> Result<()> {
1348 match value {
1349 Value::String(kind) => validate_schema_type_name(label, kind, path),
1350 Value::Array(items) if !items.is_empty() => {
1351 for item in items {
1352 let kind = item.as_str().ok_or_else(|| {
1353 LogicPearlError::message(format!(
1354 "{label} type arrays must contain only strings at {path}"
1355 ))
1356 })?;
1357 validate_schema_type_name(label, kind, path)?;
1358 }
1359 Ok(())
1360 }
1361 _ => Err(LogicPearlError::message(format!(
1362 "{label} type must be a string or non-empty array of strings at {path}"
1363 ))),
1364 }
1365}
1366
1367fn validate_schema_type_name(label: &str, kind: &str, path: &str) -> Result<()> {
1368 match kind {
1369 "null" | "boolean" | "integer" | "number" | "string" | "array" | "object" => Ok(()),
1370 _ => Err(LogicPearlError::message(format!(
1371 "{label} uses unsupported JSON Schema type {kind:?} at {path}"
1372 ))),
1373 }
1374}
1375
1376fn validate_value_against_declared_schema(
1377 label: &str,
1378 schema: &Value,
1379 value: &Value,
1380 path: &str,
1381) -> Result<()> {
1382 let object = schema
1383 .as_object()
1384 .ok_or_else(|| LogicPearlError::message(format!("{label} must be a JSON object")))?;
1385
1386 if let Some(type_decl) = object.get("type") {
1387 let matches = schema_type_names(type_decl)?
1388 .iter()
1389 .any(|kind| value_matches_type(value, kind));
1390 if !matches {
1391 return Err(LogicPearlError::message(format!(
1392 "{label} rejected {path}: expected type {}, got {}",
1393 render_schema_types(type_decl)?,
1394 describe_json_type(value)
1395 )));
1396 }
1397 }
1398
1399 if let Some(expected) = object.get("const") {
1400 if value != expected {
1401 return Err(LogicPearlError::message(format!(
1402 "{label} rejected {path}: value did not match const"
1403 )));
1404 }
1405 }
1406
1407 if let Some(choices) = object.get("enum").and_then(Value::as_array) {
1408 if !choices.iter().any(|choice| choice == value) {
1409 return Err(LogicPearlError::message(format!(
1410 "{label} rejected {path}: value was not in enum"
1411 )));
1412 }
1413 }
1414
1415 if let Some(required) = object.get("required").and_then(Value::as_array) {
1416 if let Some(map) = value.as_object() {
1417 for field in required.iter().filter_map(Value::as_str) {
1418 if !map.contains_key(field) {
1419 return Err(LogicPearlError::message(format!(
1420 "{label} rejected {path}: missing required field {field:?}"
1421 )));
1422 }
1423 }
1424 }
1425 }
1426
1427 if let Some(properties) = object.get("properties").and_then(Value::as_object) {
1428 if let Some(map) = value.as_object() {
1429 for (key, child_schema) in properties {
1430 if let Some(child_value) = map.get(key) {
1431 validate_value_against_declared_schema(
1432 label,
1433 child_schema,
1434 child_value,
1435 &format!("{path}.{key}"),
1436 )?;
1437 }
1438 }
1439
1440 match object.get("additionalProperties") {
1441 Some(Value::Bool(false)) => {
1442 for key in map.keys() {
1443 if !properties.contains_key(key) {
1444 return Err(LogicPearlError::message(format!(
1445 "{label} rejected {path}: unexpected field {key:?}"
1446 )));
1447 }
1448 }
1449 }
1450 Some(Value::Object(_)) => {
1451 let extra_schema = object
1452 .get("additionalProperties")
1453 .expect("checked additionalProperties");
1454 for (key, child_value) in map {
1455 if !properties.contains_key(key) {
1456 validate_value_against_declared_schema(
1457 label,
1458 extra_schema,
1459 child_value,
1460 &format!("{path}.{key}"),
1461 )?;
1462 }
1463 }
1464 }
1465 _ => {}
1466 }
1467 }
1468 }
1469
1470 if let Some(items_schema) = object.get("items") {
1471 if let Some(items) = value.as_array() {
1472 for (index, item) in items.iter().enumerate() {
1473 validate_value_against_declared_schema(
1474 label,
1475 items_schema,
1476 item,
1477 &format!("{path}[{index}]"),
1478 )?;
1479 }
1480 }
1481 }
1482
1483 Ok(())
1484}
1485
1486fn schema_type_names(value: &Value) -> Result<Vec<&str>> {
1487 match value {
1488 Value::String(kind) => Ok(vec![kind.as_str()]),
1489 Value::Array(items) => items
1490 .iter()
1491 .map(|item| {
1492 item.as_str().ok_or_else(|| {
1493 LogicPearlError::message("schema type arrays must contain only strings")
1494 })
1495 })
1496 .collect(),
1497 _ => Err(LogicPearlError::message(
1498 "schema type must be a string or non-empty array of strings",
1499 )),
1500 }
1501}
1502
1503fn render_schema_types(value: &Value) -> Result<String> {
1504 Ok(schema_type_names(value)?.join(" | "))
1505}
1506
1507fn value_matches_type(value: &Value, kind: &str) -> bool {
1508 match kind {
1509 "null" => value.is_null(),
1510 "boolean" => value.is_boolean(),
1511 "integer" => value.as_i64().is_some() || value.as_u64().is_some(),
1512 "number" => value.is_number(),
1513 "string" => value.is_string(),
1514 "array" => value.is_array(),
1515 "object" => value.is_object(),
1516 _ => false,
1517 }
1518}
1519
1520fn describe_json_type(value: &Value) -> &'static str {
1521 match value {
1522 Value::Null => "null",
1523 Value::Bool(_) => "boolean",
1524 Value::Number(number) => {
1525 if number.as_i64().is_some() || number.as_u64().is_some() {
1526 "integer"
1527 } else {
1528 "number"
1529 }
1530 }
1531 Value::String(_) => "string",
1532 Value::Array(_) => "array",
1533 Value::Object(_) => "object",
1534 }
1535}
1536
#[cfg(test)]
mod tests {
    use super::{
        run_plugin, run_plugin_with_policy, run_plugin_with_policy_and_metadata,
        PluginExecutionPolicy, PluginManifest, PluginRequest, PluginStage,
    };
    use serde_json::json;
    use tempfile::tempdir;

    /// Shell plugin that immediately reports success with one feature.
    #[cfg(unix)]
    const FEATURE_SCRIPT: &str =
        "#!/bin/sh\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n";

    /// Shell plugin that sleeps for one second before reporting success.
    #[cfg(unix)]
    const SLOW_SCRIPT: &str = "#!/bin/sh\nsleep 1\nprintf '{\"ok\":true}\\n'\n";

    /// Baseline observer manifest with no schemas, no timeout and no manifest
    /// location. Tests override individual fields with struct-update syntax.
    fn demo_manifest() -> PluginManifest {
        PluginManifest {
            name: "demo".to_string(),
            plugin_id: None,
            plugin_version: None,
            protocol_version: "1".to_string(),
            stage: PluginStage::Observer,
            entrypoint: vec!["python3".to_string(), "plugin.py".to_string()],
            language: Some("python".to_string()),
            capabilities: None,
            timeout_ms: None,
            input_schema: None,
            options_schema: None,
            output_schema: None,
            manifest_dir: None,
            manifest_path: None,
        }
    }

    /// Manifest for a `plugin.sh` shell script located in `manifest_dir`.
    #[cfg(unix)]
    fn script_manifest(name: &str, manifest_dir: &std::path::Path) -> PluginManifest {
        PluginManifest {
            name: name.to_string(),
            entrypoint: vec!["plugin.sh".to_string()],
            language: Some("shell".to_string()),
            manifest_dir: Some(manifest_dir.to_path_buf()),
            ..demo_manifest()
        }
    }

    /// Observer-stage request whose payload is built from `input` and `options`.
    fn observer_request(
        input: serde_json::Value,
        options: Option<serde_json::Value>,
    ) -> PluginRequest {
        PluginRequest {
            protocol_version: "1".to_string(),
            stage: PluginStage::Observer,
            payload: super::build_canonical_payload(&PluginStage::Observer, input, options),
        }
    }

    #[test]
    fn validates_basic_manifest() {
        assert!(demo_manifest().validate().is_ok());
    }

    #[test]
    fn validates_declared_input_options_and_output_schemas() {
        let manifest = PluginManifest {
            input_schema: Some(json!({
                "type": "object",
                "required": ["age", "member"],
                "properties": {
                    "age": { "type": "integer" },
                    "member": { "type": "boolean" }
                },
                "additionalProperties": false
            })),
            options_schema: Some(json!({
                "type": ["object", "null"],
                "properties": {
                    "mode": { "type": "string" }
                },
                "additionalProperties": false
            })),
            output_schema: Some(json!({
                "type": "object",
                "required": ["ok", "features"],
                "properties": {
                    "ok": { "const": true },
                    "features": {
                        "type": "object",
                        "required": ["age"],
                        "properties": {
                            "age": { "type": "integer" }
                        }
                    }
                }
            })),
            ..demo_manifest()
        };
        assert!(manifest.validate().is_ok());

        let request = observer_request(
            json!({"age": 34, "member": true}),
            Some(json!({"mode": "strict"})),
        );
        assert!(super::validate_plugin_request_contract(&manifest, &request).is_ok());

        // Wrong type for `age` plus an undeclared field must be rejected.
        let bad_request =
            observer_request(json!({"age": "34", "member": true, "extra": 1}), None);
        assert!(super::validate_plugin_request_contract(&manifest, &bad_request).is_err());

        let good_response = super::PluginResponse {
            ok: true,
            warnings: Vec::new(),
            error: None,
            extra: serde_json::Map::from_iter([("features".to_string(), json!({"age": 34}))]),
        };
        assert!(super::validate_ok_plugin_response(&manifest, &good_response).is_ok());

        // Missing the required `features` member.
        let bad_response = super::PluginResponse {
            ok: true,
            warnings: Vec::new(),
            error: None,
            extra: serde_json::Map::new(),
        };
        assert!(super::validate_ok_plugin_response(&manifest, &bad_response).is_err());
    }

    #[test]
    fn rejects_unsupported_schema_subset_keywords() {
        let manifest = PluginManifest {
            input_schema: Some(json!({
                "type": "object",
                "properties": {
                    "age": {
                        "type": "integer",
                        "minimum": 0
                    }
                }
            })),
            ..demo_manifest()
        };

        let err = manifest.validate().unwrap_err();
        assert!(err
            .to_string()
            .contains("unsupported LogicPearl schema subset keyword \"minimum\""));
    }

    #[test]
    fn accepts_schema_subset_annotation_keywords() {
        let manifest = PluginManifest {
            input_schema: Some(json!({
                "$schema": "https://logicpearl.com/schema/plugin-contract-subset",
                "title": "Observer input",
                "description": "Annotation fields are accepted but do not add validation.",
                "type": "object"
            })),
            ..demo_manifest()
        };

        assert!(manifest.validate().is_ok());
    }

    /// Writes `script_body` to an executable `plugin.sh` in a fresh temp dir.
    /// The returned `TempDir` must be kept alive while the script is in use.
    #[cfg(unix)]
    fn write_plugin_script(script_body: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("plugin.sh");
        std::fs::write(&path, script_body).expect("write script");
        let mut permissions = std::fs::metadata(&path).expect("stat script").permissions();
        permissions.set_mode(0o755);
        std::fs::set_permissions(&path, permissions).expect("chmod script");
        (dir, path)
    }

    #[cfg(unix)]
    fn test_request() -> PluginRequest {
        observer_request(json!({"value": 1}), None)
    }

    #[cfg(unix)]
    #[test]
    fn enforces_plugin_timeout_when_declared() {
        let (dir, _script_path) = write_plugin_script(SLOW_SCRIPT);
        let manifest = PluginManifest {
            timeout_ms: Some(50),
            ..script_manifest("slow", dir.path())
        };

        let error = run_plugin(&manifest, &test_request()).expect_err("plugin should time out");
        let message = error.to_string();
        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
    }

    #[cfg(unix)]
    #[test]
    fn applies_policy_default_timeout_when_manifest_timeout_is_unset() {
        let (dir, _script_path) = write_plugin_script(SLOW_SCRIPT);
        let manifest = script_manifest("slow-default", dir.path());

        let policy = PluginExecutionPolicy::default().with_default_timeout_ms(50);
        let error = run_plugin_with_policy(&manifest, &test_request(), &policy)
            .expect_err("policy default timeout should apply");
        let message = error.to_string();
        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
    }

    #[cfg(unix)]
    #[test]
    fn rejects_no_timeout_manifest_without_policy_opt_in() {
        let (dir, _script_path) = write_plugin_script("#!/bin/sh\nprintf '{\"ok\":true}\\n'\n");
        let manifest = PluginManifest {
            timeout_ms: Some(0),
            ..script_manifest("no-timeout", dir.path())
        };

        let error = run_plugin(&manifest, &test_request()).expect_err("no timeout should reject");
        let message = error.to_string();
        assert!(message.contains("timeout_ms=0"), "{message}");
        assert!(message.contains("disables the plugin timeout"), "{message}");
    }

    #[cfg(unix)]
    #[test]
    fn allows_no_timeout_when_policy_opts_in() {
        let (dir, _script_path) = write_plugin_script(
            "#!/bin/sh\nsleep 0.1\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
        );
        let manifest = PluginManifest {
            timeout_ms: Some(0),
            ..script_manifest("trusted-no-timeout", dir.path())
        };

        let policy = PluginExecutionPolicy::default().with_allow_no_timeout(true);
        let response = run_plugin_with_policy(&manifest, &test_request(), &policy)
            .expect("trusted no-timeout plugin should succeed");
        assert!(response.ok);
        assert_eq!(response.extra.get("features"), Some(&json!({"value": 1})));
    }

    #[cfg(unix)]
    #[test]
    fn allows_known_interpreter_for_manifest_local_script() {
        let (dir, _script_path) = write_plugin_script(FEATURE_SCRIPT);
        let manifest = PluginManifest {
            entrypoint: vec!["sh".to_string(), "plugin.sh".to_string()],
            ..script_manifest("shell-wrapper", dir.path())
        };

        let response = run_plugin(&manifest, &test_request())
            .expect("known interpreter with manifest-local script should succeed");
        assert!(response.ok);
    }

    #[cfg(unix)]
    #[test]
    fn returns_redacted_plugin_run_metadata() {
        let (dir, _script_path) = write_plugin_script(
            "#!/bin/sh\nprintf 'debug secret\\n' >&2\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
        );
        let manifest = PluginManifest {
            plugin_id: Some("metadata-plugin".to_string()),
            plugin_version: Some("0.1.0".to_string()),
            capabilities: Some(vec!["feature_output".to_string()]),
            ..script_manifest("metadata", dir.path())
        };

        let execution = run_plugin_with_policy_and_metadata(
            &manifest,
            &test_request(),
            &PluginExecutionPolicy::default(),
        )
        .expect("plugin should run with metadata");

        assert!(execution.response.ok);
        assert_eq!(execution.run.plugin_id, "metadata-plugin");
        assert_eq!(execution.run.plugin_version.as_deref(), Some("0.1.0"));
        assert_eq!(execution.run.access.network, "not_enforced");
        assert_eq!(execution.run.access.filesystem, "process_default");
        assert_eq!(
            execution.run.timeout_policy.effective_timeout_ms,
            Some(30_000)
        );
        assert_eq!(
            execution.run.capabilities.allowed,
            vec!["feature_output".to_string()]
        );
        assert!(execution.run.plugin_run_id.starts_with("sha256:"));
        assert!(execution.run.entrypoint_hash.starts_with("sha256:"));
        assert_eq!(execution.run.entrypoint.hashes.len(), 1);
        assert!(execution
            .run
            .stdio
            .stdout_summary
            .as_deref()
            .is_some_and(|value| value.starts_with("<redacted:sha256:")));
        assert!(execution
            .run
            .stdio
            .stderr_summary
            .as_deref()
            .is_some_and(|value| value.starts_with("<redacted:sha256:")));
        assert_ne!(
            execution.run.stdio.stderr_summary.as_deref(),
            Some("debug secret")
        );
    }

    #[cfg(unix)]
    #[test]
    fn rejects_bare_path_lookup_by_default() {
        let dir = tempdir().expect("tempdir");
        let manifest = PluginManifest {
            entrypoint: vec!["logicpearl-plugin-not-in-manifest".to_string()],
            ..script_manifest("path-command", dir.path())
        };

        let error = run_plugin(&manifest, &test_request()).expect_err("PATH lookup should reject");
        let message = error.to_string();
        assert!(message.contains("PATH lookup is disabled"), "{message}");
    }

    #[cfg(unix)]
    #[test]
    fn rejects_absolute_entrypoint_by_default() {
        let (dir, script_path) = write_plugin_script(FEATURE_SCRIPT);
        let manifest = PluginManifest {
            entrypoint: vec![script_path.display().to_string()],
            ..script_manifest("absolute", dir.path())
        };

        let error =
            run_plugin(&manifest, &test_request()).expect_err("absolute entrypoint should reject");
        let message = error.to_string();
        assert!(message.contains("absolute program path"), "{message}");
    }

    #[cfg(unix)]
    #[test]
    fn allows_absolute_entrypoint_when_policy_opts_in() {
        let (dir, script_path) = write_plugin_script(FEATURE_SCRIPT);
        let manifest = PluginManifest {
            entrypoint: vec![script_path.display().to_string()],
            ..script_manifest("absolute", dir.path())
        };

        let policy = PluginExecutionPolicy::default().with_allow_absolute_entrypoint(true);
        let response = run_plugin_with_policy(&manifest, &test_request(), &policy)
            .expect("absolute entrypoint should run only under explicit policy");
        assert!(response.ok);
    }

    #[cfg(unix)]
    #[test]
    fn timeout_terminates_descendants_that_keep_output_pipes_open() {
        let (dir, _script_path) = write_plugin_script(
            "#!/bin/sh\n(sh -c 'sleep 5') &\nsleep 5\nprintf '{\"ok\":true}\\n'\n",
        );
        let manifest = PluginManifest {
            timeout_ms: Some(50),
            ..script_manifest("tree", dir.path())
        };

        let started_at = std::time::Instant::now();
        let error = run_plugin(&manifest, &test_request()).expect_err("plugin should time out");
        assert!(
            started_at.elapsed() < std::time::Duration::from_secs(2),
            "timeout should not wait for descendant sleep process"
        );
        let message = error.to_string();
        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
    }

    #[test]
    fn limited_reader_rejects_outputs_above_cap() {
        let mut reader = std::io::Cursor::new(vec![b'x'; 5]);
        let error = super::read_limited(&mut reader, 4).expect_err("reader should reject overflow");
        assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
        assert_eq!(reader.position(), 5, "reader should drain capped streams");
    }
}