Skip to main content

rch_common/e2e/
process_triage.rs

1//! Stable process triage integration contract.
2//!
3//! This module defines a shared schema for invoking external process-triage
4//! helpers and safely interpreting their actions/results.
5
6use schemars::{JsonSchema, schema::RootSchema, schema_for};
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9
10/// Stable schema version for the process-triage integration contract.
11///
12/// Sourced from the central [`crate::schema_versions`] registry so cross-component
13/// drift is caught by the registry's pinned-snapshot test.
14pub const PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION: &str = crate::schema_versions::current_version(
15    crate::schema_versions::SchemaComponent::ProcessTriageContract,
16);
17
18/// Stable command surface for invoking the process-triage adapter.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
20#[serde(rename_all = "snake_case")]
21pub enum ProcessTriageAdapterCommand {
22    Analyze,
23    Execute,
24    Health,
25    Version,
26}
27
28impl ProcessTriageAdapterCommand {
29    /// Return CLI arguments for the command surface.
30    pub fn args(self) -> &'static [&'static str] {
31        match self {
32            Self::Analyze => &["process-triage", "analyze", "--json"],
33            Self::Execute => &["process-triage", "execute", "--json"],
34            Self::Health => &["process-triage", "health", "--json"],
35            Self::Version => &["process-triage", "version", "--json"],
36        }
37    }
38}
39
40/// Supported process-triage action classes ordered by risk.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
42#[serde(rename_all = "snake_case")]
43pub enum ProcessTriageActionClass {
44    ObserveOnly,
45    SoftTerminate,
46    HardTerminate,
47    ReclaimDisk,
48}
49
50impl ProcessTriageActionClass {
51    fn risk_rank(self) -> u8 {
52        match self {
53            Self::ObserveOnly => 0,
54            Self::ReclaimDisk => 1,
55            Self::SoftTerminate => 2,
56            Self::HardTerminate => 3,
57        }
58    }
59}
60
61/// Trigger class that initiated process triage.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
63#[serde(rename_all = "snake_case")]
64pub enum ProcessTriageTrigger {
65    DiskPressure,
66    WorkerHealth,
67    BuildTimeout,
68    Manual,
69}
70
71/// Process classification label from detector.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
73#[serde(rename_all = "snake_case")]
74pub enum ProcessClassification {
75    BuildRelated,
76    Suspicious,
77    Interactive,
78    SystemCritical,
79    Unknown,
80}
81
82/// Escalation levels used by safe-action policy.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
84#[serde(rename_all = "snake_case")]
85pub enum ProcessTriageEscalationLevel {
86    Automatic,
87    Supervised,
88    ManualReview,
89    Blocked,
90}
91
92/// Failure taxonomy for process triage adapter interactions.
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
94#[serde(rename_all = "snake_case")]
95pub enum ProcessTriageFailureKind {
96    DetectorUncertain,
97    PolicyViolation,
98    TransportError,
99    ExecutorRuntimeError,
100    Timeout,
101    PartialResult,
102    InvalidRequest,
103}
104
105/// Adapter command budget used in timeout/retry policy.
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
107pub struct ProcessTriageCommandBudget {
108    pub command: ProcessTriageAdapterCommand,
109    pub timeout_secs: u64,
110    pub retries: u32,
111}
112
113/// Timeout policy contract.
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
115pub struct ProcessTriageTimeoutPolicy {
116    pub request_timeout_secs: u64,
117    pub action_timeout_secs: u64,
118    pub total_timeout_secs: u64,
119}
120
121impl Default for ProcessTriageTimeoutPolicy {
122    fn default() -> Self {
123        Self {
124            request_timeout_secs: 8,
125            action_timeout_secs: 15,
126            total_timeout_secs: 30,
127        }
128    }
129}
130
131/// Retry policy contract.
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
133pub struct ProcessTriageRetryPolicy {
134    pub max_attempts: u32,
135    pub initial_backoff_ms: u64,
136    pub max_backoff_ms: u64,
137    pub backoff_multiplier_percent: u16,
138}
139
140impl Default for ProcessTriageRetryPolicy {
141    fn default() -> Self {
142        Self {
143            max_attempts: 3,
144            initial_backoff_ms: 250,
145            max_backoff_ms: 2_000,
146            backoff_multiplier_percent: 200,
147        }
148    }
149}
150
151/// Escalation thresholds used by policy evaluation.
152#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
153pub struct ProcessTriageEscalationThresholds {
154    pub min_confidence_for_automatic: u8,
155    pub max_actions_before_manual_review: u32,
156    pub max_hard_terminations_before_manual_review: u32,
157}
158
159impl Default for ProcessTriageEscalationThresholds {
160    fn default() -> Self {
161        Self {
162            min_confidence_for_automatic: 85,
163            max_actions_before_manual_review: 5,
164            max_hard_terminations_before_manual_review: 1,
165        }
166    }
167}
168
169/// Safe-action policy with explicit allowlist/denylist and escalation rules.
170#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
171pub struct ProcessTriageSafeActionPolicy {
172    pub policy_version: String,
173    pub allow_action_classes: Vec<ProcessTriageActionClass>,
174    pub deny_action_classes: Vec<ProcessTriageActionClass>,
175    pub managed_process_patterns: Vec<String>,
176    pub protected_process_patterns: Vec<String>,
177    pub escalation: ProcessTriageEscalationThresholds,
178    pub require_audit_record: bool,
179}
180
181impl Default for ProcessTriageSafeActionPolicy {
182    fn default() -> Self {
183        Self {
184            policy_version: "v1".to_string(),
185            allow_action_classes: vec![
186                ProcessTriageActionClass::ObserveOnly,
187                ProcessTriageActionClass::SoftTerminate,
188                ProcessTriageActionClass::ReclaimDisk,
189            ],
190            deny_action_classes: vec![ProcessTriageActionClass::HardTerminate],
191            managed_process_patterns: vec![
192                "cargo".to_string(),
193                "rustc".to_string(),
194                "clang".to_string(),
195            ],
196            protected_process_patterns: vec![
197                "sshd".to_string(),
198                "systemd".to_string(),
199                "init".to_string(),
200            ],
201            escalation: ProcessTriageEscalationThresholds::default(),
202            require_audit_record: true,
203        }
204    }
205}
206
207/// Full contract bundle for process triage integration.
208#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
209pub struct ProcessTriageContract {
210    pub schema_version: String,
211    pub timeout_policy: ProcessTriageTimeoutPolicy,
212    pub retry_policy: ProcessTriageRetryPolicy,
213    pub command_budgets: Vec<ProcessTriageCommandBudget>,
214    pub safe_action_policy: ProcessTriageSafeActionPolicy,
215}
216
217impl Default for ProcessTriageContract {
218    fn default() -> Self {
219        Self {
220            schema_version: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
221            timeout_policy: ProcessTriageTimeoutPolicy::default(),
222            retry_policy: ProcessTriageRetryPolicy::default(),
223            command_budgets: vec![
224                ProcessTriageCommandBudget {
225                    command: ProcessTriageAdapterCommand::Analyze,
226                    timeout_secs: 8,
227                    retries: 1,
228                },
229                ProcessTriageCommandBudget {
230                    command: ProcessTriageAdapterCommand::Execute,
231                    timeout_secs: 15,
232                    retries: 1,
233                },
234                ProcessTriageCommandBudget {
235                    command: ProcessTriageAdapterCommand::Health,
236                    timeout_secs: 3,
237                    retries: 2,
238                },
239                ProcessTriageCommandBudget {
240                    command: ProcessTriageAdapterCommand::Version,
241                    timeout_secs: 2,
242                    retries: 0,
243                },
244            ],
245            safe_action_policy: ProcessTriageSafeActionPolicy::default(),
246        }
247    }
248}
249
250/// Observed process sample from detector.
251#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
252pub struct ProcessDescriptor {
253    pub pid: u32,
254    pub ppid: Option<u32>,
255    pub owner: String,
256    pub command: String,
257    pub classification: ProcessClassification,
258    pub cpu_percent_milli: u32,
259    pub rss_mb: u32,
260    pub runtime_secs: u64,
261}
262
263/// Action request item proposed by detector/planner.
264#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
265pub struct ProcessTriageActionRequest {
266    pub action_class: ProcessTriageActionClass,
267    pub pid: u32,
268    pub reason_code: String,
269    pub signal: Option<String>,
270}
271
272/// Adapter request schema.
273#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
274pub struct ProcessTriageRequest {
275    pub schema_version: String,
276    pub correlation_id: String,
277    pub worker_id: String,
278    pub observed_at_unix_ms: i64,
279    pub trigger: ProcessTriageTrigger,
280    pub detector_confidence_percent: u8,
281    pub retry_attempt: u32,
282    pub candidate_processes: Vec<ProcessDescriptor>,
283    pub requested_actions: Vec<ProcessTriageActionRequest>,
284}
285
286/// Action execution outcome.
287#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
288#[serde(rename_all = "snake_case")]
289pub enum ProcessTriageActionOutcome {
290    Skipped,
291    Executed,
292    Failed,
293    Escalated,
294}
295
296/// Action result in response.
297#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
298pub struct ProcessTriageActionResult {
299    pub pid: u32,
300    pub action_class: ProcessTriageActionClass,
301    pub outcome: ProcessTriageActionOutcome,
302    pub note: Option<String>,
303}
304
305/// High-level response status.
306#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
307#[serde(rename_all = "snake_case")]
308pub enum ProcessTriageResponseStatus {
309    Applied,
310    PartiallyApplied,
311    EscalatedNoAction,
312    RejectedByPolicy,
313    Failed,
314}
315
316/// Failure payload.
317#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
318pub struct ProcessTriageFailure {
319    pub kind: ProcessTriageFailureKind,
320    pub code: String,
321    pub message: String,
322    pub remediation: Vec<String>,
323}
324
325/// Audit record required for every process triage response.
326#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
327pub struct ProcessTriageAuditRecord {
328    pub policy_version: String,
329    pub evaluated_by: String,
330    pub evaluated_at_unix_ms: i64,
331    pub decision_code: String,
332    pub requires_operator_ack: bool,
333    pub audit_required: bool,
334}
335
336/// Adapter response schema.
337#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
338pub struct ProcessTriageResponse {
339    pub schema_version: String,
340    pub correlation_id: String,
341    pub status: ProcessTriageResponseStatus,
342    pub escalation_level: ProcessTriageEscalationLevel,
343    pub executed_actions: Vec<ProcessTriageActionResult>,
344    pub failure: Option<ProcessTriageFailure>,
345    pub audit: ProcessTriageAuditRecord,
346}
347
348/// Deterministic policy decision for a single action request.
349#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
350pub struct ProcessTriagePolicyDecision {
351    pub permitted: bool,
352    pub escalation_level: ProcessTriageEscalationLevel,
353    pub effective_action: Option<ProcessTriageActionClass>,
354    pub decision_code: String,
355    pub reason: String,
356    pub requires_operator_ack: bool,
357    pub audit_required: bool,
358}
359
360/// Contract validation failures.
361#[derive(Debug, thiserror::Error, PartialEq, Eq)]
362pub enum ProcessTriageContractError {
363    #[error("schema version mismatch: expected {expected}, got {actual}")]
364    SchemaVersionMismatch { expected: String, actual: String },
365    #[error("detector confidence percent must be <= 100, got {0}")]
366    InvalidConfidence(u8),
367    #[error("requested_actions must not be empty")]
368    EmptyRequestedActions,
369    #[error("requested action references unknown pid {0}")]
370    UnknownActionPid(u32),
371    #[error("timeout policy has invalid value for {field}: {value}")]
372    InvalidTimeout { field: &'static str, value: u64 },
373    #[error("retry policy has invalid value for {field}: {value}")]
374    InvalidRetryPolicy { field: &'static str, value: u64 },
375    #[error("allowlist/denylist conflict for action class {0:?}")]
376    AllowDenyConflict(ProcessTriageActionClass),
377    #[error("escalation threshold min_confidence_for_automatic must be <= 100, got {0}")]
378    InvalidEscalationConfidence(u8),
379}
380
381impl ProcessTriageRequest {
382    /// Validate request shape and semantic constraints.
383    pub fn validate(&self) -> Result<(), ProcessTriageContractError> {
384        if self.schema_version != PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION {
385            return Err(ProcessTriageContractError::SchemaVersionMismatch {
386                expected: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
387                actual: self.schema_version.clone(),
388            });
389        }
390        if self.detector_confidence_percent > 100 {
391            return Err(ProcessTriageContractError::InvalidConfidence(
392                self.detector_confidence_percent,
393            ));
394        }
395        if self.requested_actions.is_empty() {
396            return Err(ProcessTriageContractError::EmptyRequestedActions);
397        }
398
399        let candidate_pids: HashSet<u32> = self.candidate_processes.iter().map(|p| p.pid).collect();
400        for action in &self.requested_actions {
401            if !candidate_pids.contains(&action.pid) {
402                return Err(ProcessTriageContractError::UnknownActionPid(action.pid));
403            }
404        }
405
406        Ok(())
407    }
408}
409
410impl ProcessTriageContract {
411    /// Validate contract configuration.
412    pub fn validate(&self) -> Result<(), ProcessTriageContractError> {
413        if self.timeout_policy.request_timeout_secs == 0 {
414            return Err(ProcessTriageContractError::InvalidTimeout {
415                field: "request_timeout_secs",
416                value: self.timeout_policy.request_timeout_secs,
417            });
418        }
419        if self.timeout_policy.action_timeout_secs == 0 {
420            return Err(ProcessTriageContractError::InvalidTimeout {
421                field: "action_timeout_secs",
422                value: self.timeout_policy.action_timeout_secs,
423            });
424        }
425        if self.timeout_policy.total_timeout_secs == 0 {
426            return Err(ProcessTriageContractError::InvalidTimeout {
427                field: "total_timeout_secs",
428                value: self.timeout_policy.total_timeout_secs,
429            });
430        }
431        if self.retry_policy.max_attempts == 0 {
432            return Err(ProcessTriageContractError::InvalidRetryPolicy {
433                field: "max_attempts",
434                value: self.retry_policy.max_attempts as u64,
435            });
436        }
437        if self.retry_policy.initial_backoff_ms == 0 {
438            return Err(ProcessTriageContractError::InvalidRetryPolicy {
439                field: "initial_backoff_ms",
440                value: self.retry_policy.initial_backoff_ms,
441            });
442        }
443        if self.retry_policy.max_backoff_ms < self.retry_policy.initial_backoff_ms {
444            return Err(ProcessTriageContractError::InvalidRetryPolicy {
445                field: "max_backoff_ms",
446                value: self.retry_policy.max_backoff_ms,
447            });
448        }
449        if self
450            .safe_action_policy
451            .escalation
452            .min_confidence_for_automatic
453            > 100
454        {
455            return Err(ProcessTriageContractError::InvalidEscalationConfidence(
456                self.safe_action_policy
457                    .escalation
458                    .min_confidence_for_automatic,
459            ));
460        }
461
462        let allow: HashSet<ProcessTriageActionClass> = self
463            .safe_action_policy
464            .allow_action_classes
465            .iter()
466            .copied()
467            .collect();
468        let deny: HashSet<ProcessTriageActionClass> = self
469            .safe_action_policy
470            .deny_action_classes
471            .iter()
472            .copied()
473            .collect();
474
475        if let Some(action) = allow.intersection(&deny).next() {
476            return Err(ProcessTriageContractError::AllowDenyConflict(*action));
477        }
478
479        Ok(())
480    }
481}
482
483/// Evaluate a requested action against the safe-action policy.
484pub fn evaluate_triage_action(
485    request: &ProcessTriageRequest,
486    contract: &ProcessTriageContract,
487    action: &ProcessTriageActionRequest,
488) -> ProcessTriagePolicyDecision {
489    let policy = &contract.safe_action_policy;
490    let allow: HashSet<ProcessTriageActionClass> =
491        policy.allow_action_classes.iter().copied().collect();
492    let deny: HashSet<ProcessTriageActionClass> =
493        policy.deny_action_classes.iter().copied().collect();
494
495    if deny.contains(&action.action_class) {
496        return ProcessTriagePolicyDecision {
497            permitted: false,
498            escalation_level: ProcessTriageEscalationLevel::Blocked,
499            effective_action: None,
500            decision_code: "PT_BLOCK_DENYLIST".to_string(),
501            reason: format!("action class {:?} is denylisted", action.action_class),
502            requires_operator_ack: true,
503            audit_required: policy.require_audit_record,
504        };
505    }
506
507    if !allow.contains(&action.action_class) {
508        return ProcessTriagePolicyDecision {
509            permitted: false,
510            escalation_level: ProcessTriageEscalationLevel::Blocked,
511            effective_action: None,
512            decision_code: "PT_BLOCK_NOT_ALLOWLISTED".to_string(),
513            reason: format!("action class {:?} is not allowlisted", action.action_class),
514            requires_operator_ack: true,
515            audit_required: policy.require_audit_record,
516        };
517    }
518
519    let target = request
520        .candidate_processes
521        .iter()
522        .find(|proc_desc| proc_desc.pid == action.pid);
523
524    if let Some(proc_desc) = target {
525        let cmd_lower = proc_desc.command.to_ascii_lowercase();
526        if pattern_matches(&cmd_lower, &policy.protected_process_patterns) {
527            return ProcessTriagePolicyDecision {
528                permitted: false,
529                escalation_level: ProcessTriageEscalationLevel::Blocked,
530                effective_action: None,
531                decision_code: "PT_BLOCK_PROTECTED_PROCESS".to_string(),
532                reason: format!(
533                    "target process '{}' matches protected patterns",
534                    proc_desc.command
535                ),
536                requires_operator_ack: true,
537                audit_required: policy.require_audit_record,
538            };
539        }
540        if !policy.managed_process_patterns.is_empty()
541            && !pattern_matches(&cmd_lower, &policy.managed_process_patterns)
542        {
543            return ProcessTriagePolicyDecision {
544                permitted: false,
545                escalation_level: ProcessTriageEscalationLevel::Blocked,
546                effective_action: None,
547                decision_code: "PT_BLOCK_OUT_OF_SCOPE_PROCESS".to_string(),
548                reason: format!(
549                    "target process '{}' does not match managed patterns",
550                    proc_desc.command
551                ),
552                requires_operator_ack: true,
553                audit_required: policy.require_audit_record,
554            };
555        }
556    }
557
558    if request.detector_confidence_percent < policy.escalation.min_confidence_for_automatic {
559        return ProcessTriagePolicyDecision {
560            permitted: false,
561            escalation_level: ProcessTriageEscalationLevel::ManualReview,
562            effective_action: None,
563            decision_code: "PT_MANUAL_LOW_CONFIDENCE".to_string(),
564            reason: format!(
565                "detector confidence {} is below automatic threshold {}",
566                request.detector_confidence_percent, policy.escalation.min_confidence_for_automatic
567            ),
568            requires_operator_ack: true,
569            audit_required: policy.require_audit_record,
570        };
571    }
572
573    if request.retry_attempt + 1 >= contract.retry_policy.max_attempts {
574        return ProcessTriagePolicyDecision {
575            permitted: false,
576            escalation_level: ProcessTriageEscalationLevel::ManualReview,
577            effective_action: None,
578            decision_code: "PT_MANUAL_RETRY_EXHAUSTED".to_string(),
579            reason: format!(
580                "retry attempt {} reached max attempts {}",
581                request.retry_attempt + 1,
582                contract.retry_policy.max_attempts
583            ),
584            requires_operator_ack: true,
585            audit_required: policy.require_audit_record,
586        };
587    }
588
589    let hard_kill_requests = request
590        .requested_actions
591        .iter()
592        .filter(|req| req.action_class == ProcessTriageActionClass::HardTerminate)
593        .count() as u32;
594
595    if hard_kill_requests > policy.escalation.max_hard_terminations_before_manual_review {
596        return ProcessTriagePolicyDecision {
597            permitted: false,
598            escalation_level: ProcessTriageEscalationLevel::ManualReview,
599            effective_action: None,
600            decision_code: "PT_MANUAL_HARD_KILL_THRESHOLD".to_string(),
601            reason: format!(
602                "requested hard terminations {} exceeds threshold {}",
603                hard_kill_requests, policy.escalation.max_hard_terminations_before_manual_review
604            ),
605            requires_operator_ack: true,
606            audit_required: policy.require_audit_record,
607        };
608    }
609
610    if request.requested_actions.len() as u32 > policy.escalation.max_actions_before_manual_review {
611        let downgraded_action = if action.action_class.risk_rank()
612            > ProcessTriageActionClass::ObserveOnly.risk_rank()
613        {
614            ProcessTriageActionClass::ObserveOnly
615        } else {
616            action.action_class
617        };
618
619        return ProcessTriagePolicyDecision {
620            permitted: true,
621            escalation_level: ProcessTriageEscalationLevel::Supervised,
622            effective_action: Some(downgraded_action),
623            decision_code: "PT_SUPERVISED_ACTION_VOLUME".to_string(),
624            reason: format!(
625                "requested action count {} exceeds threshold {}, action downgraded for supervised mode",
626                request.requested_actions.len(),
627                policy.escalation.max_actions_before_manual_review
628            ),
629            requires_operator_ack: true,
630            audit_required: policy.require_audit_record,
631        };
632    }
633
634    ProcessTriagePolicyDecision {
635        permitted: true,
636        escalation_level: ProcessTriageEscalationLevel::Automatic,
637        effective_action: Some(action.action_class),
638        decision_code: "PT_ALLOW_AUTOMATIC".to_string(),
639        reason: "action satisfies allowlist and escalation thresholds".to_string(),
640        requires_operator_ack: false,
641        audit_required: policy.require_audit_record,
642    }
643}
644
/// Report whether `command_lower` contains any non-empty pattern as a substring.
///
/// Each pattern is ASCII-lowercased before the comparison. The command is NOT lowercased
/// here: callers are expected to pass an already-lowercased command line. Empty patterns
/// never match, so a stray `""` in a policy cannot match every process.
fn pattern_matches(command_lower: &str, patterns: &[String]) -> bool {
    for pattern in patterns {
        if pattern.is_empty() {
            continue;
        }
        let needle = pattern.to_ascii_lowercase();
        if command_lower.contains(needle.as_str()) {
            return true;
        }
    }
    false
}
651
652/// JSON schema for request payload.
653pub fn process_triage_request_schema() -> RootSchema {
654    schema_for!(ProcessTriageRequest)
655}
656
657/// JSON schema for response payload.
658pub fn process_triage_response_schema() -> RootSchema {
659    schema_for!(ProcessTriageResponse)
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665    use serde_json::Value;
666
667    fn sample_request() -> ProcessTriageRequest {
668        ProcessTriageRequest {
669            schema_version: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
670            correlation_id: "corr-123".to_string(),
671            worker_id: "worker-a".to_string(),
672            observed_at_unix_ms: 1_768_768_123_000,
673            trigger: ProcessTriageTrigger::WorkerHealth,
674            detector_confidence_percent: 96,
675            retry_attempt: 0,
676            candidate_processes: vec![
677                ProcessDescriptor {
678                    pid: 1001,
679                    ppid: Some(1000),
680                    owner: "ubuntu".to_string(),
681                    command: "cargo test --workspace".to_string(),
682                    classification: ProcessClassification::BuildRelated,
683                    cpu_percent_milli: 92500,
684                    rss_mb: 2100,
685                    runtime_secs: 240,
686                },
687                ProcessDescriptor {
688                    pid: 1002,
689                    ppid: Some(1),
690                    owner: "root".to_string(),
691                    command: "sshd: ubuntu@pts/4".to_string(),
692                    classification: ProcessClassification::SystemCritical,
693                    cpu_percent_milli: 100,
694                    rss_mb: 32,
695                    runtime_secs: 8600,
696                },
697            ],
698            requested_actions: vec![ProcessTriageActionRequest {
699                action_class: ProcessTriageActionClass::SoftTerminate,
700                pid: 1001,
701                reason_code: "stuck_compile".to_string(),
702                signal: Some("TERM".to_string()),
703            }],
704        }
705    }
706
707    fn extract_ref_name(schema: &Value) -> Option<String> {
708        if let Some(reference) = schema.get("$ref").and_then(Value::as_str) {
709            return reference.rsplit('/').next().map(str::to_string);
710        }
711
712        for key in ["anyOf", "oneOf", "allOf"] {
713            if let Some(reference) =
714                schema
715                    .get(key)
716                    .and_then(Value::as_array)
717                    .and_then(|variants| {
718                        variants
719                            .iter()
720                            .find_map(|variant| variant.get("$ref").and_then(Value::as_str))
721                    })
722            {
723                return reference.rsplit('/').next().map(str::to_string);
724            }
725        }
726
727        None
728    }
729
730    fn find_schema_properties(
731        schema_json: &Value,
732        required: &[&str],
733    ) -> serde_json::Map<String, Value> {
734        let root_properties = schema_json
735            .get("properties")
736            .and_then(Value::as_object)
737            .filter(|properties| required.iter().all(|key| properties.contains_key(*key)))
738            .cloned();
739        if let Some(properties) = root_properties {
740            return properties;
741        }
742
743        let definition_properties = schema_json
744            .get("definitions")
745            .and_then(Value::as_object)
746            .and_then(|definitions| {
747                definitions.values().find_map(|node| {
748                    let properties = node.get("properties")?.as_object()?;
749                    if required.iter().all(|key| properties.contains_key(*key)) {
750                        Some(properties.clone())
751                    } else {
752                        None
753                    }
754                })
755            });
756        if let Some(properties) = definition_properties {
757            return properties;
758        }
759
760        panic!("schema properties not found for required keys: {required:?}");
761    }
762
763    fn definition_properties(
764        schema_json: &Value,
765        definition_name: &str,
766    ) -> serde_json::Map<String, Value> {
767        schema_json
768            .get("definitions")
769            .and_then(Value::as_object)
770            .and_then(|definitions| definitions.get(definition_name))
771            .and_then(|definition| definition.get("properties"))
772            .and_then(Value::as_object)
773            .cloned()
774            .unwrap_or_else(|| panic!("definition '{definition_name}' missing properties"))
775    }
776
777    fn definition_enum(schema_json: &Value, definition_name: &str) -> Vec<String> {
778        schema_json
779            .get("definitions")
780            .and_then(Value::as_object)
781            .and_then(|definitions| definitions.get(definition_name))
782            .and_then(|definition| definition.get("enum"))
783            .and_then(Value::as_array)
784            .map(|values| {
785                values
786                    .iter()
787                    .filter_map(Value::as_str)
788                    .map(str::to_string)
789                    .collect()
790            })
791            .unwrap_or_else(|| panic!("definition '{definition_name}' missing enum values"))
792    }
793
794    #[test]
795    fn process_triage_contract_request_roundtrip() {
796        let request = sample_request();
797        request.validate().expect("sample request should validate");
798
799        let json = serde_json::to_string(&request).expect("serialize request");
800        let restored: ProcessTriageRequest =
801            serde_json::from_str(&json).expect("deserialize request");
802        assert_eq!(
803            restored.schema_version,
804            PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION
805        );
806        assert_eq!(restored.worker_id, "worker-a");
807        assert_eq!(restored.requested_actions.len(), 1);
808    }
809
810    #[test]
811    fn process_triage_contract_policy_validation_rejects_allow_deny_overlap() {
812        let mut contract = ProcessTriageContract::default();
813        contract
814            .safe_action_policy
815            .deny_action_classes
816            .push(ProcessTriageActionClass::SoftTerminate);
817
818        let result = contract.validate();
819        assert!(matches!(
820            result,
821            Err(ProcessTriageContractError::AllowDenyConflict(
822                ProcessTriageActionClass::SoftTerminate
823            ))
824        ));
825    }
826
827    #[test]
828    fn process_triage_contract_blocks_protected_process() {
829        let contract = ProcessTriageContract::default();
830        let request = ProcessTriageRequest {
831            requested_actions: vec![ProcessTriageActionRequest {
832                action_class: ProcessTriageActionClass::SoftTerminate,
833                pid: 1002,
834                reason_code: "force_cleanup".to_string(),
835                signal: Some("TERM".to_string()),
836            }],
837            ..sample_request()
838        };
839
840        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
841        assert!(!decision.permitted);
842        assert_eq!(
843            decision.escalation_level,
844            ProcessTriageEscalationLevel::Blocked
845        );
846        assert_eq!(decision.decision_code, "PT_BLOCK_PROTECTED_PROCESS");
847    }
848
849    #[test]
850    fn process_triage_contract_requires_manual_review_on_low_confidence() {
851        let contract = ProcessTriageContract::default();
852        let request = ProcessTriageRequest {
853            detector_confidence_percent: 40,
854            ..sample_request()
855        };
856
857        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
858        assert!(!decision.permitted);
859        assert_eq!(
860            decision.escalation_level,
861            ProcessTriageEscalationLevel::ManualReview
862        );
863        assert_eq!(decision.decision_code, "PT_MANUAL_LOW_CONFIDENCE");
864    }
865
866    #[test]
867    fn process_triage_contract_respects_denylist() {
868        let mut contract = ProcessTriageContract::default();
869        contract.safe_action_policy.allow_action_classes = vec![
870            ProcessTriageActionClass::ObserveOnly,
871            ProcessTriageActionClass::HardTerminate,
872        ];
873        contract.safe_action_policy.deny_action_classes =
874            vec![ProcessTriageActionClass::HardTerminate];
875
876        let request = ProcessTriageRequest {
877            requested_actions: vec![ProcessTriageActionRequest {
878                action_class: ProcessTriageActionClass::HardTerminate,
879                pid: 1001,
880                reason_code: "stuck_compile".to_string(),
881                signal: Some("KILL".to_string()),
882            }],
883            ..sample_request()
884        };
885
886        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
887        assert!(!decision.permitted);
888        assert_eq!(
889            decision.escalation_level,
890            ProcessTriageEscalationLevel::Blocked
891        );
892        assert_eq!(decision.decision_code, "PT_BLOCK_DENYLIST");
893    }
894
895    #[test]
896    fn process_triage_contract_schema_contains_core_fields() {
897        let schema = process_triage_request_schema();
898        let schema_json = serde_json::to_value(&schema).expect("schema to json");
899        let root_properties = schema_json
900            .get("properties")
901            .and_then(|props| props.as_object())
902            .cloned();
903        let definition_properties = schema_json
904            .get("definitions")
905            .and_then(|defs| defs.as_object())
906            .and_then(|defs| {
907                defs.values().find_map(|node| {
908                    let properties = node.get("properties")?.as_object()?;
909                    if properties.contains_key("worker_id")
910                        && properties.contains_key("requested_actions")
911                    {
912                        Some(properties.clone())
913                    } else {
914                        None
915                    }
916                })
917            });
918        let properties = root_properties
919            .or(definition_properties)
920            .expect("request properties in schema");
921
922        assert!(properties.contains_key("schema_version"));
923        assert!(properties.contains_key("worker_id"));
924        assert!(properties.contains_key("requested_actions"));
925    }
926
927    #[test]
928    fn process_triage_contract_response_schema_requires_audit_record() {
929        let schema = process_triage_response_schema();
930        let schema_json = serde_json::to_value(&schema).expect("schema to json");
931        let response_properties =
932            find_schema_properties(&schema_json, &["status", "executed_actions", "audit"]);
933        let audit_schema = response_properties
934            .get("audit")
935            .expect("response schema should contain audit field");
936        let audit_properties = if let Some(definition_name) = extract_ref_name(audit_schema) {
937            definition_properties(&schema_json, &definition_name)
938        } else {
939            audit_schema
940                .get("properties")
941                .and_then(Value::as_object)
942                .cloned()
943                .expect("audit schema should provide properties")
944        };
945
946        assert!(audit_properties.contains_key("policy_version"));
947        assert!(audit_properties.contains_key("decision_code"));
948        assert!(audit_properties.contains_key("audit_required"));
949    }
950
951    #[test]
952    fn process_triage_contract_response_schema_exposes_failure_kind_taxonomy() {
953        let schema = process_triage_response_schema();
954        let schema_json = serde_json::to_value(&schema).expect("schema to json");
955        let response_properties = find_schema_properties(&schema_json, &["status", "failure"]);
956        let failure_schema = response_properties
957            .get("failure")
958            .expect("response schema should contain failure field");
959        let failure_definition = extract_ref_name(failure_schema)
960            .expect("failure field should reference ProcessTriageFailure schema");
961        let failure_properties = definition_properties(&schema_json, &failure_definition);
962        let kind_schema = failure_properties
963            .get("kind")
964            .expect("failure schema should contain kind field");
965        let kind_values = if let Some(definition_name) = extract_ref_name(kind_schema) {
966            definition_enum(&schema_json, &definition_name)
967        } else {
968            kind_schema
969                .get("enum")
970                .and_then(Value::as_array)
971                .map(|values| {
972                    values
973                        .iter()
974                        .filter_map(Value::as_str)
975                        .map(str::to_string)
976                        .collect::<Vec<_>>()
977                })
978                .expect("kind schema should expose enum values")
979        };
980
981        for expected in [
982            "detector_uncertain",
983            "policy_violation",
984            "transport_error",
985            "executor_runtime_error",
986            "timeout",
987            "partial_result",
988            "invalid_request",
989        ] {
990            assert!(
991                kind_values.iter().any(|value| value == expected),
992                "missing failure kind: {}",
993                expected
994            );
995        }
996    }
997
998    #[test]
999    fn process_triage_contract_parser_compatibility() {
1000        let json = r#"{
1001            "schema_version":"1.0.0",
1002            "correlation_id":"corr-xyz",
1003            "worker_id":"worker-z",
1004            "observed_at_unix_ms":1768768123000,
1005            "trigger":"disk_pressure",
1006            "detector_confidence_percent":88,
1007            "retry_attempt":1,
1008            "candidate_processes":[{
1009                "pid":4242,
1010                "ppid":1,
1011                "owner":"ubuntu",
1012                "command":"cargo clippy --workspace",
1013                "classification":"build_related",
1014                "cpu_percent_milli":50000,
1015                "rss_mb":700,
1016                "runtime_secs":128
1017            }],
1018            "requested_actions":[{
1019                "action_class":"soft_terminate",
1020                "pid":4242,
1021                "reason_code":"timeout",
1022                "signal":"TERM"
1023            }]
1024        }"#;
1025
1026        let request: ProcessTriageRequest = serde_json::from_str(json).expect("compat parse");
1027        assert_eq!(
1028            request.schema_version,
1029            PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION
1030        );
1031        assert_eq!(request.trigger, ProcessTriageTrigger::DiskPressure);
1032        assert_eq!(request.requested_actions.len(), 1);
1033    }
1034}