1use schemars::{JsonSchema, schema::RootSchema, schema_for};
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9
/// Schema version string for the process-triage contract.
///
/// Resolved via `crate::schema_versions::current_version` for the
/// `ProcessTriageContract` component. `ProcessTriageContract::default()`
/// stamps this value, and `ProcessTriageRequest::validate` rejects requests
/// whose `schema_version` differs from it.
pub const PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION: &str = crate::schema_versions::current_version(
    crate::schema_versions::SchemaComponent::ProcessTriageContract,
);
17
// Subcommands of the process-triage adapter. `args()` maps each variant to
// its argument vector; serde (de)serializes the variants in snake_case.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema, which would change the emitted schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageAdapterCommand {
    Analyze,
    Execute,
    Health,
    Version,
}
27
28impl ProcessTriageAdapterCommand {
29 pub fn args(self) -> &'static [&'static str] {
31 match self {
32 Self::Analyze => &["process-triage", "analyze", "--json"],
33 Self::Execute => &["process-triage", "execute", "--json"],
34 Self::Health => &["process-triage", "health", "--json"],
35 Self::Version => &["process-triage", "version", "--json"],
36 }
37 }
38}
39
// Kinds of action a triage request can ask for. The policy allow/deny lists
// are expressed in these classes, and `risk_rank()` orders them
// ObserveOnly < ReclaimDisk < SoftTerminate < HardTerminate (note this is not
// the declaration order). Serialized in snake_case.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageActionClass {
    ObserveOnly,
    SoftTerminate,
    HardTerminate,
    ReclaimDisk,
}
49
50impl ProcessTriageActionClass {
51 fn risk_rank(self) -> u8 {
52 match self {
53 Self::ObserveOnly => 0,
54 Self::ReclaimDisk => 1,
55 Self::SoftTerminate => 2,
56 Self::HardTerminate => 3,
57 }
58 }
59}
60
// Trigger recorded on a request (`ProcessTriageRequest::trigger`).
// Serialized in snake_case, e.g. `DiskPressure` <-> "disk_pressure".
// The policy evaluation in this file does not read the trigger.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageTrigger {
    DiskPressure,
    WorkerHealth,
    BuildTimeout,
    Manual,
}
70
// Category attached to a candidate process
// (`ProcessDescriptor::classification`). Serialized in snake_case.
// NOTE(review): `evaluate_triage_action` does not consult this value; it
// decides scope purely from command-pattern matching.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessClassification {
    BuildRelated,
    Suspicious,
    Interactive,
    SystemCritical,
    Unknown,
}
81
// Escalation level attached to a policy decision or response.
// As produced by `evaluate_triage_action`:
//   Automatic    - permitted, no operator acknowledgement required
//   Supervised   - permitted with operator acknowledgement (action downgraded)
//   ManualReview - not permitted (low confidence, retries exhausted, or too
//                  many hard terminations requested)
//   Blocked      - not permitted (denylist, not allowlisted, protected or
//                  out-of-scope process)
// Serialized in snake_case.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageEscalationLevel {
    Automatic,
    Supervised,
    ManualReview,
    Blocked,
}
91
// Failure taxonomy carried in `ProcessTriageFailure::kind`.
// Serialized in snake_case; the tests in this file assert that all seven
// names appear as enum values in the response JSON schema.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema, and documenting variants may change the shape of the
// generated enum schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageFailureKind {
    DetectorUncertain,
    PolicyViolation,
    TransportError,
    ExecutorRuntimeError,
    Timeout,
    PartialResult,
    InvalidRequest,
}
104
// Timeout and retry allowance for one adapter command. Default budgets are
// listed in `ProcessTriageContract::default()`.
// NOTE(review): `ProcessTriageContract::validate` does not check these values.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageCommandBudget {
    // Adapter command this budget applies to.
    pub command: ProcessTriageAdapterCommand,
    // Timeout for the command, in seconds (per the field name).
    pub timeout_secs: u64,
    // Retry allowance for the command.
    pub retries: u32,
}
112
// Timeout settings of the contract, in seconds (per the field names).
// `ProcessTriageContract::validate` rejects a zero in any of the three
// fields; it does not check how they relate to one another.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageTimeoutPolicy {
    pub request_timeout_secs: u64,
    pub action_timeout_secs: u64,
    pub total_timeout_secs: u64,
}
120
121impl Default for ProcessTriageTimeoutPolicy {
122 fn default() -> Self {
123 Self {
124 request_timeout_secs: 8,
125 action_timeout_secs: 15,
126 total_timeout_secs: 30,
127 }
128 }
129}
130
// Retry settings of the contract.
// `ProcessTriageContract::validate` requires `max_attempts >= 1`,
// `initial_backoff_ms >= 1` and `max_backoff_ms >= initial_backoff_ms`.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageRetryPolicy {
    // `evaluate_triage_action` escalates to manual review once
    // `request.retry_attempt + 1` reaches this value.
    pub max_attempts: u32,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
    // Not validated in this file; presumably 200 means "double each time" -
    // TODO confirm against the code that applies the backoff.
    pub backoff_multiplier_percent: u16,
}
139
140impl Default for ProcessTriageRetryPolicy {
141 fn default() -> Self {
142 Self {
143 max_attempts: 3,
144 initial_backoff_ms: 250,
145 max_backoff_ms: 2_000,
146 backoff_multiplier_percent: 200,
147 }
148 }
149}
150
// Thresholds that move a request out of fully automatic handling
// (consumed by `evaluate_triage_action`).
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageEscalationThresholds {
    // Requests whose detector confidence is below this go to manual review.
    // Must be <= 100 (`ProcessTriageContract::validate`).
    pub min_confidence_for_automatic: u8,
    // When a request carries more actions than this, the decision is
    // permitted at the Supervised level with the action downgraded to
    // ObserveOnly (despite the "manual_review" in the field name).
    pub max_actions_before_manual_review: u32,
    // When a request carries more HardTerminate actions than this, the
    // decision is ManualReview and not permitted.
    pub max_hard_terminations_before_manual_review: u32,
}
158
159impl Default for ProcessTriageEscalationThresholds {
160 fn default() -> Self {
161 Self {
162 min_confidence_for_automatic: 85,
163 max_actions_before_manual_review: 5,
164 max_hard_terminations_before_manual_review: 1,
165 }
166 }
167}
168
// Safe-action policy applied by `evaluate_triage_action`.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageSafeActionPolicy {
    pub policy_version: String,
    // An action class must be listed here to be permitted.
    pub allow_action_classes: Vec<ProcessTriageActionClass>,
    // Checked before the allowlist; a class in both lists is rejected by
    // `ProcessTriageContract::validate`.
    pub deny_action_classes: Vec<ProcessTriageActionClass>,
    // Case-insensitive (ASCII) substrings of the target command. When this
    // list is non-empty, a target matching none of them is blocked as out
    // of scope; an empty list disables the scope check.
    pub managed_process_patterns: Vec<String>,
    // Case-insensitive (ASCII) substrings; a target command containing any
    // of them is blocked. Checked before the managed patterns.
    pub protected_process_patterns: Vec<String>,
    pub escalation: ProcessTriageEscalationThresholds,
    // Copied into `audit_required` of every policy decision.
    pub require_audit_record: bool,
}
180
181impl Default for ProcessTriageSafeActionPolicy {
182 fn default() -> Self {
183 Self {
184 policy_version: "v1".to_string(),
185 allow_action_classes: vec![
186 ProcessTriageActionClass::ObserveOnly,
187 ProcessTriageActionClass::SoftTerminate,
188 ProcessTriageActionClass::ReclaimDisk,
189 ],
190 deny_action_classes: vec![ProcessTriageActionClass::HardTerminate],
191 managed_process_patterns: vec![
192 "cargo".to_string(),
193 "rustc".to_string(),
194 "clang".to_string(),
195 ],
196 protected_process_patterns: vec![
197 "sshd".to_string(),
198 "systemd".to_string(),
199 "init".to_string(),
200 ],
201 escalation: ProcessTriageEscalationThresholds::default(),
202 require_audit_record: true,
203 }
204 }
205}
206
// Top-level process-triage contract: schema version, timeout and retry
// policies, per-command budgets and the safe-action policy.
// `validate()` checks the timeout, retry and safe-action parts.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageContract {
    // `Default` sets this to PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.
    pub schema_version: String,
    pub timeout_policy: ProcessTriageTimeoutPolicy,
    pub retry_policy: ProcessTriageRetryPolicy,
    pub command_budgets: Vec<ProcessTriageCommandBudget>,
    pub safe_action_policy: ProcessTriageSafeActionPolicy,
}
216
217impl Default for ProcessTriageContract {
218 fn default() -> Self {
219 Self {
220 schema_version: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
221 timeout_policy: ProcessTriageTimeoutPolicy::default(),
222 retry_policy: ProcessTriageRetryPolicy::default(),
223 command_budgets: vec![
224 ProcessTriageCommandBudget {
225 command: ProcessTriageAdapterCommand::Analyze,
226 timeout_secs: 8,
227 retries: 1,
228 },
229 ProcessTriageCommandBudget {
230 command: ProcessTriageAdapterCommand::Execute,
231 timeout_secs: 15,
232 retries: 1,
233 },
234 ProcessTriageCommandBudget {
235 command: ProcessTriageAdapterCommand::Health,
236 timeout_secs: 3,
237 retries: 2,
238 },
239 ProcessTriageCommandBudget {
240 command: ProcessTriageAdapterCommand::Version,
241 timeout_secs: 2,
242 retries: 0,
243 },
244 ],
245 safe_action_policy: ProcessTriageSafeActionPolicy::default(),
246 }
247 }
248}
249
// Description of one candidate process in a triage request.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessDescriptor {
    // Requested actions refer to candidates by this pid.
    pub pid: u32,
    pub ppid: Option<u32>,
    pub owner: String,
    // Matched (ASCII-lowercased, substring) against the policy's protected
    // and managed process patterns.
    pub command: String,
    pub classification: ProcessClassification,
    // Presumably CPU usage in thousandths of a percent - TODO confirm.
    pub cpu_percent_milli: u32,
    // Presumably resident set size in megabytes - TODO confirm.
    pub rss_mb: u32,
    pub runtime_secs: u64,
}
262
// One action requested against one process.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageActionRequest {
    pub action_class: ProcessTriageActionClass,
    // Must match the pid of one of the request's candidate processes
    // (`ProcessTriageRequest::validate`).
    pub pid: u32,
    pub reason_code: String,
    // Optional signal name (the tests use "TERM" / "KILL"); not interpreted
    // anywhere in this file.
    pub signal: Option<String>,
}
271
// Triage request: the candidate processes observed on a worker plus the
// actions requested against them. See `validate()` for the structural rules
// and `process_triage_request_schema()` for the JSON schema.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageRequest {
    // Must equal PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.
    pub schema_version: String,
    pub correlation_id: String,
    pub worker_id: String,
    pub observed_at_unix_ms: i64,
    pub trigger: ProcessTriageTrigger,
    // Must be <= 100; compared with the policy's
    // `min_confidence_for_automatic`.
    pub detector_confidence_percent: u8,
    // Zero-based in the tests; `retry_attempt + 1` is compared with the
    // retry policy's `max_attempts`.
    pub retry_attempt: u32,
    pub candidate_processes: Vec<ProcessDescriptor>,
    // Must not be empty.
    pub requested_actions: Vec<ProcessTriageActionRequest>,
}
285
// Outcome recorded for a single action (`ProcessTriageActionResult::outcome`).
// Serialized in snake_case.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageActionOutcome {
    Skipped,
    Executed,
    Failed,
    Escalated,
}
295
// Per-action result entry of a response
// (`ProcessTriageResponse::executed_actions`).
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageActionResult {
    pub pid: u32,
    pub action_class: ProcessTriageActionClass,
    pub outcome: ProcessTriageActionOutcome,
    // Optional free-form note.
    pub note: Option<String>,
}
304
// Overall status of a triage response (`ProcessTriageResponse::status`).
// Serialized in snake_case.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTriageResponseStatus {
    Applied,
    PartiallyApplied,
    EscalatedNoAction,
    RejectedByPolicy,
    Failed,
}
315
// Failure details attached to a response (`ProcessTriageResponse::failure`).
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageFailure {
    pub kind: ProcessTriageFailureKind,
    pub code: String,
    pub message: String,
    // List of remediation strings.
    pub remediation: Vec<String>,
}
324
// Audit record carried by every response (`ProcessTriageResponse::audit` is
// not optional).
// NOTE(review): `decision_code`, `requires_operator_ack` and `audit_required`
// share names with `ProcessTriagePolicyDecision` fields; presumably they are
// copied from the decision - confirm against the code that builds responses.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageAuditRecord {
    pub policy_version: String,
    pub evaluated_by: String,
    pub evaluated_at_unix_ms: i64,
    pub decision_code: String,
    pub requires_operator_ack: bool,
    pub audit_required: bool,
}
335
// Triage response envelope; `process_triage_response_schema()` exposes its
// JSON schema.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriageResponse {
    pub schema_version: String,
    pub correlation_id: String,
    pub status: ProcessTriageResponseStatus,
    pub escalation_level: ProcessTriageEscalationLevel,
    pub executed_actions: Vec<ProcessTriageActionResult>,
    // Optional failure details.
    pub failure: Option<ProcessTriageFailure>,
    pub audit: ProcessTriageAuditRecord,
}
347
// Result of `evaluate_triage_action` for one requested action.
// (Plain `//` comments on purpose: schemars copies `///` text into the
// derived JSON schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct ProcessTriagePolicyDecision {
    pub permitted: bool,
    pub escalation_level: ProcessTriageEscalationLevel,
    // `None` when not permitted; otherwise the class to carry out, which may
    // be a downgrade of the requested class.
    pub effective_action: Option<ProcessTriageActionClass>,
    // Stable machine-readable code, e.g. "PT_ALLOW_AUTOMATIC".
    pub decision_code: String,
    // Human-readable explanation.
    pub reason: String,
    // False only for the fully automatic decision.
    pub requires_operator_ack: bool,
    // Mirrors the policy's `require_audit_record`.
    pub audit_required: bool,
}
359
/// Validation errors returned by `ProcessTriageRequest::validate` and
/// `ProcessTriageContract::validate`.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ProcessTriageContractError {
    /// The request's `schema_version` differs from
    /// `PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION`.
    #[error("schema version mismatch: expected {expected}, got {actual}")]
    SchemaVersionMismatch { expected: String, actual: String },
    /// The request's `detector_confidence_percent` exceeds 100.
    #[error("detector confidence percent must be <= 100, got {0}")]
    InvalidConfidence(u8),
    /// The request carries no requested actions.
    #[error("requested_actions must not be empty")]
    EmptyRequestedActions,
    /// A requested action names a pid that is not among the candidates.
    #[error("requested action references unknown pid {0}")]
    UnknownActionPid(u32),
    /// A timeout policy field is zero.
    #[error("timeout policy has invalid value for {field}: {value}")]
    InvalidTimeout { field: &'static str, value: u64 },
    /// A retry policy field is zero, or `max_backoff_ms` is below
    /// `initial_backoff_ms`.
    #[error("retry policy has invalid value for {field}: {value}")]
    InvalidRetryPolicy { field: &'static str, value: u64 },
    /// The same action class appears in both the allowlist and the denylist.
    #[error("allowlist/denylist conflict for action class {0:?}")]
    AllowDenyConflict(ProcessTriageActionClass),
    /// `min_confidence_for_automatic` exceeds 100.
    #[error("escalation threshold min_confidence_for_automatic must be <= 100, got {0}")]
    InvalidEscalationConfidence(u8),
}
380
381impl ProcessTriageRequest {
382 pub fn validate(&self) -> Result<(), ProcessTriageContractError> {
384 if self.schema_version != PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION {
385 return Err(ProcessTriageContractError::SchemaVersionMismatch {
386 expected: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
387 actual: self.schema_version.clone(),
388 });
389 }
390 if self.detector_confidence_percent > 100 {
391 return Err(ProcessTriageContractError::InvalidConfidence(
392 self.detector_confidence_percent,
393 ));
394 }
395 if self.requested_actions.is_empty() {
396 return Err(ProcessTriageContractError::EmptyRequestedActions);
397 }
398
399 let candidate_pids: HashSet<u32> = self.candidate_processes.iter().map(|p| p.pid).collect();
400 for action in &self.requested_actions {
401 if !candidate_pids.contains(&action.pid) {
402 return Err(ProcessTriageContractError::UnknownActionPid(action.pid));
403 }
404 }
405
406 Ok(())
407 }
408}
409
410impl ProcessTriageContract {
411 pub fn validate(&self) -> Result<(), ProcessTriageContractError> {
413 if self.timeout_policy.request_timeout_secs == 0 {
414 return Err(ProcessTriageContractError::InvalidTimeout {
415 field: "request_timeout_secs",
416 value: self.timeout_policy.request_timeout_secs,
417 });
418 }
419 if self.timeout_policy.action_timeout_secs == 0 {
420 return Err(ProcessTriageContractError::InvalidTimeout {
421 field: "action_timeout_secs",
422 value: self.timeout_policy.action_timeout_secs,
423 });
424 }
425 if self.timeout_policy.total_timeout_secs == 0 {
426 return Err(ProcessTriageContractError::InvalidTimeout {
427 field: "total_timeout_secs",
428 value: self.timeout_policy.total_timeout_secs,
429 });
430 }
431 if self.retry_policy.max_attempts == 0 {
432 return Err(ProcessTriageContractError::InvalidRetryPolicy {
433 field: "max_attempts",
434 value: self.retry_policy.max_attempts as u64,
435 });
436 }
437 if self.retry_policy.initial_backoff_ms == 0 {
438 return Err(ProcessTriageContractError::InvalidRetryPolicy {
439 field: "initial_backoff_ms",
440 value: self.retry_policy.initial_backoff_ms,
441 });
442 }
443 if self.retry_policy.max_backoff_ms < self.retry_policy.initial_backoff_ms {
444 return Err(ProcessTriageContractError::InvalidRetryPolicy {
445 field: "max_backoff_ms",
446 value: self.retry_policy.max_backoff_ms,
447 });
448 }
449 if self
450 .safe_action_policy
451 .escalation
452 .min_confidence_for_automatic
453 > 100
454 {
455 return Err(ProcessTriageContractError::InvalidEscalationConfidence(
456 self.safe_action_policy
457 .escalation
458 .min_confidence_for_automatic,
459 ));
460 }
461
462 let allow: HashSet<ProcessTriageActionClass> = self
463 .safe_action_policy
464 .allow_action_classes
465 .iter()
466 .copied()
467 .collect();
468 let deny: HashSet<ProcessTriageActionClass> = self
469 .safe_action_policy
470 .deny_action_classes
471 .iter()
472 .copied()
473 .collect();
474
475 if let Some(action) = allow.intersection(&deny).next() {
476 return Err(ProcessTriageContractError::AllowDenyConflict(*action));
477 }
478
479 Ok(())
480 }
481}
482
483pub fn evaluate_triage_action(
485 request: &ProcessTriageRequest,
486 contract: &ProcessTriageContract,
487 action: &ProcessTriageActionRequest,
488) -> ProcessTriagePolicyDecision {
489 let policy = &contract.safe_action_policy;
490 let allow: HashSet<ProcessTriageActionClass> =
491 policy.allow_action_classes.iter().copied().collect();
492 let deny: HashSet<ProcessTriageActionClass> =
493 policy.deny_action_classes.iter().copied().collect();
494
495 if deny.contains(&action.action_class) {
496 return ProcessTriagePolicyDecision {
497 permitted: false,
498 escalation_level: ProcessTriageEscalationLevel::Blocked,
499 effective_action: None,
500 decision_code: "PT_BLOCK_DENYLIST".to_string(),
501 reason: format!("action class {:?} is denylisted", action.action_class),
502 requires_operator_ack: true,
503 audit_required: policy.require_audit_record,
504 };
505 }
506
507 if !allow.contains(&action.action_class) {
508 return ProcessTriagePolicyDecision {
509 permitted: false,
510 escalation_level: ProcessTriageEscalationLevel::Blocked,
511 effective_action: None,
512 decision_code: "PT_BLOCK_NOT_ALLOWLISTED".to_string(),
513 reason: format!("action class {:?} is not allowlisted", action.action_class),
514 requires_operator_ack: true,
515 audit_required: policy.require_audit_record,
516 };
517 }
518
519 let target = request
520 .candidate_processes
521 .iter()
522 .find(|proc_desc| proc_desc.pid == action.pid);
523
524 if let Some(proc_desc) = target {
525 let cmd_lower = proc_desc.command.to_ascii_lowercase();
526 if pattern_matches(&cmd_lower, &policy.protected_process_patterns) {
527 return ProcessTriagePolicyDecision {
528 permitted: false,
529 escalation_level: ProcessTriageEscalationLevel::Blocked,
530 effective_action: None,
531 decision_code: "PT_BLOCK_PROTECTED_PROCESS".to_string(),
532 reason: format!(
533 "target process '{}' matches protected patterns",
534 proc_desc.command
535 ),
536 requires_operator_ack: true,
537 audit_required: policy.require_audit_record,
538 };
539 }
540 if !policy.managed_process_patterns.is_empty()
541 && !pattern_matches(&cmd_lower, &policy.managed_process_patterns)
542 {
543 return ProcessTriagePolicyDecision {
544 permitted: false,
545 escalation_level: ProcessTriageEscalationLevel::Blocked,
546 effective_action: None,
547 decision_code: "PT_BLOCK_OUT_OF_SCOPE_PROCESS".to_string(),
548 reason: format!(
549 "target process '{}' does not match managed patterns",
550 proc_desc.command
551 ),
552 requires_operator_ack: true,
553 audit_required: policy.require_audit_record,
554 };
555 }
556 }
557
558 if request.detector_confidence_percent < policy.escalation.min_confidence_for_automatic {
559 return ProcessTriagePolicyDecision {
560 permitted: false,
561 escalation_level: ProcessTriageEscalationLevel::ManualReview,
562 effective_action: None,
563 decision_code: "PT_MANUAL_LOW_CONFIDENCE".to_string(),
564 reason: format!(
565 "detector confidence {} is below automatic threshold {}",
566 request.detector_confidence_percent, policy.escalation.min_confidence_for_automatic
567 ),
568 requires_operator_ack: true,
569 audit_required: policy.require_audit_record,
570 };
571 }
572
573 if request.retry_attempt + 1 >= contract.retry_policy.max_attempts {
574 return ProcessTriagePolicyDecision {
575 permitted: false,
576 escalation_level: ProcessTriageEscalationLevel::ManualReview,
577 effective_action: None,
578 decision_code: "PT_MANUAL_RETRY_EXHAUSTED".to_string(),
579 reason: format!(
580 "retry attempt {} reached max attempts {}",
581 request.retry_attempt + 1,
582 contract.retry_policy.max_attempts
583 ),
584 requires_operator_ack: true,
585 audit_required: policy.require_audit_record,
586 };
587 }
588
589 let hard_kill_requests = request
590 .requested_actions
591 .iter()
592 .filter(|req| req.action_class == ProcessTriageActionClass::HardTerminate)
593 .count() as u32;
594
595 if hard_kill_requests > policy.escalation.max_hard_terminations_before_manual_review {
596 return ProcessTriagePolicyDecision {
597 permitted: false,
598 escalation_level: ProcessTriageEscalationLevel::ManualReview,
599 effective_action: None,
600 decision_code: "PT_MANUAL_HARD_KILL_THRESHOLD".to_string(),
601 reason: format!(
602 "requested hard terminations {} exceeds threshold {}",
603 hard_kill_requests, policy.escalation.max_hard_terminations_before_manual_review
604 ),
605 requires_operator_ack: true,
606 audit_required: policy.require_audit_record,
607 };
608 }
609
610 if request.requested_actions.len() as u32 > policy.escalation.max_actions_before_manual_review {
611 let downgraded_action = if action.action_class.risk_rank()
612 > ProcessTriageActionClass::ObserveOnly.risk_rank()
613 {
614 ProcessTriageActionClass::ObserveOnly
615 } else {
616 action.action_class
617 };
618
619 return ProcessTriagePolicyDecision {
620 permitted: true,
621 escalation_level: ProcessTriageEscalationLevel::Supervised,
622 effective_action: Some(downgraded_action),
623 decision_code: "PT_SUPERVISED_ACTION_VOLUME".to_string(),
624 reason: format!(
625 "requested action count {} exceeds threshold {}, action downgraded for supervised mode",
626 request.requested_actions.len(),
627 policy.escalation.max_actions_before_manual_review
628 ),
629 requires_operator_ack: true,
630 audit_required: policy.require_audit_record,
631 };
632 }
633
634 ProcessTriagePolicyDecision {
635 permitted: true,
636 escalation_level: ProcessTriageEscalationLevel::Automatic,
637 effective_action: Some(action.action_class),
638 decision_code: "PT_ALLOW_AUTOMATIC".to_string(),
639 reason: "action satisfies allowlist and escalation thresholds".to_string(),
640 requires_operator_ack: false,
641 audit_required: policy.require_audit_record,
642 }
643}
644
/// Reports whether any non-empty pattern, ASCII-lowercased, occurs as a
/// substring of `command_lower`.
///
/// `command_lower` is matched exactly as given, so the caller is expected to
/// have lowercased it already. Empty patterns never match.
fn pattern_matches(command_lower: &str, patterns: &[String]) -> bool {
    for pattern in patterns {
        if pattern.is_empty() {
            continue;
        }
        let needle = pattern.to_ascii_lowercase();
        if command_lower.contains(needle.as_str()) {
            return true;
        }
    }
    false
}
651
/// Returns the JSON schema that schemars generates for `ProcessTriageRequest`.
pub fn process_triage_request_schema() -> RootSchema {
    schema_for!(ProcessTriageRequest)
}
656
/// Returns the JSON schema that schemars generates for `ProcessTriageResponse`.
pub fn process_triage_response_schema() -> RootSchema {
    schema_for!(ProcessTriageResponse)
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Baseline request that passes `validate()`:
    /// pid 1001 runs `cargo test --workspace` (matches the default managed
    /// pattern `cargo`), pid 1002 runs `sshd: ubuntu@pts/4` (matches the
    /// default protected pattern `sshd`), and a single `SoftTerminate` is
    /// requested against pid 1001 at confidence 96 on the first attempt.
    fn sample_request() -> ProcessTriageRequest {
        ProcessTriageRequest {
            schema_version: PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION.to_string(),
            correlation_id: "corr-123".to_string(),
            worker_id: "worker-a".to_string(),
            observed_at_unix_ms: 1_768_768_123_000,
            trigger: ProcessTriageTrigger::WorkerHealth,
            detector_confidence_percent: 96,
            retry_attempt: 0,
            candidate_processes: vec![
                ProcessDescriptor {
                    pid: 1001,
                    ppid: Some(1000),
                    owner: "ubuntu".to_string(),
                    command: "cargo test --workspace".to_string(),
                    classification: ProcessClassification::BuildRelated,
                    cpu_percent_milli: 92500,
                    rss_mb: 2100,
                    runtime_secs: 240,
                },
                ProcessDescriptor {
                    pid: 1002,
                    ppid: Some(1),
                    owner: "root".to_string(),
                    command: "sshd: ubuntu@pts/4".to_string(),
                    classification: ProcessClassification::SystemCritical,
                    cpu_percent_milli: 100,
                    rss_mb: 32,
                    runtime_secs: 8600,
                },
            ],
            requested_actions: vec![ProcessTriageActionRequest {
                action_class: ProcessTriageActionClass::SoftTerminate,
                pid: 1001,
                reason_code: "stuck_compile".to_string(),
                signal: Some("TERM".to_string()),
            }],
        }
    }

    /// Last `/`-separated segment of a `$ref` string.
    fn ref_tail(reference: &str) -> Option<String> {
        reference.rsplit('/').next().map(str::to_string)
    }

    /// Collects the string entries of a JSON array, skipping non-strings.
    fn string_values(values: &[Value]) -> Vec<String> {
        values
            .iter()
            .filter_map(Value::as_str)
            .map(str::to_string)
            .collect()
    }

    /// Returns the definition name a schema node refers to, looking at a
    /// direct `$ref` first and then at the first `$ref` inside `anyOf`,
    /// `oneOf` or `allOf` (in that order).
    fn extract_ref_name(schema: &Value) -> Option<String> {
        if let Some(reference) = schema.get("$ref").and_then(Value::as_str) {
            return ref_tail(reference);
        }

        for key in ["anyOf", "oneOf", "allOf"] {
            let nested_ref = schema
                .get(key)
                .and_then(Value::as_array)
                .and_then(|variants| {
                    variants
                        .iter()
                        .find_map(|variant| variant.get("$ref").and_then(Value::as_str))
                });
            if let Some(reference) = nested_ref {
                return ref_tail(reference);
            }
        }

        None
    }

    /// Finds the `properties` map that contains every key in `required`:
    /// the root schema's map if it qualifies, otherwise the first entry under
    /// `definitions` that does. Panics when none is found.
    fn find_schema_properties(
        schema_json: &Value,
        required: &[&str],
    ) -> serde_json::Map<String, Value> {
        let root_properties = schema_json
            .get("properties")
            .and_then(Value::as_object)
            .filter(|properties| required.iter().all(|key| properties.contains_key(*key)))
            .cloned();
        if let Some(properties) = root_properties {
            return properties;
        }

        let definition_properties = schema_json
            .get("definitions")
            .and_then(Value::as_object)
            .and_then(|definitions| {
                definitions.values().find_map(|node| {
                    let properties = node.get("properties")?.as_object()?;
                    if required.iter().all(|key| properties.contains_key(*key)) {
                        Some(properties.clone())
                    } else {
                        None
                    }
                })
            });
        if let Some(properties) = definition_properties {
            return properties;
        }

        panic!("schema properties not found for required keys: {required:?}");
    }

    /// Returns the `properties` map of `definitions[definition_name]`,
    /// panicking when the definition or its properties are missing.
    fn definition_properties(
        schema_json: &Value,
        definition_name: &str,
    ) -> serde_json::Map<String, Value> {
        schema_json
            .get("definitions")
            .and_then(Value::as_object)
            .and_then(|definitions| definitions.get(definition_name))
            .and_then(|definition| definition.get("properties"))
            .and_then(Value::as_object)
            .cloned()
            .unwrap_or_else(|| panic!("definition '{definition_name}' missing properties"))
    }

    /// Returns the string `enum` values of `definitions[definition_name]`,
    /// panicking when the definition has no `enum` array.
    fn definition_enum(schema_json: &Value, definition_name: &str) -> Vec<String> {
        schema_json
            .get("definitions")
            .and_then(Value::as_object)
            .and_then(|definitions| definitions.get(definition_name))
            .and_then(|definition| definition.get("enum"))
            .and_then(Value::as_array)
            .map(|values| string_values(values))
            .unwrap_or_else(|| panic!("definition '{definition_name}' missing enum values"))
    }

    #[test]
    fn process_triage_contract_request_roundtrip() {
        let request = sample_request();
        request.validate().expect("sample request should validate");

        let json = serde_json::to_string(&request).expect("serialize request");
        let restored: ProcessTriageRequest =
            serde_json::from_str(&json).expect("deserialize request");
        assert_eq!(
            restored.schema_version,
            PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION
        );
        assert_eq!(restored.worker_id, "worker-a");
        assert_eq!(restored.requested_actions.len(), 1);
    }

    #[test]
    fn process_triage_contract_policy_validation_rejects_allow_deny_overlap() {
        // SoftTerminate is allowlisted by default; denylisting it as well
        // must be reported as a conflict.
        let mut contract = ProcessTriageContract::default();
        contract
            .safe_action_policy
            .deny_action_classes
            .push(ProcessTriageActionClass::SoftTerminate);

        let result = contract.validate();
        assert!(matches!(
            result,
            Err(ProcessTriageContractError::AllowDenyConflict(
                ProcessTriageActionClass::SoftTerminate
            ))
        ));
    }

    #[test]
    fn process_triage_contract_blocks_protected_process() {
        // pid 1002 is the sshd candidate of the sample request.
        let contract = ProcessTriageContract::default();
        let request = ProcessTriageRequest {
            requested_actions: vec![ProcessTriageActionRequest {
                action_class: ProcessTriageActionClass::SoftTerminate,
                pid: 1002,
                reason_code: "force_cleanup".to_string(),
                signal: Some("TERM".to_string()),
            }],
            ..sample_request()
        };

        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
        assert!(!decision.permitted);
        assert_eq!(
            decision.escalation_level,
            ProcessTriageEscalationLevel::Blocked
        );
        assert_eq!(decision.decision_code, "PT_BLOCK_PROTECTED_PROCESS");
    }

    #[test]
    fn process_triage_contract_requires_manual_review_on_low_confidence() {
        // 40 is below the default automatic threshold of 85.
        let contract = ProcessTriageContract::default();
        let request = ProcessTriageRequest {
            detector_confidence_percent: 40,
            ..sample_request()
        };

        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
        assert!(!decision.permitted);
        assert_eq!(
            decision.escalation_level,
            ProcessTriageEscalationLevel::ManualReview
        );
        assert_eq!(decision.decision_code, "PT_MANUAL_LOW_CONFIDENCE");
    }

    #[test]
    fn process_triage_contract_respects_denylist() {
        // HardTerminate is deliberately placed in both lists: the denylist
        // must win during evaluation.
        let mut contract = ProcessTriageContract::default();
        contract.safe_action_policy.allow_action_classes = vec![
            ProcessTriageActionClass::ObserveOnly,
            ProcessTriageActionClass::HardTerminate,
        ];
        contract.safe_action_policy.deny_action_classes =
            vec![ProcessTriageActionClass::HardTerminate];

        let request = ProcessTriageRequest {
            requested_actions: vec![ProcessTriageActionRequest {
                action_class: ProcessTriageActionClass::HardTerminate,
                pid: 1001,
                reason_code: "stuck_compile".to_string(),
                signal: Some("KILL".to_string()),
            }],
            ..sample_request()
        };

        let decision = evaluate_triage_action(&request, &contract, &request.requested_actions[0]);
        assert!(!decision.permitted);
        assert_eq!(
            decision.escalation_level,
            ProcessTriageEscalationLevel::Blocked
        );
        assert_eq!(decision.decision_code, "PT_BLOCK_DENYLIST");
    }

    #[test]
    fn process_triage_contract_schema_contains_core_fields() {
        let schema = process_triage_request_schema();
        let schema_json = serde_json::to_value(&schema).expect("schema to json");
        // Shared lookup: root properties first, then definitions.
        let properties = find_schema_properties(&schema_json, &["worker_id", "requested_actions"]);

        assert!(properties.contains_key("schema_version"));
        assert!(properties.contains_key("worker_id"));
        assert!(properties.contains_key("requested_actions"));
    }

    #[test]
    fn process_triage_contract_response_schema_requires_audit_record() {
        let schema = process_triage_response_schema();
        let schema_json = serde_json::to_value(&schema).expect("schema to json");
        let response_properties =
            find_schema_properties(&schema_json, &["status", "executed_actions", "audit"]);
        let audit_schema = response_properties
            .get("audit")
            .expect("response schema should contain audit field");
        // The audit field may be a reference to a definition or inline.
        let audit_properties = if let Some(definition_name) = extract_ref_name(audit_schema) {
            definition_properties(&schema_json, &definition_name)
        } else {
            audit_schema
                .get("properties")
                .and_then(Value::as_object)
                .cloned()
                .expect("audit schema should provide properties")
        };

        assert!(audit_properties.contains_key("policy_version"));
        assert!(audit_properties.contains_key("decision_code"));
        assert!(audit_properties.contains_key("audit_required"));
    }

    #[test]
    fn process_triage_contract_response_schema_exposes_failure_kind_taxonomy() {
        let schema = process_triage_response_schema();
        let schema_json = serde_json::to_value(&schema).expect("schema to json");
        let response_properties = find_schema_properties(&schema_json, &["status", "failure"]);
        let failure_schema = response_properties
            .get("failure")
            .expect("response schema should contain failure field");
        let failure_definition = extract_ref_name(failure_schema)
            .expect("failure field should reference ProcessTriageFailure schema");
        let failure_properties = definition_properties(&schema_json, &failure_definition);
        let kind_schema = failure_properties
            .get("kind")
            .expect("failure schema should contain kind field");
        // The kind enum may be a reference to a definition or inline.
        let kind_values = if let Some(definition_name) = extract_ref_name(kind_schema) {
            definition_enum(&schema_json, &definition_name)
        } else {
            kind_schema
                .get("enum")
                .and_then(Value::as_array)
                .map(|values| string_values(values))
                .expect("kind schema should expose enum values")
        };

        for expected in [
            "detector_uncertain",
            "policy_violation",
            "transport_error",
            "executor_runtime_error",
            "timeout",
            "partial_result",
            "invalid_request",
        ] {
            assert!(
                kind_values.iter().any(|value| value == expected),
                "missing failure kind: {}",
                expected
            );
        }
    }

    #[test]
    fn process_triage_contract_parser_compatibility() {
        // NOTE(review): the payload hard-codes schema version "1.0.0" and the
        // assertion below compares it with the current constant, so this test
        // needs updating whenever the contract schema version is bumped.
        let json = r#"{
            "schema_version":"1.0.0",
            "correlation_id":"corr-xyz",
            "worker_id":"worker-z",
            "observed_at_unix_ms":1768768123000,
            "trigger":"disk_pressure",
            "detector_confidence_percent":88,
            "retry_attempt":1,
            "candidate_processes":[{
                "pid":4242,
                "ppid":1,
                "owner":"ubuntu",
                "command":"cargo clippy --workspace",
                "classification":"build_related",
                "cpu_percent_milli":50000,
                "rss_mb":700,
                "runtime_secs":128
            }],
            "requested_actions":[{
                "action_class":"soft_terminate",
                "pid":4242,
                "reason_code":"timeout",
                "signal":"TERM"
            }]
        }"#;

        let request: ProcessTriageRequest = serde_json::from_str(json).expect("compat parse");
        assert_eq!(
            request.schema_version,
            PROCESS_TRIAGE_CONTRACT_SCHEMA_VERSION
        );
        assert_eq!(request.trigger, ProcessTriageTrigger::DiskPressure);
        assert_eq!(request.requested_actions.len(), 1);
    }
}