Skip to main content

logicpearl_plugin/
lib.rs

1// SPDX-License-Identifier: MIT
2//! Trusted local process plugin contracts and runner.
3//!
4//! Plugins adapt external sources into normalized LogicPearl inputs or verify
5//! generated artifacts. This crate validates plugin manifests, constructs the
6//! canonical JSON payload, enforces conservative process execution defaults,
7//! captures bounded stdout/stderr metadata, and emits plugin run provenance.
8//! It is not an OS sandbox for untrusted code.
9
10use logicpearl_core::{LogicPearlError, Result};
11use serde::{Deserialize, Serialize};
12use serde_json::{Map, Value};
13use sha2::{Digest, Sha256};
14use std::io::{Read, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, ExitStatus, Stdio};
17use std::thread;
18use std::time::{Duration, Instant};
19use time::{format_description::well_known::Rfc3339, OffsetDateTime};
20
/// Byte limit handed to the stdout pipe reader of a plugin process (64 MiB).
const MAX_PLUGIN_STDOUT_BYTES: usize = 64 * 1024 * 1024;
/// Byte limit handed to the stderr pipe reader of a plugin process (8 MiB).
const MAX_PLUGIN_STDERR_BYTES: usize = 8 * 1024 * 1024;
/// Timeout in milliseconds used by `PluginExecutionPolicy::default()`.
pub const DEFAULT_PLUGIN_TIMEOUT_MS: u64 = 30_000;
// NOTE(review): presumably the number of spawn retries after a "text file busy"
// error — the usage is not visible in this part of the file; confirm.
const PLUGIN_SPAWN_TEXT_BUSY_RETRIES: usize = 5;
// NOTE(review): presumably the delay in milliseconds between those retries — confirm.
const PLUGIN_SPAWN_TEXT_BUSY_BACKOFF_MS: u64 = 20;
// NOTE(review): presumably the JSON-schema validation keywords accepted in
// manifest-declared schemas — confirm against `validate_declared_schema`.
const SUPPORTED_SCHEMA_VALIDATION_KEYWORDS: &[&str] = &[
    "additionalProperties",
    "const",
    "enum",
    "items",
    "properties",
    "required",
    "type",
];
// NOTE(review): presumably annotation-only schema keywords that are accepted
// but not enforced — confirm against `validate_declared_schema`.
const SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS: &[&str] = &["$id", "$schema", "description", "title"];
36
37#[cfg(unix)]
38use libc::{getpgid, getpgrp, kill, SIGKILL, SIGTERM};
39
/// The pipeline stage a plugin implements.
///
/// Serialized in `snake_case` (for example `TraceSource` becomes
/// `"trace_source"`). A request is only executed when its stage equals the
/// stage declared by the manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PluginStage {
    Observer,
    TraceSource,
    Enricher,
    Verify,
    Render,
}
50
/// JSON manifest describing a plugin's entrypoint, capabilities, and schemas.
///
/// Load one with [`PluginManifest::from_path`], which also fills in the
/// non-serialized `manifest_dir` / `manifest_path` fields and validates the result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginManifest {
    /// Human-readable plugin name; must not be blank.
    pub name: String,
    /// Optional stable identifier; must not be blank when present. Run
    /// metadata falls back to `name` when this is absent.
    #[serde(default)]
    pub plugin_id: Option<String>,
    /// Optional plugin version; must not be blank when present.
    #[serde(default)]
    pub plugin_version: Option<String>,
    /// Plugin protocol version; validation only accepts `"1"`.
    pub protocol_version: String,
    /// Pipeline stage this plugin serves.
    pub stage: PluginStage,
    /// Command line: the first segment is the program, the rest are arguments.
    /// Must contain at least one segment.
    pub entrypoint: Vec<String>,
    /// Optional implementation-language label.
    pub language: Option<String>,
    /// Declared capabilities (for example `"batch_requests"`).
    pub capabilities: Option<Vec<String>>,
    /// Optional per-plugin timeout in milliseconds.
    pub timeout_ms: Option<u64>,
    /// Optional schema declared for the request `input`.
    #[serde(default)]
    pub input_schema: Option<Value>,
    /// Optional schema declared for the request `options`.
    #[serde(default)]
    pub options_schema: Option<Value>,
    /// Optional schema declared for the plugin output.
    #[serde(default)]
    pub output_schema: Option<Value>,
    /// Directory containing the manifest file; never (de)serialized.
    #[serde(skip)]
    pub manifest_dir: Option<PathBuf>,
    /// Path the manifest was loaded from; never (de)serialized.
    #[serde(skip)]
    pub manifest_path: Option<PathBuf>,
}
76
/// Security policy controlling plugin execution privileges.
///
/// `Default` disables every opt-in privilege; `trusted_local()` enables all of them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginExecutionPolicy {
    /// Fallback timeout in milliseconds.
    pub default_timeout_ms: u64,
    /// Whether a plugin may run without a timeout.
    // NOTE(review): the exact rule lives in `effective_timeout_ms`, which is not visible here.
    pub allow_no_timeout: bool,
    /// Whether absolute paths are accepted for the entrypoint program and its arguments.
    pub allow_absolute_entrypoint: bool,
    /// Whether a bare program name may be left for the OS to look up on `PATH`.
    pub allow_path_lookup: bool,
}
85
86impl Default for PluginExecutionPolicy {
87    fn default() -> Self {
88        Self {
89            default_timeout_ms: DEFAULT_PLUGIN_TIMEOUT_MS,
90            allow_no_timeout: false,
91            allow_absolute_entrypoint: false,
92            allow_path_lookup: false,
93        }
94    }
95}
96
97impl PluginExecutionPolicy {
98    #[must_use]
99    pub fn trusted_local() -> Self {
100        Self {
101            allow_no_timeout: true,
102            allow_absolute_entrypoint: true,
103            allow_path_lookup: true,
104            ..Self::default()
105        }
106    }
107
108    #[must_use]
109    pub fn with_default_timeout_ms(mut self, timeout_ms: u64) -> Self {
110        self.default_timeout_ms = timeout_ms;
111        self
112    }
113
114    #[must_use]
115    pub fn with_allow_no_timeout(mut self, allow: bool) -> Self {
116        self.allow_no_timeout = allow;
117        self
118    }
119
120    #[must_use]
121    pub fn with_allow_absolute_entrypoint(mut self, allow: bool) -> Self {
122        self.allow_absolute_entrypoint = allow;
123        self
124    }
125
126    #[must_use]
127    pub fn with_allow_path_lookup(mut self, allow: bool) -> Self {
128        self.allow_path_lookup = allow;
129        self
130    }
131}
132
/// A single-payload request; serialized to JSON and written to the plugin's stdin.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginRequest {
    /// Protocol version of the request (the runner sends `"1"`).
    pub protocol_version: String,
    /// Stage being requested; must equal the manifest's stage.
    pub stage: PluginStage,
    /// Request payload.
    pub payload: Value,
}
139
/// A multi-payload request; sent in one process invocation to plugins that
/// declare the `batch_requests` capability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchRequest {
    /// Protocol version of the request (the runner sends `"1"`).
    pub protocol_version: String,
    /// Stage being requested.
    pub stage: PluginStage,
    /// Payloads to process; the plugin must answer with one response per payload.
    pub payloads: Vec<Value>,
}
146
/// Structured error reported by a plugin alongside `ok = false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginErrorPayload {
    /// Error code chosen by the plugin; echoed in runner error messages.
    pub code: String,
    /// Human-readable error message; echoed in runner error messages.
    pub message: String,
    /// Optional extra detail; takes `Value`'s default when the key is absent.
    #[serde(default)]
    pub details: Value,
}
154
/// Response object a plugin prints for one payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginResponse {
    /// Whether the plugin reports success.
    pub ok: bool,
    /// Warnings emitted by the plugin; empty when absent.
    #[serde(default)]
    pub warnings: Vec<String>,
    /// Structured error, if the plugin supplied one.
    #[serde(default)]
    pub error: Option<PluginErrorPayload>,
    /// Every other top-level key of the response object (flattened).
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}
165
/// Envelope a batch-capable plugin prints for a [`PluginBatchRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchResponse {
    /// Whether the batch as a whole succeeded.
    pub ok: bool,
    /// Batch-level warnings; empty when absent.
    #[serde(default)]
    pub warnings: Vec<String>,
    /// Batch-level structured error, if any.
    #[serde(default)]
    pub error: Option<PluginErrorPayload>,
    /// Per-payload responses; the runner requires one per submitted payload.
    #[serde(default)]
    pub responses: Vec<PluginResponse>,
}
176
/// Parsed response of a single plugin run together with its provenance record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginExecutionResult {
    /// Response parsed from the plugin's stdout.
    pub response: PluginResponse,
    /// Provenance metadata describing the process run.
    pub run: PluginRunMetadata,
}
182
/// Responses of a batch execution together with provenance records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginBatchExecutionResult {
    /// One response per submitted payload, in order.
    pub responses: Vec<PluginResponse>,
    /// One metadata entry per response. For a true batch invocation every
    /// entry is a copy of the single process run.
    #[serde(default)]
    pub runs: Vec<PluginRunMetadata>,
    /// Representative run: the batch invocation, the last per-payload run, or
    /// a synthetic record when no payloads were given.
    pub run: PluginRunMetadata,
}
190
/// Provenance record describing one plugin process run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginRunMetadata {
    /// Schema identifier of this record (`logicpearl.plugin_run_provenance.v1`).
    pub schema_version: String,
    /// Hash derived from the identifying fields of this record.
    pub plugin_run_id: String,
    /// Manifest `plugin_id`, or the plugin name when none is declared.
    pub plugin_id: String,
    /// Manifest `plugin_version`, if declared.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_version: Option<String>,
    /// Manifest `name`.
    pub plugin_name: String,
    /// Manifest stage.
    pub stage: PluginStage,
    /// Protocol version taken from the request, else from the manifest.
    pub protocol_version: String,
    /// Display form of the manifest path, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub manifest_path: Option<String>,
    /// SHA-256 of the manifest file; absent when the path is unknown or unreadable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub manifest_hash: Option<String>,
    /// Hash of the serialized `entrypoint` metadata.
    pub entrypoint_hash: String,
    /// Declared and resolved command line plus per-file hashes.
    pub entrypoint: PluginEntrypointMetadata,
    /// Hash of the request JSON value.
    pub request_hash: String,
    /// Hash of `payload.input`, when the request carries one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_hash: Option<String>,
    /// SHA-256 of the raw stdout bytes.
    pub output_hash: String,
    /// Timeout settings that applied to the run.
    pub timeout_policy: PluginTimeoutPolicyMetadata,
    /// Execution-policy flags that applied to the run.
    pub execution_policy: PluginExecutionPolicyMetadata,
    /// Declared, allowed, and enforced capabilities.
    pub capabilities: PluginCapabilityMetadata,
    /// Statement of what access restrictions were (not) enforced.
    pub access: PluginAccessMetadata,
    /// Hashes and sizes of captured stdout/stderr.
    pub stdio: PluginStdioMetadata,
    /// RFC 3339 UTC timestamp taken before spawning.
    pub started_at: String,
    /// RFC 3339 UTC timestamp taken after the process exited.
    pub completed_at: String,
    /// Elapsed wall-clock time in milliseconds.
    pub duration_ms: u64,
}
220
/// Entrypoint command line as declared and as actually resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginEntrypointMetadata {
    /// Entrypoint segments exactly as written in the manifest.
    pub declared: Vec<String>,
    /// Program followed by arguments after path resolution.
    pub resolved: Vec<String>,
    /// Hashes of the resolved segments that name existing regular files.
    #[serde(default)]
    pub hashes: Vec<PluginEntrypointSegmentHash>,
}
228
/// Content hash of one resolved entrypoint segment that is a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginEntrypointSegmentHash {
    /// Position of the segment within the resolved command line.
    pub index: usize,
    /// The resolved segment text (a file path).
    pub path: String,
    /// `sha256:`-prefixed hex digest of the file contents.
    pub hash: String,
}
235
/// Timeout inputs and the resulting effective timeout for a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginTimeoutPolicyMetadata {
    /// `timeout_ms` from the manifest, if declared.
    pub manifest_timeout_ms: Option<u64>,
    /// `default_timeout_ms` from the execution policy.
    pub default_timeout_ms: u64,
    /// Timeout actually applied; `None` means no timeout was applied.
    pub effective_timeout_ms: Option<u64>,
    /// `allow_no_timeout` from the execution policy.
    pub allow_no_timeout: bool,
}
243
/// Snapshot of the execution-policy flags in effect for a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginExecutionPolicyMetadata {
    /// Copy of `PluginExecutionPolicy::allow_absolute_entrypoint`.
    pub allow_absolute_entrypoint: bool,
    /// Copy of `PluginExecutionPolicy::allow_path_lookup`.
    pub allow_path_lookup: bool,
    /// Copy of `PluginExecutionPolicy::allow_no_timeout`.
    pub allow_no_timeout: bool,
}
250
/// Capability bookkeeping recorded for a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginCapabilityMetadata {
    /// Capabilities listed in the manifest.
    #[serde(default)]
    pub declared: Vec<String>,
    /// Capabilities permitted for the run; the runner currently records the
    /// declared list here unchanged.
    #[serde(default)]
    pub allowed: Vec<String>,
    /// Capabilities actually exercised by the request (`batch_requests` for
    /// batch requests to batch-capable plugins, otherwise empty).
    #[serde(default)]
    pub enforced: Vec<String>,
}
260
/// Records which access restrictions applied to the plugin process.
///
/// The runner is not a sandbox: it records `"not_enforced"`,
/// `"process_default"`, and `"none"` respectively.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginAccessMetadata {
    /// Network restriction label.
    pub network: String,
    /// Filesystem restriction label.
    pub filesystem: String,
    /// Enforcement mechanism label.
    pub enforcement: String,
}
267
/// Hashes and sizes of the captured plugin output streams.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginStdioMetadata {
    /// `sha256:`-prefixed digest of the captured stdout bytes.
    pub stdout_hash: String,
    /// Number of captured stdout bytes.
    pub stdout_bytes: usize,
    /// Redacted placeholder (`<redacted:HASH>`) when stdout was non-empty.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stdout_summary: Option<String>,
    /// `sha256:`-prefixed digest of the captured stderr bytes.
    pub stderr_hash: String,
    /// Number of captured stderr bytes.
    pub stderr_bytes: usize,
    /// Redacted placeholder (`<redacted:HASH>`) when stderr was non-empty.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stderr_summary: Option<String>,
}
279
280/// Build the canonical JSON payload sent to a plugin process on stdin.
281pub fn build_canonical_payload(
282    _stage: &PluginStage,
283    input: Value,
284    options: Option<Value>,
285) -> Value {
286    let mut payload = Map::new();
287    payload.insert("input".to_string(), input);
288
289    if let Some(options) = options {
290        payload.insert("options".to_string(), options);
291    }
292
293    Value::Object(payload)
294}
295
296impl PluginManifest {
297    pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
298        let path = path.as_ref();
299        let content = std::fs::read_to_string(path)?;
300        let mut manifest: Self = serde_json::from_str(&content)?;
301        manifest.manifest_dir = path.parent().map(Path::to_path_buf);
302        manifest.manifest_path = Some(path.to_path_buf());
303        manifest.validate()?;
304        Ok(manifest)
305    }
306
307    pub fn validate(&self) -> Result<()> {
308        if self.name.trim().is_empty() {
309            return Err(LogicPearlError::message(
310                "plugin manifest name must be non-empty",
311            ));
312        }
313        if self
314            .plugin_id
315            .as_ref()
316            .is_some_and(|value| value.trim().is_empty())
317        {
318            return Err(LogicPearlError::message(
319                "plugin manifest plugin_id must be non-empty when present",
320            ));
321        }
322        if self
323            .plugin_version
324            .as_ref()
325            .is_some_and(|value| value.trim().is_empty())
326        {
327            return Err(LogicPearlError::message(
328                "plugin manifest plugin_version must be non-empty when present",
329            ));
330        }
331        if self.protocol_version != "1" {
332            return Err(LogicPearlError::message(format!(
333                "unsupported plugin protocol_version: {}",
334                self.protocol_version
335            )));
336        }
337        if self.entrypoint.is_empty() {
338            return Err(LogicPearlError::message(
339                "plugin manifest entrypoint must contain at least one command segment",
340            ));
341        }
342        validate_declared_schema("input_schema", self.input_schema.as_ref())?;
343        validate_declared_schema("options_schema", self.options_schema.as_ref())?;
344        validate_declared_schema("output_schema", self.output_schema.as_ref())?;
345        Ok(())
346    }
347
348    pub fn supports_capability(&self, capability: &str) -> bool {
349        self.capabilities
350            .as_ref()
351            .map(|caps| caps.iter().any(|item| item == capability))
352            .unwrap_or(false)
353    }
354}
355
356/// Execute a plugin with default execution policy.
357pub fn run_plugin(manifest: &PluginManifest, request: &PluginRequest) -> Result<PluginResponse> {
358    run_plugin_with_policy(manifest, request, &PluginExecutionPolicy::default())
359}
360
361/// Execute a plugin under the given execution policy.
362pub fn run_plugin_with_policy(
363    manifest: &PluginManifest,
364    request: &PluginRequest,
365    policy: &PluginExecutionPolicy,
366) -> Result<PluginResponse> {
367    Ok(run_plugin_with_policy_and_metadata(manifest, request, policy)?.response)
368}
369
370/// Execute a plugin under the given execution policy and return execution metadata.
371pub fn run_plugin_with_policy_and_metadata(
372    manifest: &PluginManifest,
373    request: &PluginRequest,
374    policy: &PluginExecutionPolicy,
375) -> Result<PluginExecutionResult> {
376    if manifest.stage != request.stage {
377        return Err(LogicPearlError::message(format!(
378            "plugin stage mismatch: manifest is {:?}, request is {:?}",
379            manifest.stage, request.stage
380        )));
381    }
382    validate_plugin_request_contract(manifest, request)?;
383
384    let raw = run_plugin_raw(manifest, request, policy)?;
385    let response = parse_plugin_response(manifest, &raw.stdout)?;
386    Ok(PluginExecutionResult {
387        response,
388        run: raw.metadata,
389    })
390}
391
392/// Execute a plugin for multiple payloads with default execution policy.
393pub fn run_plugin_batch(
394    manifest: &PluginManifest,
395    stage: PluginStage,
396    payloads: &[Value],
397) -> Result<Vec<PluginResponse>> {
398    run_plugin_batch_with_policy(manifest, stage, payloads, &PluginExecutionPolicy::default())
399}
400
401/// Execute a plugin for multiple payloads under the given execution policy.
402pub fn run_plugin_batch_with_policy(
403    manifest: &PluginManifest,
404    stage: PluginStage,
405    payloads: &[Value],
406    policy: &PluginExecutionPolicy,
407) -> Result<Vec<PluginResponse>> {
408    if payloads.is_empty() {
409        return Ok(Vec::new());
410    }
411    Ok(run_plugin_batch_with_policy_and_metadata(manifest, stage, payloads, policy)?.responses)
412}
413
414/// Execute a plugin for multiple payloads under the given execution policy and return execution metadata.
415pub fn run_plugin_batch_with_policy_and_metadata(
416    manifest: &PluginManifest,
417    stage: PluginStage,
418    payloads: &[Value],
419    policy: &PluginExecutionPolicy,
420) -> Result<PluginBatchExecutionResult> {
421    if manifest.stage != stage {
422        return Err(LogicPearlError::message(format!(
423            "plugin stage mismatch: manifest is {:?}, request is {:?}",
424            manifest.stage, stage
425        )));
426    }
427    if payloads.is_empty() {
428        return Ok(PluginBatchExecutionResult {
429            responses: Vec::new(),
430            runs: Vec::new(),
431            run: empty_plugin_batch_metadata(manifest, &stage, policy)?,
432        });
433    }
434    if !manifest.supports_capability("batch_requests") {
435        let mut responses = Vec::with_capacity(payloads.len());
436        let mut runs = Vec::with_capacity(payloads.len());
437        let mut last_run = None;
438        for payload in payloads {
439            validate_plugin_payload_contract(manifest, &stage, payload)?;
440            let execution = run_plugin_with_policy_and_metadata(
441                manifest,
442                &PluginRequest {
443                    protocol_version: "1".to_string(),
444                    stage: stage.clone(),
445                    payload: payload.clone(),
446                },
447                policy,
448            )?;
449            let run = execution.run;
450            last_run = Some(run.clone());
451            runs.push(run);
452            responses.push(execution.response);
453        }
454        return Ok(PluginBatchExecutionResult {
455            responses,
456            runs,
457            run: last_run.unwrap_or(empty_plugin_batch_metadata(manifest, &stage, policy)?),
458        });
459    }
460    for payload in payloads {
461        validate_plugin_payload_contract(manifest, &stage, payload)?;
462    }
463
464    let raw = run_plugin_raw(
465        manifest,
466        &PluginBatchRequest {
467            protocol_version: "1".to_string(),
468            stage: stage.clone(),
469            payloads: payloads.to_vec(),
470        },
471        policy,
472    )?;
473    let batch: PluginBatchResponse = serde_json::from_str(&raw.stdout).map_err(|err| {
474        LogicPearlError::message(format!(
475            "plugin {} returned invalid batch JSON: {}",
476            manifest.name, err
477        ))
478    })?;
479    if !batch.ok {
480        if let Some(error) = &batch.error {
481            return Err(LogicPearlError::message(format!(
482                "plugin {} failed [{}]: {}",
483                manifest.name, error.code, error.message
484            )));
485        }
486        return Err(LogicPearlError::message(format!(
487            "plugin {} returned ok=false without structured batch error",
488            manifest.name
489        )));
490    }
491    if batch.responses.len() != payloads.len() {
492        return Err(LogicPearlError::message(format!(
493            "plugin {} returned {} batch responses for {} payloads",
494            manifest.name,
495            batch.responses.len(),
496            payloads.len()
497        )));
498    }
499    for response in &batch.responses {
500        validate_ok_plugin_response(manifest, response)?;
501    }
502    let runs = vec![raw.metadata.clone(); batch.responses.len()];
503    Ok(PluginBatchExecutionResult {
504        responses: batch.responses,
505        runs,
506        run: raw.metadata,
507    })
508}
509
510fn empty_plugin_batch_metadata(
511    manifest: &PluginManifest,
512    stage: &PluginStage,
513    policy: &PluginExecutionPolicy,
514) -> Result<PluginRunMetadata> {
515    let request = PluginBatchRequest {
516        protocol_version: "1".to_string(),
517        stage: stage.clone(),
518        payloads: Vec::new(),
519    };
520    let request_value = serde_json::to_value(&request).map_err(LogicPearlError::from)?;
521    build_plugin_run_metadata(PluginRunMetadataInputs {
522        manifest,
523        policy,
524        resolved_entrypoint: &resolve_entrypoint(manifest, policy)?,
525        request_value: &request_value,
526        stdout: Vec::new(),
527        stderr: Vec::new(),
528        effective_timeout_ms: effective_timeout_ms(manifest, policy)?,
529        started_at: now_utc_rfc3339(),
530        completed_at: now_utc_rfc3339(),
531        duration_ms: 0,
532    })
533}
534
/// Outcome of a successful plugin process run before response parsing.
struct RawPluginRun {
    /// Captured stdout, already validated as UTF-8.
    stdout: String,
    /// Provenance record built for the run.
    metadata: PluginRunMetadata,
}
539
540fn run_plugin_raw<T: Serialize>(
541    manifest: &PluginManifest,
542    request: &T,
543    policy: &PluginExecutionPolicy,
544) -> Result<RawPluginRun> {
545    let entrypoint = resolve_entrypoint(manifest, policy)?;
546    let mut command = Command::new(&entrypoint.program);
547    command.args(&entrypoint.args);
548    #[cfg(unix)]
549    {
550        use std::os::unix::process::CommandExt;
551        command.process_group(0);
552    }
553    command
554        .stdin(Stdio::piped())
555        .stdout(Stdio::piped())
556        .stderr(Stdio::piped());
557
558    let request_value = serde_json::to_value(request).map_err(LogicPearlError::from)?;
559    let timeout_ms = effective_timeout_ms(manifest, policy)?;
560    let started_at = now_utc_rfc3339();
561    let started = Instant::now();
562    let mut child = spawn_plugin_process(&mut command)?;
563    let stdout_handle = spawn_pipe_reader(
564        child
565            .stdout
566            .take()
567            .ok_or_else(|| LogicPearlError::message("failed to open plugin stdout"))?,
568        MAX_PLUGIN_STDOUT_BYTES,
569    );
570    let stderr_handle = spawn_pipe_reader(
571        child
572            .stderr
573            .take()
574            .ok_or_else(|| LogicPearlError::message("failed to open plugin stderr"))?,
575        MAX_PLUGIN_STDERR_BYTES,
576    );
577
578    let mut stdin = child
579        .stdin
580        .take()
581        .ok_or_else(|| LogicPearlError::message("failed to open plugin stdin"))?;
582    let payload = serde_json::to_vec(request)?;
583    stdin.write_all(&payload)?;
584    stdin.write_all(b"\n")?;
585    drop(stdin);
586
587    let (status, timed_out) = wait_for_plugin_exit(timeout_ms, &mut child)?;
588    let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
589    let completed_at = now_utc_rfc3339();
590    let stdout = join_pipe_reader(manifest, "stdout", stdout_handle)?;
591    let stderr = join_pipe_reader(manifest, "stderr", stderr_handle)?;
592    let stderr_display = String::from_utf8_lossy(&stderr).trim().to_string();
593
594    if timed_out {
595        let timeout_display = timeout_ms.unwrap_or_default();
596        return Err(LogicPearlError::message(format!(
597            "plugin {} exceeded timeout_ms={} and was terminated{}",
598            manifest.name,
599            timeout_display,
600            if stderr_display.is_empty() {
601                String::new()
602            } else {
603                format!(": {stderr_display}")
604            }
605        )));
606    }
607
608    if !status.success() {
609        return Err(LogicPearlError::message(format!(
610            "plugin {} exited with status {}{}",
611            manifest.name,
612            status,
613            if stderr_display.is_empty() {
614                String::new()
615            } else {
616                format!(": {stderr_display}")
617            }
618        )));
619    }
620
621    let metadata = build_plugin_run_metadata(PluginRunMetadataInputs {
622        manifest,
623        policy,
624        resolved_entrypoint: &entrypoint,
625        request_value: &request_value,
626        stdout: stdout.clone(),
627        stderr,
628        effective_timeout_ms: timeout_ms,
629        started_at,
630        completed_at,
631        duration_ms,
632    })?;
633    let stdout = String::from_utf8(stdout).map_err(|err| {
634        LogicPearlError::message(format!(
635            "plugin {} returned invalid UTF-8: {}",
636            manifest.name, err
637        ))
638    })?;
639    Ok(RawPluginRun { stdout, metadata })
640}
641
/// Everything `build_plugin_run_metadata` needs to assemble a run record.
struct PluginRunMetadataInputs<'a> {
    /// Manifest of the plugin that was (or would have been) run.
    manifest: &'a PluginManifest,
    /// Execution policy in effect.
    policy: &'a PluginExecutionPolicy,
    /// Program and arguments after path resolution.
    resolved_entrypoint: &'a ResolvedPluginEntrypoint,
    /// The request as a JSON value; hashed for `request_hash` / `input_hash`.
    request_value: &'a Value,
    /// Raw captured stdout bytes.
    stdout: Vec<u8>,
    /// Raw captured stderr bytes.
    stderr: Vec<u8>,
    /// Timeout applied to the run, if any.
    effective_timeout_ms: Option<u64>,
    /// RFC 3339 start timestamp.
    started_at: String,
    /// RFC 3339 completion timestamp.
    completed_at: String,
    /// Elapsed wall-clock milliseconds.
    duration_ms: u64,
}
654
655fn build_plugin_run_metadata(inputs: PluginRunMetadataInputs<'_>) -> Result<PluginRunMetadata> {
656    let manifest = inputs.manifest;
657    let policy = inputs.policy;
658    let entrypoint = build_entrypoint_metadata(manifest, inputs.resolved_entrypoint);
659    let entrypoint_hash = hash_serializable(&entrypoint)?;
660    let request_hash = hash_serializable(inputs.request_value)?;
661    let input_hash = inputs
662        .request_value
663        .get("payload")
664        .and_then(|payload| payload.get("input"))
665        .map(hash_serializable)
666        .transpose()?;
667    let output_hash = sha256_prefixed(&inputs.stdout);
668    let manifest_hash = manifest
669        .manifest_path
670        .as_ref()
671        .and_then(|path| sha256_prefixed_file(path).ok());
672    let plugin_id = manifest
673        .plugin_id
674        .clone()
675        .unwrap_or_else(|| manifest.name.clone());
676    let protocol_version = inputs
677        .request_value
678        .get("protocol_version")
679        .and_then(Value::as_str)
680        .unwrap_or(&manifest.protocol_version)
681        .to_string();
682    let declared_capabilities = manifest.capabilities.clone().unwrap_or_default();
683    let enforced_capabilities = enforced_capabilities_for_request(manifest, inputs.request_value);
684    let stdout_hash = output_hash.clone();
685    let stderr_hash = sha256_prefixed(&inputs.stderr);
686    let mut metadata = PluginRunMetadata {
687        schema_version: "logicpearl.plugin_run_provenance.v1".to_string(),
688        plugin_run_id: String::new(),
689        plugin_id,
690        plugin_version: manifest.plugin_version.clone(),
691        plugin_name: manifest.name.clone(),
692        stage: manifest.stage.clone(),
693        protocol_version,
694        manifest_path: manifest
695            .manifest_path
696            .as_ref()
697            .map(|path| path.display().to_string()),
698        manifest_hash,
699        entrypoint_hash,
700        entrypoint,
701        request_hash,
702        input_hash,
703        output_hash,
704        timeout_policy: PluginTimeoutPolicyMetadata {
705            manifest_timeout_ms: manifest.timeout_ms,
706            default_timeout_ms: policy.default_timeout_ms,
707            effective_timeout_ms: inputs.effective_timeout_ms,
708            allow_no_timeout: policy.allow_no_timeout,
709        },
710        execution_policy: PluginExecutionPolicyMetadata {
711            allow_absolute_entrypoint: policy.allow_absolute_entrypoint,
712            allow_path_lookup: policy.allow_path_lookup,
713            allow_no_timeout: policy.allow_no_timeout,
714        },
715        capabilities: PluginCapabilityMetadata {
716            declared: declared_capabilities.clone(),
717            allowed: declared_capabilities,
718            enforced: enforced_capabilities,
719        },
720        access: PluginAccessMetadata {
721            network: "not_enforced".to_string(),
722            filesystem: "process_default".to_string(),
723            enforcement: "none".to_string(),
724        },
725        stdio: PluginStdioMetadata {
726            stdout_hash,
727            stdout_bytes: inputs.stdout.len(),
728            stdout_summary: (!inputs.stdout.is_empty())
729                .then(|| redacted_hash_summary(&inputs.stdout)),
730            stderr_hash,
731            stderr_bytes: inputs.stderr.len(),
732            stderr_summary: (!inputs.stderr.is_empty())
733                .then(|| redacted_hash_summary(&inputs.stderr)),
734        },
735        started_at: inputs.started_at,
736        completed_at: inputs.completed_at,
737        duration_ms: inputs.duration_ms,
738    };
739    metadata.plugin_run_id = build_plugin_run_id(&metadata)?;
740    Ok(metadata)
741}
742
743fn build_entrypoint_metadata(
744    manifest: &PluginManifest,
745    resolved_entrypoint: &ResolvedPluginEntrypoint,
746) -> PluginEntrypointMetadata {
747    let resolved = resolved_entrypoint.segments();
748    let hashes = resolved
749        .iter()
750        .enumerate()
751        .filter_map(|(index, segment)| {
752            let path = Path::new(segment);
753            if !path.is_file() {
754                return None;
755            }
756            sha256_prefixed_file(path)
757                .ok()
758                .map(|hash| PluginEntrypointSegmentHash {
759                    index,
760                    path: segment.clone(),
761                    hash,
762                })
763        })
764        .collect();
765
766    PluginEntrypointMetadata {
767        declared: manifest.entrypoint.clone(),
768        resolved,
769        hashes,
770    }
771}
772
773fn enforced_capabilities_for_request(
774    manifest: &PluginManifest,
775    request_value: &Value,
776) -> Vec<String> {
777    if request_value.get("payloads").is_some() && manifest.supports_capability("batch_requests") {
778        vec!["batch_requests".to_string()]
779    } else {
780        Vec::new()
781    }
782}
783
784fn build_plugin_run_id(metadata: &PluginRunMetadata) -> Result<String> {
785    hash_serializable(&serde_json::json!({
786        "schema_version": metadata.schema_version,
787        "plugin_id": metadata.plugin_id,
788        "plugin_version": metadata.plugin_version,
789        "plugin_name": metadata.plugin_name,
790        "stage": metadata.stage,
791        "protocol_version": metadata.protocol_version,
792        "manifest_hash": metadata.manifest_hash,
793        "entrypoint_hash": metadata.entrypoint_hash,
794        "request_hash": metadata.request_hash,
795        "output_hash": metadata.output_hash,
796        "started_at": metadata.started_at,
797        "completed_at": metadata.completed_at,
798    }))
799}
800
801fn hash_serializable<T: Serialize>(value: &T) -> Result<String> {
802    serde_json::to_vec(value)
803        .map(|bytes| sha256_prefixed(&bytes))
804        .map_err(LogicPearlError::from)
805}
806
807fn sha256_prefixed_file(path: &Path) -> std::io::Result<String> {
808    std::fs::read(path).map(|bytes| sha256_prefixed(&bytes))
809}
810
811fn sha256_prefixed(bytes: &[u8]) -> String {
812    let mut digest = Sha256::new();
813    digest.update(bytes);
814    format!("sha256:{}", hex::encode(digest.finalize()))
815}
816
817fn redacted_hash_summary(bytes: &[u8]) -> String {
818    format!("<redacted:{}>", sha256_prefixed(bytes))
819}
820
821fn now_utc_rfc3339() -> String {
822    OffsetDateTime::now_utc()
823        .format(&Rfc3339)
824        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
825}
826
/// Return a JSON summary of the manifest's declared schemas.
///
/// The object has exactly three keys — `input_schema`, `options_schema`, and
/// `output_schema` — each `null` when the manifest does not declare that
/// schema. Capabilities are not part of the summary.
pub fn manifest_contract_summary(manifest: &PluginManifest) -> Value {
    serde_json::json!({
        "input_schema": manifest.input_schema,
        "options_schema": manifest.options_schema,
        "output_schema": manifest.output_schema,
    })
}
835
/// Entrypoint after path resolution: the program to execute and its arguments.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedPluginEntrypoint {
    program: String,
    args: Vec<String>,
}

impl ResolvedPluginEntrypoint {
    /// The full command line as owned strings: program first, then each
    /// argument in order.
    fn segments(&self) -> Vec<String> {
        let mut command_line = Vec::with_capacity(self.args.len() + 1);
        command_line.push(self.program.clone());
        command_line.extend(self.args.iter().cloned());
        command_line
    }
}
849
850fn resolve_entrypoint(
851    manifest: &PluginManifest,
852    policy: &PluginExecutionPolicy,
853) -> Result<ResolvedPluginEntrypoint> {
854    let program = manifest
855        .entrypoint
856        .first()
857        .ok_or_else(|| LogicPearlError::message("plugin entrypoint is empty"))?;
858    if program.trim().is_empty() {
859        return Err(LogicPearlError::message(
860            "plugin entrypoint program must be non-empty",
861        ));
862    }
863
864    let resolved_program = resolve_entrypoint_program(manifest, policy, program)?;
865    let args = manifest
866        .entrypoint
867        .iter()
868        .skip(1)
869        .map(|segment| resolve_entrypoint_arg(manifest, policy, segment))
870        .collect::<Result<Vec<_>>>()?;
871
872    Ok(ResolvedPluginEntrypoint {
873        program: resolved_program,
874        args,
875    })
876}
877
878fn resolve_entrypoint_program(
879    manifest: &PluginManifest,
880    policy: &PluginExecutionPolicy,
881    program: &str,
882) -> Result<String> {
883    let program_path = Path::new(program);
884    if program_path.is_absolute() {
885        if !policy.allow_absolute_entrypoint {
886            return Err(LogicPearlError::message(format!(
887                "plugin {} entrypoint uses absolute program path {}; rerun with an execution policy that allows absolute entrypoints only for trusted manifests",
888                manifest.name, program
889            )));
890        }
891        return Ok(program.to_string());
892    }
893
894    if let Some(manifest_relative) = manifest_relative_existing_path(manifest, program) {
895        return Ok(manifest_relative);
896    }
897
898    if has_path_separator(program) {
899        return Err(LogicPearlError::message(format!(
900            "plugin {} entrypoint path was not found relative to the manifest: {}",
901            manifest.name, program
902        )));
903    }
904
905    if policy.allow_path_lookup {
906        return Ok(program.to_string());
907    }
908
909    if is_allowed_manifest_script_interpreter(program) && has_manifest_local_script_arg(manifest) {
910        return Ok(program.to_string());
911    }
912
913    Err(LogicPearlError::message(format!(
914        "plugin {} entrypoint program {program:?} is not manifest-relative and PATH lookup is disabled; use a manifest-relative script path or enable PATH lookup only for trusted manifests",
915        manifest.name
916    )))
917}
918
919fn resolve_entrypoint_arg(
920    manifest: &PluginManifest,
921    policy: &PluginExecutionPolicy,
922    segment: &str,
923) -> Result<String> {
924    if segment.trim().is_empty() {
925        return Err(LogicPearlError::message(format!(
926            "plugin {} entrypoint arguments must be non-empty",
927            manifest.name
928        )));
929    }
930
931    let path = Path::new(segment);
932    if path.is_absolute() && !policy.allow_absolute_entrypoint {
933        return Err(LogicPearlError::message(format!(
934            "plugin {} entrypoint argument uses absolute path {}; enable absolute entrypoints only for trusted manifests",
935            manifest.name, segment
936        )));
937    }
938    if path.is_absolute() {
939        return Ok(segment.to_string());
940    }
941
942    if let Some(manifest_relative) = manifest_relative_existing_path(manifest, segment) {
943        return Ok(manifest_relative);
944    }
945
946    if has_path_separator(segment) {
947        return Err(LogicPearlError::message(format!(
948            "plugin {} entrypoint argument path was not found relative to the manifest: {}",
949            manifest.name, segment
950        )));
951    }
952
953    Ok(segment.to_string())
954}
955
956fn manifest_relative_existing_path(manifest: &PluginManifest, segment: &str) -> Option<String> {
957    manifest.manifest_dir.as_ref().and_then(|dir| {
958        let candidate = dir.join(segment);
959        candidate.exists().then(|| candidate.display().to_string())
960    })
961}
962
/// Reports whether `segment` contains a forward slash or a backslash.
fn has_path_separator(segment: &str) -> bool {
    segment.chars().any(|ch| matches!(ch, '/' | '\\'))
}
966
967fn has_manifest_local_script_arg(manifest: &PluginManifest) -> bool {
968    manifest
969        .entrypoint
970        .iter()
971        .skip(1)
972        .any(|segment| manifest_relative_existing_path(manifest, segment).is_some())
973}
974
/// Reports whether `program` names one of the script interpreters that may be
/// looked up on PATH when it runs a manifest-local script.
///
/// Only the final path component is considered. The comparison is ASCII
/// case-insensitive and ignores a single trailing `.exe`, so `python3`,
/// `Python3.exe` and `PYTHON3.EXE` are all treated alike.
fn is_allowed_manifest_script_interpreter(program: &str) -> bool {
    const ALLOWED_INTERPRETERS: &[&str] = &[
        "bash", "bun", "deno", "node", "perl", "php", "python", "python3", "ruby", "sh",
    ];

    let file_name = Path::new(program)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(program);
    // Lower-case before stripping so an upper-case `.EXE` suffix is handled too.
    let lowered = file_name.to_ascii_lowercase();
    // Strip at most one suffix: `python.exe.exe` is not the interpreter `python`.
    let stem = lowered.strip_suffix(".exe").unwrap_or(&lowered);
    ALLOWED_INTERPRETERS.contains(&stem)
}
987
988fn effective_timeout_ms(
989    manifest: &PluginManifest,
990    policy: &PluginExecutionPolicy,
991) -> Result<Option<u64>> {
992    match manifest.timeout_ms {
993        Some(0) if policy.allow_no_timeout => Ok(None),
994        Some(0) => Err(LogicPearlError::message(format!(
995            "plugin {} declares timeout_ms=0, which disables the plugin timeout; enable no-timeout execution only for trusted manifests",
996            manifest.name
997        ))),
998        Some(timeout_ms) => Ok(Some(timeout_ms)),
999        None if policy.default_timeout_ms == 0 && policy.allow_no_timeout => Ok(None),
1000        None if policy.default_timeout_ms == 0 => Err(LogicPearlError::message(format!(
1001            "plugin {} has no timeout and the execution policy default is disabled; enable no-timeout execution only for trusted manifests",
1002            manifest.name
1003        ))),
1004        None => Ok(Some(policy.default_timeout_ms)),
1005    }
1006}
1007
1008fn spawn_plugin_process(command: &mut Command) -> std::io::Result<Child> {
1009    for attempt in 0..=PLUGIN_SPAWN_TEXT_BUSY_RETRIES {
1010        match command.spawn() {
1011            Ok(child) => return Ok(child),
1012            Err(error)
1013                if is_executable_file_busy(&error) && attempt < PLUGIN_SPAWN_TEXT_BUSY_RETRIES =>
1014            {
1015                let backoff =
1016                    PLUGIN_SPAWN_TEXT_BUSY_BACKOFF_MS * u64::try_from(attempt + 1).unwrap_or(1);
1017                thread::sleep(Duration::from_millis(backoff));
1018            }
1019            Err(error) => return Err(error),
1020        }
1021    }
1022
1023    unreachable!("spawn loop returns after the final attempt")
1024}
1025
1026#[cfg(unix)]
1027fn is_executable_file_busy(error: &std::io::Error) -> bool {
1028    error.raw_os_error() == Some(libc::ETXTBSY)
1029}
1030
/// Non-Unix counterpart of the busy-executable check: no spawn error is ever
/// classified as retryable here.
#[cfg(not(unix))]
fn is_executable_file_busy(_spawn_error: &std::io::Error) -> bool {
    false
}
1035
1036fn spawn_pipe_reader<R: Read + Send + 'static>(
1037    reader: R,
1038    max_bytes: usize,
1039) -> thread::JoinHandle<std::io::Result<Vec<u8>>> {
1040    spawn_limited_pipe_reader(reader, max_bytes)
1041}
1042
1043fn spawn_limited_pipe_reader<R: Read + Send + 'static>(
1044    mut reader: R,
1045    max_bytes: usize,
1046) -> thread::JoinHandle<std::io::Result<Vec<u8>>> {
1047    thread::spawn(move || read_limited(&mut reader, max_bytes))
1048}
1049
/// Reads `reader` to end-of-stream, keeping at most `max_bytes` bytes.
///
/// Once the limit is exceeded the remaining input is still read and discarded
/// until end-of-stream, and only then is the error returned. Reads that fail
/// with [`std::io::ErrorKind::Interrupted`] are retried rather than treated as
/// a failure, matching the contract documented on [`Read::read`].
///
/// # Errors
///
/// Returns `InvalidData` when the stream produced more than `max_bytes` bytes,
/// or the underlying error for any non-interrupted read failure.
fn read_limited(reader: &mut impl Read, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut chunk = [0_u8; 8192];
    let mut exceeded_limit = false;
    loop {
        let read = match reader.read(&mut chunk) {
            Ok(read) => read,
            // An interrupted read is transient and should simply be retried.
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => return Err(error),
        };
        if read == 0 {
            if exceeded_limit {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("plugin output exceeded {max_bytes} bytes"),
                ));
            }
            return Ok(buffer);
        }
        if exceeded_limit {
            // Keep draining so the stream is consumed to its end, but store nothing.
            continue;
        }
        let remaining = max_bytes.saturating_sub(buffer.len());
        if read > remaining {
            buffer.extend_from_slice(&chunk[..remaining]);
            exceeded_limit = true;
        } else {
            buffer.extend_from_slice(&chunk[..read]);
        }
    }
}
1077
1078fn join_pipe_reader(
1079    manifest: &PluginManifest,
1080    stream: &str,
1081    handle: thread::JoinHandle<std::io::Result<Vec<u8>>>,
1082) -> Result<Vec<u8>> {
1083    let result = handle.join().map_err(|_| {
1084        LogicPearlError::message(format!(
1085            "plugin {} {} reader thread panicked",
1086            manifest.name, stream
1087        ))
1088    })?;
1089    result.map_err(|err| {
1090        LogicPearlError::message(format!(
1091            "failed to read plugin {} {}: {}",
1092            manifest.name, stream, err
1093        ))
1094    })
1095}
1096
1097fn wait_for_plugin_exit(timeout_ms: Option<u64>, child: &mut Child) -> Result<(ExitStatus, bool)> {
1098    const MAX_POLL_INTERVAL: Duration = Duration::from_millis(200);
1099
1100    if let Some(timeout_ms) = timeout_ms {
1101        let timeout = Duration::from_millis(timeout_ms);
1102        let started_at = Instant::now();
1103        let mut poll_interval = Duration::from_millis(10);
1104        loop {
1105            if let Some(status) = child.try_wait()? {
1106                return Ok((status, false));
1107            }
1108            if started_at.elapsed() >= timeout {
1109                terminate_plugin_process(child);
1110                let status = child.wait()?;
1111                return Ok((status, true));
1112            }
1113            thread::sleep(poll_interval);
1114            poll_interval = (poll_interval * 2).min(MAX_POLL_INTERVAL);
1115        }
1116    }
1117
1118    Ok((child.wait()?, false))
1119}
1120
1121#[cfg(unix)]
1122fn terminate_plugin_process(child: &mut Child) {
1123    let child_process_group = safe_child_process_group(child);
1124    if let Some(process_group) = child_process_group {
1125        signal_process_group(process_group, SIGTERM);
1126    } else {
1127        let _ = child.kill();
1128    }
1129    thread::sleep(Duration::from_millis(50));
1130    if child.try_wait().ok().flatten().is_none() {
1131        if let Some(process_group) = child_process_group {
1132            signal_process_group(process_group, SIGKILL);
1133        }
1134        let _ = child.kill();
1135    }
1136}
1137
1138#[cfg(unix)]
1139fn safe_child_process_group(child: &Child) -> Option<i32> {
1140    let pid = i32::try_from(child.id()).ok()?;
1141    // SAFETY: getpgid only reads kernel process metadata for the spawned child pid.
1142    let process_group = unsafe { getpgid(pid) };
1143    if process_group <= 0 {
1144        return None;
1145    }
1146    // SAFETY: getpgrp has no arguments and returns the current process group id.
1147    let parent_process_group = unsafe { getpgrp() };
1148    if process_group == parent_process_group {
1149        return None;
1150    }
1151    Some(process_group)
1152}
1153
1154#[cfg(unix)]
1155fn signal_process_group(process_group: i32, signal: i32) {
1156    // SAFETY: negative pid values are the POSIX API for signaling a process group.
1157    let _ = unsafe { kill(-process_group, signal) };
1158}
1159
/// Non-Unix termination: kills the direct child only, ignoring any error.
#[cfg(not(unix))]
fn terminate_plugin_process(child: &mut Child) {
    let _kill_result = child.kill();
}
1164
1165fn parse_plugin_response(manifest: &PluginManifest, stdout: &str) -> Result<PluginResponse> {
1166    let response: PluginResponse = serde_json::from_str(stdout).map_err(|err| {
1167        LogicPearlError::message(format!(
1168            "plugin {} returned invalid JSON: {}",
1169            manifest.name, err
1170        ))
1171    })?;
1172    validate_ok_plugin_response(manifest, &response)?;
1173    Ok(response)
1174}
1175
1176fn validate_ok_plugin_response(manifest: &PluginManifest, response: &PluginResponse) -> Result<()> {
1177    if !response.ok {
1178        if let Some(error) = &response.error {
1179            return Err(LogicPearlError::message(format!(
1180                "plugin {} failed [{}]: {}",
1181                manifest.name, error.code, error.message
1182            )));
1183        }
1184        return Err(LogicPearlError::message(format!(
1185            "plugin {} returned ok=false without structured error",
1186            manifest.name
1187        )));
1188    }
1189    if let Some(schema) = &manifest.output_schema {
1190        let response_value = serde_json::to_value(response).map_err(LogicPearlError::from)?;
1191        validate_value_against_declared_schema(
1192            "output_schema",
1193            schema,
1194            &response_value,
1195            "$response",
1196        )?;
1197    }
1198    Ok(())
1199}
1200
1201fn validate_plugin_request_contract(
1202    manifest: &PluginManifest,
1203    request: &PluginRequest,
1204) -> Result<()> {
1205    validate_plugin_payload_contract(manifest, &request.stage, &request.payload)
1206}
1207
1208fn validate_plugin_payload_contract(
1209    manifest: &PluginManifest,
1210    stage: &PluginStage,
1211    payload: &Value,
1212) -> Result<()> {
1213    if let Some(schema) = &manifest.input_schema {
1214        let input = extract_payload_input(stage, payload).ok_or_else(|| {
1215            LogicPearlError::message(format!(
1216                "plugin {} manifest declares input_schema but request payload is missing payload.input",
1217                manifest.name
1218            ))
1219        })?;
1220        validate_value_against_declared_schema("input_schema", schema, input, "$payload.input")?;
1221    }
1222    if let Some(schema) = &manifest.options_schema {
1223        let null = Value::Null;
1224        let options = extract_payload_options(payload).unwrap_or(&null);
1225        validate_value_against_declared_schema(
1226            "options_schema",
1227            schema,
1228            options,
1229            "$payload.options",
1230        )?;
1231    }
1232    Ok(())
1233}
1234
1235fn extract_payload_input<'a>(_stage: &PluginStage, payload: &'a Value) -> Option<&'a Value> {
1236    payload.as_object().and_then(|object| object.get("input"))
1237}
1238
1239fn extract_payload_options(payload: &Value) -> Option<&Value> {
1240    payload.as_object().and_then(|object| object.get("options"))
1241}
1242
1243fn validate_declared_schema(label: &str, schema: Option<&Value>) -> Result<()> {
1244    if let Some(schema) = schema {
1245        validate_schema_document(label, schema, format!("${label}"))?;
1246    }
1247    Ok(())
1248}
1249
1250fn validate_schema_document(label: &str, schema: &Value, path: String) -> Result<()> {
1251    let Some(object) = schema.as_object() else {
1252        return Err(LogicPearlError::message(format!(
1253            "{label} must be a JSON object at {path}"
1254        )));
1255    };
1256    validate_schema_keywords(label, object, &path)?;
1257    validate_schema_annotations(label, object, &path)?;
1258    if let Some(value) = object.get("type") {
1259        validate_schema_type_decl(label, value, &path)?;
1260    }
1261    if let Some(value) = object.get("properties") {
1262        let properties = value.as_object().ok_or_else(|| {
1263            LogicPearlError::message(format!(
1264                "{label} properties must be an object at {path}.properties"
1265            ))
1266        })?;
1267        for (key, child) in properties {
1268            validate_schema_document(label, child, format!("{path}.properties.{key}"))?;
1269        }
1270    }
1271    if let Some(value) = object.get("required") {
1272        let required = value.as_array().ok_or_else(|| {
1273            LogicPearlError::message(format!(
1274                "{label} required must be an array at {path}.required"
1275            ))
1276        })?;
1277        for item in required {
1278            if item
1279                .as_str()
1280                .filter(|value| !value.trim().is_empty())
1281                .is_none()
1282            {
1283                return Err(LogicPearlError::message(format!(
1284                    "{label} required entries must be non-empty strings at {path}.required"
1285                )));
1286            }
1287        }
1288    }
1289    if let Some(value) = object.get("items") {
1290        validate_schema_document(label, value, format!("{path}.items"))?;
1291    }
1292    if let Some(value) = object.get("additionalProperties") {
1293        match value {
1294            Value::Bool(_) => {}
1295            Value::Object(_) => {
1296                validate_schema_document(label, value, format!("{path}.additionalProperties"))?
1297            }
1298            _ => {
1299                return Err(LogicPearlError::message(format!(
1300                    "{label} additionalProperties must be a boolean or object at {path}.additionalProperties"
1301                )));
1302            }
1303        }
1304    }
1305    if let Some(value) = object.get("enum") {
1306        if value.as_array().filter(|items| !items.is_empty()).is_none() {
1307            return Err(LogicPearlError::message(format!(
1308                "{label} enum must be a non-empty array at {path}.enum"
1309            )));
1310        }
1311    }
1312    Ok(())
1313}
1314
1315fn validate_schema_keywords(label: &str, object: &Map<String, Value>, path: &str) -> Result<()> {
1316    for key in object.keys() {
1317        if is_supported_schema_keyword(key) {
1318            continue;
1319        }
1320        return Err(LogicPearlError::message(format!(
1321            "{label} uses unsupported LogicPearl schema subset keyword {key:?} at {path}; supported validation keywords are: {}; supported annotation keywords are: {}. Use a plugin smoke test or an external JSON Schema validator for full JSON Schema constraints.",
1322            SUPPORTED_SCHEMA_VALIDATION_KEYWORDS.join(", "),
1323            SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS.join(", ")
1324        )));
1325    }
1326    Ok(())
1327}
1328
1329fn is_supported_schema_keyword(key: &str) -> bool {
1330    SUPPORTED_SCHEMA_VALIDATION_KEYWORDS.contains(&key)
1331        || SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS.contains(&key)
1332}
1333
1334fn validate_schema_annotations(label: &str, object: &Map<String, Value>, path: &str) -> Result<()> {
1335    for key in SUPPORTED_SCHEMA_ANNOTATION_KEYWORDS {
1336        if let Some(value) = object.get(*key) {
1337            if !value.is_string() {
1338                return Err(LogicPearlError::message(format!(
1339                    "{label} annotation keyword {key:?} must be a string at {path}.{key}"
1340                )));
1341            }
1342        }
1343    }
1344    Ok(())
1345}
1346
1347fn validate_schema_type_decl(label: &str, value: &Value, path: &str) -> Result<()> {
1348    match value {
1349        Value::String(kind) => validate_schema_type_name(label, kind, path),
1350        Value::Array(items) if !items.is_empty() => {
1351            for item in items {
1352                let kind = item.as_str().ok_or_else(|| {
1353                    LogicPearlError::message(format!(
1354                        "{label} type arrays must contain only strings at {path}"
1355                    ))
1356                })?;
1357                validate_schema_type_name(label, kind, path)?;
1358            }
1359            Ok(())
1360        }
1361        _ => Err(LogicPearlError::message(format!(
1362            "{label} type must be a string or non-empty array of strings at {path}"
1363        ))),
1364    }
1365}
1366
1367fn validate_schema_type_name(label: &str, kind: &str, path: &str) -> Result<()> {
1368    match kind {
1369        "null" | "boolean" | "integer" | "number" | "string" | "array" | "object" => Ok(()),
1370        _ => Err(LogicPearlError::message(format!(
1371            "{label} uses unsupported JSON Schema type {kind:?} at {path}"
1372        ))),
1373    }
1374}
1375
1376fn validate_value_against_declared_schema(
1377    label: &str,
1378    schema: &Value,
1379    value: &Value,
1380    path: &str,
1381) -> Result<()> {
1382    let object = schema
1383        .as_object()
1384        .ok_or_else(|| LogicPearlError::message(format!("{label} must be a JSON object")))?;
1385
1386    if let Some(type_decl) = object.get("type") {
1387        let matches = schema_type_names(type_decl)?
1388            .iter()
1389            .any(|kind| value_matches_type(value, kind));
1390        if !matches {
1391            return Err(LogicPearlError::message(format!(
1392                "{label} rejected {path}: expected type {}, got {}",
1393                render_schema_types(type_decl)?,
1394                describe_json_type(value)
1395            )));
1396        }
1397    }
1398
1399    if let Some(expected) = object.get("const") {
1400        if value != expected {
1401            return Err(LogicPearlError::message(format!(
1402                "{label} rejected {path}: value did not match const"
1403            )));
1404        }
1405    }
1406
1407    if let Some(choices) = object.get("enum").and_then(Value::as_array) {
1408        if !choices.iter().any(|choice| choice == value) {
1409            return Err(LogicPearlError::message(format!(
1410                "{label} rejected {path}: value was not in enum"
1411            )));
1412        }
1413    }
1414
1415    if let Some(required) = object.get("required").and_then(Value::as_array) {
1416        if let Some(map) = value.as_object() {
1417            for field in required.iter().filter_map(Value::as_str) {
1418                if !map.contains_key(field) {
1419                    return Err(LogicPearlError::message(format!(
1420                        "{label} rejected {path}: missing required field {field:?}"
1421                    )));
1422                }
1423            }
1424        }
1425    }
1426
1427    if let Some(properties) = object.get("properties").and_then(Value::as_object) {
1428        if let Some(map) = value.as_object() {
1429            for (key, child_schema) in properties {
1430                if let Some(child_value) = map.get(key) {
1431                    validate_value_against_declared_schema(
1432                        label,
1433                        child_schema,
1434                        child_value,
1435                        &format!("{path}.{key}"),
1436                    )?;
1437                }
1438            }
1439
1440            match object.get("additionalProperties") {
1441                Some(Value::Bool(false)) => {
1442                    for key in map.keys() {
1443                        if !properties.contains_key(key) {
1444                            return Err(LogicPearlError::message(format!(
1445                                "{label} rejected {path}: unexpected field {key:?}"
1446                            )));
1447                        }
1448                    }
1449                }
1450                Some(Value::Object(_)) => {
1451                    let extra_schema = object
1452                        .get("additionalProperties")
1453                        .expect("checked additionalProperties");
1454                    for (key, child_value) in map {
1455                        if !properties.contains_key(key) {
1456                            validate_value_against_declared_schema(
1457                                label,
1458                                extra_schema,
1459                                child_value,
1460                                &format!("{path}.{key}"),
1461                            )?;
1462                        }
1463                    }
1464                }
1465                _ => {}
1466            }
1467        }
1468    }
1469
1470    if let Some(items_schema) = object.get("items") {
1471        if let Some(items) = value.as_array() {
1472            for (index, item) in items.iter().enumerate() {
1473                validate_value_against_declared_schema(
1474                    label,
1475                    items_schema,
1476                    item,
1477                    &format!("{path}[{index}]"),
1478                )?;
1479            }
1480        }
1481    }
1482
1483    Ok(())
1484}
1485
1486fn schema_type_names(value: &Value) -> Result<Vec<&str>> {
1487    match value {
1488        Value::String(kind) => Ok(vec![kind.as_str()]),
1489        Value::Array(items) => items
1490            .iter()
1491            .map(|item| {
1492                item.as_str().ok_or_else(|| {
1493                    LogicPearlError::message("schema type arrays must contain only strings")
1494                })
1495            })
1496            .collect(),
1497        _ => Err(LogicPearlError::message(
1498            "schema type must be a string or non-empty array of strings",
1499        )),
1500    }
1501}
1502
1503fn render_schema_types(value: &Value) -> Result<String> {
1504    Ok(schema_type_names(value)?.join(" | "))
1505}
1506
1507fn value_matches_type(value: &Value, kind: &str) -> bool {
1508    match kind {
1509        "null" => value.is_null(),
1510        "boolean" => value.is_boolean(),
1511        "integer" => value.as_i64().is_some() || value.as_u64().is_some(),
1512        "number" => value.is_number(),
1513        "string" => value.is_string(),
1514        "array" => value.is_array(),
1515        "object" => value.is_object(),
1516        _ => false,
1517    }
1518}
1519
1520fn describe_json_type(value: &Value) -> &'static str {
1521    match value {
1522        Value::Null => "null",
1523        Value::Bool(_) => "boolean",
1524        Value::Number(number) => {
1525            if number.as_i64().is_some() || number.as_u64().is_some() {
1526                "integer"
1527            } else {
1528                "number"
1529            }
1530        }
1531        Value::String(_) => "string",
1532        Value::Array(_) => "array",
1533        Value::Object(_) => "object",
1534    }
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539    use super::{
1540        run_plugin, run_plugin_with_policy, run_plugin_with_policy_and_metadata,
1541        PluginExecutionPolicy, PluginManifest, PluginRequest, PluginStage,
1542    };
1543    use serde_json::json;
1544    use tempfile::tempdir;
1545
1546    #[test]
1547    fn validates_basic_manifest() {
1548        let manifest = PluginManifest {
1549            name: "demo".to_string(),
1550            plugin_id: None,
1551            plugin_version: None,
1552            protocol_version: "1".to_string(),
1553            stage: PluginStage::Observer,
1554            entrypoint: vec!["python3".to_string(), "plugin.py".to_string()],
1555            language: Some("python".to_string()),
1556            capabilities: None,
1557            timeout_ms: None,
1558            input_schema: None,
1559            options_schema: None,
1560            output_schema: None,
1561            manifest_dir: None,
1562            manifest_path: None,
1563        };
1564        assert!(manifest.validate().is_ok());
1565    }
1566
1567    #[test]
1568    fn validates_declared_input_options_and_output_schemas() {
1569        let manifest = PluginManifest {
1570            name: "demo".to_string(),
1571            plugin_id: None,
1572            plugin_version: None,
1573            protocol_version: "1".to_string(),
1574            stage: PluginStage::Observer,
1575            entrypoint: vec!["python3".to_string(), "plugin.py".to_string()],
1576            language: Some("python".to_string()),
1577            capabilities: None,
1578            timeout_ms: None,
1579            input_schema: Some(json!({
1580                "type": "object",
1581                "required": ["age", "member"],
1582                "properties": {
1583                    "age": { "type": "integer" },
1584                    "member": { "type": "boolean" }
1585                },
1586                "additionalProperties": false
1587            })),
1588            options_schema: Some(json!({
1589                "type": ["object", "null"],
1590                "properties": {
1591                    "mode": { "type": "string" }
1592                },
1593                "additionalProperties": false
1594            })),
1595            output_schema: Some(json!({
1596                "type": "object",
1597                "required": ["ok", "features"],
1598                "properties": {
1599                    "ok": { "const": true },
1600                    "features": {
1601                        "type": "object",
1602                        "required": ["age"],
1603                        "properties": {
1604                            "age": { "type": "integer" }
1605                        }
1606                    }
1607                }
1608            })),
1609            manifest_dir: None,
1610            manifest_path: None,
1611        };
1612        assert!(manifest.validate().is_ok());
1613
1614        let request = super::PluginRequest {
1615            protocol_version: "1".to_string(),
1616            stage: PluginStage::Observer,
1617            payload: super::build_canonical_payload(
1618                &PluginStage::Observer,
1619                json!({"age": 34, "member": true}),
1620                Some(json!({"mode": "strict"})),
1621            ),
1622        };
1623        assert!(super::validate_plugin_request_contract(&manifest, &request).is_ok());
1624
1625        let bad_request = super::PluginRequest {
1626            protocol_version: "1".to_string(),
1627            stage: PluginStage::Observer,
1628            payload: super::build_canonical_payload(
1629                &PluginStage::Observer,
1630                json!({"age": "34", "member": true, "extra": 1}),
1631                None,
1632            ),
1633        };
1634        assert!(super::validate_plugin_request_contract(&manifest, &bad_request).is_err());
1635
1636        let good_response = super::PluginResponse {
1637            ok: true,
1638            warnings: Vec::new(),
1639            error: None,
1640            extra: serde_json::Map::from_iter([("features".to_string(), json!({"age": 34}))]),
1641        };
1642        assert!(super::validate_ok_plugin_response(&manifest, &good_response).is_ok());
1643
1644        let bad_response = super::PluginResponse {
1645            ok: true,
1646            warnings: Vec::new(),
1647            error: None,
1648            extra: serde_json::Map::new(),
1649        };
1650        assert!(super::validate_ok_plugin_response(&manifest, &bad_response).is_err());
1651    }
1652
1653    #[test]
1654    fn rejects_unsupported_schema_subset_keywords() {
1655        let manifest = PluginManifest {
1656            name: "demo".to_string(),
1657            plugin_id: None,
1658            plugin_version: None,
1659            protocol_version: "1".to_string(),
1660            stage: PluginStage::Observer,
1661            entrypoint: vec!["python3".to_string(), "plugin.py".to_string()],
1662            language: Some("python".to_string()),
1663            capabilities: None,
1664            timeout_ms: None,
1665            input_schema: Some(json!({
1666                "type": "object",
1667                "properties": {
1668                    "age": {
1669                        "type": "integer",
1670                        "minimum": 0
1671                    }
1672                }
1673            })),
1674            options_schema: None,
1675            output_schema: None,
1676            manifest_dir: None,
1677            manifest_path: None,
1678        };
1679
1680        let err = manifest.validate().unwrap_err();
1681        assert!(err
1682            .to_string()
1683            .contains("unsupported LogicPearl schema subset keyword \"minimum\""));
1684    }
1685
1686    #[test]
1687    fn accepts_schema_subset_annotation_keywords() {
1688        let manifest = PluginManifest {
1689            name: "demo".to_string(),
1690            plugin_id: None,
1691            plugin_version: None,
1692            protocol_version: "1".to_string(),
1693            stage: PluginStage::Observer,
1694            entrypoint: vec!["python3".to_string(), "plugin.py".to_string()],
1695            language: Some("python".to_string()),
1696            capabilities: None,
1697            timeout_ms: None,
1698            input_schema: Some(json!({
1699                "$schema": "https://logicpearl.com/schema/plugin-contract-subset",
1700                "title": "Observer input",
1701                "description": "Annotation fields are accepted but do not add validation.",
1702                "type": "object"
1703            })),
1704            options_schema: None,
1705            output_schema: None,
1706            manifest_dir: None,
1707            manifest_path: None,
1708        };
1709
1710        assert!(manifest.validate().is_ok());
1711    }
1712
1713    #[cfg(unix)]
1714    fn write_plugin_script(script_body: &str) -> (tempfile::TempDir, std::path::PathBuf) {
1715        use std::os::unix::fs::PermissionsExt;
1716
1717        let dir = tempdir().expect("tempdir");
1718        let path = dir.path().join("plugin.sh");
1719        std::fs::write(&path, script_body).expect("write script");
1720        let mut permissions = std::fs::metadata(&path).expect("stat script").permissions();
1721        permissions.set_mode(0o755);
1722        std::fs::set_permissions(&path, permissions).expect("chmod script");
1723        (dir, path)
1724    }
1725
1726    #[cfg(unix)]
1727    fn test_request() -> PluginRequest {
1728        PluginRequest {
1729            protocol_version: "1".to_string(),
1730            stage: PluginStage::Observer,
1731            payload: super::build_canonical_payload(
1732                &PluginStage::Observer,
1733                json!({"value": 1}),
1734                None,
1735            ),
1736        }
1737    }
1738
1739    #[cfg(unix)]
1740    #[test]
1741    fn enforces_plugin_timeout_when_declared() {
1742        let (dir, _script_path) =
1743            write_plugin_script("#!/bin/sh\nsleep 1\nprintf '{\"ok\":true}\\n'\n");
1744        let manifest = PluginManifest {
1745            name: "slow".to_string(),
1746            plugin_id: None,
1747            plugin_version: None,
1748            protocol_version: "1".to_string(),
1749            stage: PluginStage::Observer,
1750            entrypoint: vec!["plugin.sh".to_string()],
1751            language: Some("shell".to_string()),
1752            capabilities: None,
1753            timeout_ms: Some(50),
1754            input_schema: None,
1755            options_schema: None,
1756            output_schema: None,
1757            manifest_dir: Some(dir.path().to_path_buf()),
1758            manifest_path: None,
1759        };
1760
1761        let error = run_plugin(&manifest, &test_request()).expect_err("plugin should time out");
1762        let message = error.to_string();
1763        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
1764    }
1765
1766    #[cfg(unix)]
1767    #[test]
1768    fn applies_policy_default_timeout_when_manifest_timeout_is_unset() {
1769        let (dir, _script_path) =
1770            write_plugin_script("#!/bin/sh\nsleep 1\nprintf '{\"ok\":true}\\n'\n");
1771        let manifest = PluginManifest {
1772            name: "slow-default".to_string(),
1773            plugin_id: None,
1774            plugin_version: None,
1775            protocol_version: "1".to_string(),
1776            stage: PluginStage::Observer,
1777            entrypoint: vec!["plugin.sh".to_string()],
1778            language: Some("shell".to_string()),
1779            capabilities: None,
1780            timeout_ms: None,
1781            input_schema: None,
1782            options_schema: None,
1783            output_schema: None,
1784            manifest_dir: Some(dir.path().to_path_buf()),
1785            manifest_path: None,
1786        };
1787
1788        let policy = PluginExecutionPolicy::default().with_default_timeout_ms(50);
1789        let error = run_plugin_with_policy(&manifest, &test_request(), &policy)
1790            .expect_err("policy default timeout should apply");
1791        let message = error.to_string();
1792        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
1793    }
1794
1795    #[cfg(unix)]
1796    #[test]
1797    fn rejects_no_timeout_manifest_without_policy_opt_in() {
1798        let (dir, _script_path) = write_plugin_script("#!/bin/sh\nprintf '{\"ok\":true}\\n'\n");
1799        let manifest = PluginManifest {
1800            name: "no-timeout".to_string(),
1801            plugin_id: None,
1802            plugin_version: None,
1803            protocol_version: "1".to_string(),
1804            stage: PluginStage::Observer,
1805            entrypoint: vec!["plugin.sh".to_string()],
1806            language: Some("shell".to_string()),
1807            capabilities: None,
1808            timeout_ms: Some(0),
1809            input_schema: None,
1810            options_schema: None,
1811            output_schema: None,
1812            manifest_dir: Some(dir.path().to_path_buf()),
1813            manifest_path: None,
1814        };
1815
1816        let error = run_plugin(&manifest, &test_request()).expect_err("no timeout should reject");
1817        let message = error.to_string();
1818        assert!(message.contains("timeout_ms=0"), "{message}");
1819        assert!(message.contains("disables the plugin timeout"), "{message}");
1820    }
1821
1822    #[cfg(unix)]
1823    #[test]
1824    fn allows_no_timeout_when_policy_opts_in() {
1825        let (dir, _script_path) = write_plugin_script(
1826            "#!/bin/sh\nsleep 0.1\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
1827        );
1828        let manifest = PluginManifest {
1829            name: "trusted-no-timeout".to_string(),
1830            plugin_id: None,
1831            plugin_version: None,
1832            protocol_version: "1".to_string(),
1833            stage: PluginStage::Observer,
1834            entrypoint: vec!["plugin.sh".to_string()],
1835            language: Some("shell".to_string()),
1836            capabilities: None,
1837            timeout_ms: Some(0),
1838            input_schema: None,
1839            options_schema: None,
1840            output_schema: None,
1841            manifest_dir: Some(dir.path().to_path_buf()),
1842            manifest_path: None,
1843        };
1844
1845        let policy = PluginExecutionPolicy::default().with_allow_no_timeout(true);
1846        let response = run_plugin_with_policy(&manifest, &test_request(), &policy)
1847            .expect("trusted no-timeout plugin should succeed");
1848        assert!(response.ok);
1849        assert_eq!(response.extra.get("features"), Some(&json!({"value": 1})));
1850    }
1851
1852    #[cfg(unix)]
1853    #[test]
1854    fn allows_known_interpreter_for_manifest_local_script() {
1855        let (dir, _script_path) = write_plugin_script(
1856            "#!/bin/sh\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
1857        );
1858        let manifest = PluginManifest {
1859            name: "shell-wrapper".to_string(),
1860            plugin_id: None,
1861            plugin_version: None,
1862            protocol_version: "1".to_string(),
1863            stage: PluginStage::Observer,
1864            entrypoint: vec!["sh".to_string(), "plugin.sh".to_string()],
1865            language: Some("shell".to_string()),
1866            capabilities: None,
1867            timeout_ms: None,
1868            input_schema: None,
1869            options_schema: None,
1870            output_schema: None,
1871            manifest_dir: Some(dir.path().to_path_buf()),
1872            manifest_path: None,
1873        };
1874
1875        let response = run_plugin(&manifest, &test_request())
1876            .expect("known interpreter with manifest-local script should succeed");
1877        assert!(response.ok);
1878    }
1879
1880    #[cfg(unix)]
1881    #[test]
1882    fn returns_redacted_plugin_run_metadata() {
1883        let (dir, _script_path) = write_plugin_script(
1884            "#!/bin/sh\nprintf 'debug secret\\n' >&2\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
1885        );
1886        let manifest = PluginManifest {
1887            name: "metadata".to_string(),
1888            plugin_id: Some("metadata-plugin".to_string()),
1889            plugin_version: Some("0.1.0".to_string()),
1890            protocol_version: "1".to_string(),
1891            stage: PluginStage::Observer,
1892            entrypoint: vec!["plugin.sh".to_string()],
1893            language: Some("shell".to_string()),
1894            capabilities: Some(vec!["feature_output".to_string()]),
1895            timeout_ms: None,
1896            input_schema: None,
1897            options_schema: None,
1898            output_schema: None,
1899            manifest_dir: Some(dir.path().to_path_buf()),
1900            manifest_path: None,
1901        };
1902
1903        let execution = run_plugin_with_policy_and_metadata(
1904            &manifest,
1905            &test_request(),
1906            &PluginExecutionPolicy::default(),
1907        )
1908        .expect("plugin should run with metadata");
1909
1910        assert!(execution.response.ok);
1911        assert_eq!(execution.run.plugin_id, "metadata-plugin");
1912        assert_eq!(execution.run.plugin_version.as_deref(), Some("0.1.0"));
1913        assert_eq!(execution.run.access.network, "not_enforced");
1914        assert_eq!(execution.run.access.filesystem, "process_default");
1915        assert_eq!(
1916            execution.run.timeout_policy.effective_timeout_ms,
1917            Some(30_000)
1918        );
1919        assert_eq!(
1920            execution.run.capabilities.allowed,
1921            vec!["feature_output".to_string()]
1922        );
1923        assert!(execution.run.plugin_run_id.starts_with("sha256:"));
1924        assert!(execution.run.entrypoint_hash.starts_with("sha256:"));
1925        assert_eq!(execution.run.entrypoint.hashes.len(), 1);
1926        assert!(execution
1927            .run
1928            .stdio
1929            .stdout_summary
1930            .as_deref()
1931            .is_some_and(|value| value.starts_with("<redacted:sha256:")));
1932        assert!(execution
1933            .run
1934            .stdio
1935            .stderr_summary
1936            .as_deref()
1937            .is_some_and(|value| value.starts_with("<redacted:sha256:")));
1938        assert_ne!(
1939            execution.run.stdio.stderr_summary.as_deref(),
1940            Some("debug secret")
1941        );
1942    }
1943
1944    #[cfg(unix)]
1945    #[test]
1946    fn rejects_bare_path_lookup_by_default() {
1947        let dir = tempdir().expect("tempdir");
1948        let manifest = PluginManifest {
1949            name: "path-command".to_string(),
1950            plugin_id: None,
1951            plugin_version: None,
1952            protocol_version: "1".to_string(),
1953            stage: PluginStage::Observer,
1954            entrypoint: vec!["logicpearl-plugin-not-in-manifest".to_string()],
1955            language: Some("shell".to_string()),
1956            capabilities: None,
1957            timeout_ms: None,
1958            input_schema: None,
1959            options_schema: None,
1960            output_schema: None,
1961            manifest_dir: Some(dir.path().to_path_buf()),
1962            manifest_path: None,
1963        };
1964
1965        let error = run_plugin(&manifest, &test_request()).expect_err("PATH lookup should reject");
1966        let message = error.to_string();
1967        assert!(message.contains("PATH lookup is disabled"), "{message}");
1968    }
1969
1970    #[cfg(unix)]
1971    #[test]
1972    fn rejects_absolute_entrypoint_by_default() {
1973        let (dir, script_path) = write_plugin_script(
1974            "#!/bin/sh\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
1975        );
1976        let manifest = PluginManifest {
1977            name: "absolute".to_string(),
1978            plugin_id: None,
1979            plugin_version: None,
1980            protocol_version: "1".to_string(),
1981            stage: PluginStage::Observer,
1982            entrypoint: vec![script_path.display().to_string()],
1983            language: Some("shell".to_string()),
1984            capabilities: None,
1985            timeout_ms: None,
1986            input_schema: None,
1987            options_schema: None,
1988            output_schema: None,
1989            manifest_dir: Some(dir.path().to_path_buf()),
1990            manifest_path: None,
1991        };
1992
1993        let error =
1994            run_plugin(&manifest, &test_request()).expect_err("absolute entrypoint should reject");
1995        let message = error.to_string();
1996        assert!(message.contains("absolute program path"), "{message}");
1997    }
1998
1999    #[cfg(unix)]
2000    #[test]
2001    fn allows_absolute_entrypoint_when_policy_opts_in() {
2002        let (dir, script_path) = write_plugin_script(
2003            "#!/bin/sh\nprintf '{\"ok\":true,\"features\":{\"value\":1}}\\n'\n",
2004        );
2005        let manifest = PluginManifest {
2006            name: "absolute".to_string(),
2007            plugin_id: None,
2008            plugin_version: None,
2009            protocol_version: "1".to_string(),
2010            stage: PluginStage::Observer,
2011            entrypoint: vec![script_path.display().to_string()],
2012            language: Some("shell".to_string()),
2013            capabilities: None,
2014            timeout_ms: None,
2015            input_schema: None,
2016            options_schema: None,
2017            output_schema: None,
2018            manifest_dir: Some(dir.path().to_path_buf()),
2019            manifest_path: None,
2020        };
2021
2022        let policy = PluginExecutionPolicy::default().with_allow_absolute_entrypoint(true);
2023        let response = run_plugin_with_policy(&manifest, &test_request(), &policy)
2024            .expect("absolute entrypoint should run only under explicit policy");
2025        assert!(response.ok);
2026    }
2027
2028    #[cfg(unix)]
2029    #[test]
2030    fn timeout_terminates_descendants_that_keep_output_pipes_open() {
2031        let (dir, _script_path) = write_plugin_script(
2032            "#!/bin/sh\n(sh -c 'sleep 5') &\nsleep 5\nprintf '{\"ok\":true}\\n'\n",
2033        );
2034        let manifest = PluginManifest {
2035            name: "tree".to_string(),
2036            plugin_id: None,
2037            plugin_version: None,
2038            protocol_version: "1".to_string(),
2039            stage: PluginStage::Observer,
2040            entrypoint: vec!["plugin.sh".to_string()],
2041            language: Some("shell".to_string()),
2042            capabilities: None,
2043            timeout_ms: Some(50),
2044            input_schema: None,
2045            options_schema: None,
2046            output_schema: None,
2047            manifest_dir: Some(dir.path().to_path_buf()),
2048            manifest_path: None,
2049        };
2050
2051        let started_at = std::time::Instant::now();
2052        let error = run_plugin(&manifest, &test_request()).expect_err("plugin should time out");
2053        assert!(
2054            started_at.elapsed() < std::time::Duration::from_secs(2),
2055            "timeout should not wait for descendant sleep process"
2056        );
2057        let message = error.to_string();
2058        assert!(message.contains("exceeded timeout_ms=50"), "{message}");
2059    }
2060
2061    #[test]
2062    fn limited_reader_rejects_outputs_above_cap() {
2063        let mut reader = std::io::Cursor::new(vec![b'x'; 5]);
2064        let error = super::read_limited(&mut reader, 4).expect_err("reader should reject overflow");
2065        assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
2066        assert_eq!(reader.position(), 5, "reader should drain capped streams");
2067    }
2068}