use super::annotation::parse_python_comment;
use super::{CommandApi, CommandArgKind};
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
// Signed evidence weights summed by `predict`. Negative weights push the
// verdict toward `BranchLabel::RealBug`, positive weights toward
// `BranchLabel::Benign`; the call is predicted Benign only when the total is
// strictly greater than zero.
/// Applied when the call carries a `shell=True` keyword argument.
const W_KW_SHELL_TRUE: f32 = -0.40;
/// Applied when argv[0] is a function parameter. `predict` also applies it
/// when the first argument is a parameter and no argv[0] origin was recorded.
const W_ARGV0_IS_PARAMETER: f32 = -0.50;
/// Applied when the first argument is sourced from a request object.
const W_FIRST_ARG_REQUEST_SOURCE: f32 = -0.50;
/// Applied when the enclosing function name looks like a request handler
/// (only considered when it did not already match the test-function lexicon).
const W_ENCLOSING_HANDLER: f32 = -0.30;
/// Applied when every element of the argv list is a string literal.
const W_ARGV_LIST_ALL_LITERALS: f32 = 0.50;
/// Applied when argv[0] is a literal but the argv list is not all literals.
const W_ARGV0_IS_LITERAL: f32 = 0.30;
/// Applied when the first argument is sourced from a config/env object.
const W_FIRST_ARG_CONFIG_SOURCE: f32 = 0.30;
/// Applied when the enclosing function name looks like a test or fixture.
const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// Lowercase markers that `matches_request_object` searches for in the
/// lowercased expression text. Matching is a plain substring test, so an
/// entry such as `"req."` also hits longer identifiers ending in `req`.
const REQUEST_OBJECT_SUBSTRINGS: &[&str] = &[
"request.",
"req.",
"flask.request",
"event.",
"self.request",
];
/// Lowercase markers that `matches_config_object` searches for in the
/// lowercased expression text (plain substring test, no word boundaries).
const CONFIG_OBJECT_SUBSTRINGS: &[&str] = &[
"settings.",
"config.",
"os.environ",
"os.path.expanduser",
"self.config",
"self.settings",
];
/// Lowercase markers that `matches_test_function` searches for in the
/// lowercased function name to recognise test/fixture scaffolding.
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
/// Lowercase markers that `matches_handler_function` searches for in the
/// lowercased function name. The camelCase HTTP-verb prefix check lives in
/// `matches_handler_function` itself and is not part of this list.
const HANDLER_FUNCTION_SUBSTRINGS: &[&str] = &[
"handler",
"route",
"endpoint",
"view",
"controller",
"middleware",
"request",
"response",
];
/// Classification of where the call's first argument comes from, as consumed
/// by `predict`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum FirstArgOrigin {
/// A literal value; contributes no weight in `predict`.
Literal,
/// A config/env source; scored with `W_FIRST_ARG_CONFIG_SOURCE`.
ConfigSource,
/// A request object; scored with `W_FIRST_ARG_REQUEST_SOURCE`.
RequestSource,
/// A function parameter called `name`; scored with `W_ARGV0_IS_PARAMETER`,
/// but only when no argv[0] origin was recorded.
Parameter { name: String },
/// Origin could not be classified; contributes no weight in `predict`.
Unknown,
}
/// Classification of where argv[0] (the executed binary) comes from, as
/// consumed by `predict`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum Argv0Origin {
/// argv[0] is a string literal; scored with `W_ARGV0_IS_LITERAL` unless the
/// whole argv list is already known to be literal.
Literal,
/// argv[0] is the function parameter `name`; scored with
/// `W_ARGV0_IS_PARAMETER`.
Parameter { name: String },
/// Anything else; contributes no weight in `predict`.
Other,
}
/// Signals gathered about one command-execution call site and fed to
/// `predict`. The `Default` value carries no signals at all.
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Name of the enclosing function, checked against the test and handler
/// lexicons.
pub enclosing_function: Option<String>,
/// Name of the enclosing class. `predict` does not read this field.
pub enclosing_class: Option<String>,
/// Whether the call passes `shell=True`.
pub kw_shell_true: bool,
/// Classified origin of the first argument, if known.
pub first_arg_origin: Option<FirstArgOrigin>,
/// Classified origin of argv[0], if known.
pub argv0_origin: Option<Argv0Origin>,
/// Whether every element of the argv list is a string literal.
pub argv_list_all_literals: bool,
/// Reason text of a `command-static[...]` annotation; when present,
/// `predict` returns Benign without scoring any other evidence.
/// Presumably filled from `extract_command_static_reason` — confirm at the caller.
pub command_static_annotation: Option<String>,
/// Source text of a `command-user-controlled[...]` annotation; when present
/// (and no static annotation is), `predict` returns RealBug without scoring.
pub command_user_controlled_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor returning an `Evidence` with every signal absent
    /// (identical to the derived `Default`).
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Outcome of `predict`: the chosen branch plus the preserved opposite
/// interpretation and the evidence trail behind the choice.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// Branch the predictor chose.
pub predicted: BranchLabel,
/// The opposite branch, with its own severity, title, description and fix.
pub alternative_branch: AlternativeBranch,
/// Severity for the chosen branch: `Severity::Info` for Benign, otherwise
/// whatever `CommandApi::severity_for` reports.
pub predicted_severity: Severity,
/// Weighted reasons recorded while deciding, in evaluation order.
pub reasons: Vec<PredictionReason>,
/// Resolution signals; populated only when an annotation decided the
/// outcome, empty on the scored path.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(
evidence: &Evidence,
api: CommandApi,
arg_kind: CommandArgKind,
literal_text: Option<&str>,
) -> Prediction {
let api_label = api.callee_label();
if let Some(reason) = &evidence.command_static_annotation {
return collapse(
BranchLabel::Benign,
api,
arg_kind,
literal_text,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: command-static[{reason}]"),
},
description: format!(
"`command-static[{reason}]` annotation declares this \
command-exec call as protected by caller-side \
validation; the finding collapses to Info."
),
example: Some(format!(
"{api_label}(...) # repotoire: command-static[{reason}]"
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("command-static[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as caller-validated ({reason}); not a \
command-injection risk."
),
},
);
}
if let Some(source) = &evidence.command_user_controlled_annotation {
return collapse(
BranchLabel::RealBug,
api,
arg_kind,
literal_text,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: command-user-controlled[{source}]"),
},
description: format!(
"`command-user-controlled[{source}]` annotation \
declares the command argument as attacker-controlled; \
the finding stays at the existing severity."
),
example: Some(format!(
"{api_label}(...) # repotoire: command-user-controlled[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("command-user-controlled[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as user-controlled (source: {source})."),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if evidence.kw_shell_true {
sum += W_KW_SHELL_TRUE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "shell".to_string(),
value: "True".to_string(),
},
weight: W_KW_SHELL_TRUE,
note: "`shell=True` makes the call interpret its argument \
through `/bin/sh`; canonical RCE smell."
.to_string(),
});
}
if evidence.argv_list_all_literals {
sum += W_ARGV_LIST_ALL_LITERALS;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "argv list is all static literals".to_string(),
},
weight: W_ARGV_LIST_ALL_LITERALS,
note: "Every element of the argv list is a string literal; \
neither the binary nor any argument can be attacker-\
chosen."
.to_string(),
});
}
if let Some(origin) = &evidence.argv0_origin {
match origin {
Argv0Origin::Literal if !evidence.argv_list_all_literals => {
sum += W_ARGV0_IS_LITERAL;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "argv[0] is a string literal".to_string(),
},
weight: W_ARGV0_IS_LITERAL,
note: "Fixed argv[0] caps damage to argument injection \
(CWE-88) under `shell=False`; the executed \
binary cannot be attacker-chosen."
.to_string(),
});
}
Argv0Origin::Literal => {
}
Argv0Origin::Parameter { name } => {
sum += W_ARGV0_IS_PARAMETER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::FirstArgIdentifier { name: name.clone() },
weight: W_ARGV0_IS_PARAMETER,
note: format!(
"argv[0] is `{name}`, a function parameter; the \
attacker chooses which binary runs."
),
});
}
Argv0Origin::Other => {
}
}
}
if let Some(origin) = &evidence.first_arg_origin {
match origin {
FirstArgOrigin::Literal => {
}
FirstArgOrigin::ConfigSource => {
sum += W_FIRST_ARG_CONFIG_SOURCE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "first arg sourced from config module".to_string(),
},
weight: W_FIRST_ARG_CONFIG_SOURCE,
note: "First argument originates from a config/env \
source (`os.environ`, `settings`, `config`); \
the project owns this value."
.to_string(),
});
}
FirstArgOrigin::RequestSource => {
sum += W_FIRST_ARG_REQUEST_SOURCE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "first arg from request object".to_string(),
},
weight: W_FIRST_ARG_REQUEST_SOURCE,
note: "First argument originates from a request object \
(`request.GET`, `flask.request.args`, ...); \
attacker-controlled."
.to_string(),
});
}
FirstArgOrigin::Parameter { name } => {
if evidence.argv0_origin.is_none() {
sum += W_ARGV0_IS_PARAMETER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::FirstArgIdentifier { name: name.clone() },
weight: W_ARGV0_IS_PARAMETER,
note: format!(
"First argument is `{name}`, a function \
parameter; possibly user-controlled."
),
});
}
}
FirstArgOrigin::Unknown => {
}
}
}
if let Some(fn_name) = &evidence.enclosing_function {
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a \
test/fixture; test code rarely the actionable \
security target."
),
});
} else if matches_handler_function(fn_name) {
sum += W_ENCLOSING_HANDLER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "request_handler".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_HANDLER,
note: format!(
"Enclosing function `{fn_name}` looks like a request \
handler (`handler`/`route`/`endpoint`/`view`/\
`controller`/`middleware`/HTTP-verb-prefix); higher \
prior on attacker-reachable code."
),
});
}
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(predicted, api, arg_kind, literal_text, reasons, Vec::new())
}
/// Reports whether `text`, once lowercased, contains any marker listed in
/// `REQUEST_OBJECT_SUBSTRINGS`.
pub(super) fn matches_request_object(text: &str) -> bool {
    let haystack = text.to_lowercase();
    for needle in REQUEST_OBJECT_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Reports whether `text`, once lowercased, contains any marker listed in
/// `CONFIG_OBJECT_SUBSTRINGS`.
pub(super) fn matches_config_object(text: &str) -> bool {
    let haystack = text.to_lowercase();
    for needle in CONFIG_OBJECT_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Reports whether the lowercased function `name` contains any marker listed
/// in `TEST_FUNCTION_SUBSTRINGS`.
fn matches_test_function(name: &str) -> bool {
    let haystack = name.to_lowercase();
    for needle in TEST_FUNCTION_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Reports whether a function `name` looks like a request handler.
///
/// Two independent checks, either of which is sufficient:
/// 1. the lowercased name contains a marker from `HANDLER_FUNCTION_SUBSTRINGS`;
/// 2. the name, in its original case, starts with a lowercase HTTP verb
///    immediately followed by an ASCII uppercase letter (`getUser`, but not
///    `get_user` or `getfoo`).
fn matches_handler_function(name: &str) -> bool {
    const VERBS: &[&str] = &["get", "post", "put", "delete", "patch", "head", "options"];

    let lowered = name.to_lowercase();
    let has_handler_word = HANDLER_FUNCTION_SUBSTRINGS
        .iter()
        .any(|word| lowered.contains(*word));

    // The verb check is case-sensitive on purpose: it needs the camelCase
    // boundary right after the verb.
    has_handler_word
        || VERBS.iter().any(|verb| {
            matches!(
                name.strip_prefix(*verb).and_then(|rest| rest.chars().next()),
                Some(first) if first.is_ascii_uppercase()
            )
        })
}
/// Builds a `Prediction` for an outcome settled by a single annotation: the
/// given `reason` and `resolution` become the only entries in their lists.
fn collapse(
    label: BranchLabel,
    api: CommandApi,
    arg_kind: CommandArgKind,
    literal_text: Option<&str>,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let reasons = vec![reason];
    let resolutions = vec![resolution];
    build_prediction(label, api, arg_kind, literal_text, reasons, resolutions)
}
/// Assembles a `Prediction` for the `predicted` branch, filling
/// `alternative_branch` with the opposite label and its severity, title,
/// description and suggested fix.
fn build_prediction(
    predicted: BranchLabel,
    api: CommandApi,
    arg_kind: CommandArgKind,
    literal_text: Option<&str>,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let callee = api.callee_label();
    let predicted_severity = severity_for_branch(predicted, api, arg_kind, literal_text);
    // The alternative is always the mirror image of the chosen branch.
    let other = predicted.opposite();

    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other, api, arg_kind, literal_text),
            title: title_for_branch(other, callee),
            description: description_for_branch(other, callee),
            suggested_fix: suggested_fix_for_branch(other, callee),
        },
        predicted_severity,
        reasons,
        resolutions,
    }
}
/// Severity reported for a branch: Benign is always `Severity::Info`, while
/// RealBug defers to the API's own rating for the argument shape.
fn severity_for_branch(
    label: BranchLabel,
    api: CommandApi,
    arg_kind: CommandArgKind,
    literal_text: Option<&str>,
) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug => api.severity_for(arg_kind, literal_text),
    }
}
/// Finding title for a branch, naming the callee in `api_label`.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => {
            format!("Internal command-exec call via {api_label} (informational)")
        }
        BranchLabel::RealBug => format!("Potential command injection via {api_label}"),
    }
}
/// Long-form finding description for a branch, naming the callee in
/// `api_label`.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!(
            "The argument to `{api_label}` appears to be a literal or \
             config-derived value. The call site is carried as Info; the \
             RealBug interpretation is preserved in `alternative_branch` \
             in case the predictor is wrong."
        ),
        BranchLabel::RealBug => format!(
            "The argument to `{api_label}` appears attacker-influenceable. \
             OS-command-execution APIs run their argument as a shell or \
             argv list. When that argument is anything other than a \
             constant the program author controls at write time, \
             attackers who can influence the value get arbitrary command \
             execution."
        ),
    }
}
/// Remediation advice for a branch. Always returns `Some`; the callee label
/// is accepted for signature symmetry with the other text builders but is not
/// used.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let advice: &str = match label {
        BranchLabel::Benign => {
            "If this is intentional internal use, annotate \
             `# repotoire: command-static[<reason>]` to collapse the \
             finding to Info definitively."
        }
        BranchLabel::RealBug => {
            "Use the list form with a fixed argv[0]: \
             `subprocess.run([\"cmd\", arg1, arg2], shell=False)`. \
             Validate any user-controlled later argv elements against an \
             allowlist. Avoid `shell=True` and `os.system` / `os.popen` \
             entirely."
        }
    };
    Some(advice.to_string())
}
/// Extracts the reason from a `command-static` annotation on `line`.
///
/// Returns `None` when the line has no parseable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_command_static_reason(line: &str) -> Option<String> {
    parse_python_comment(line)
        .filter(|ann| ann.kind == "command-static")
        .map(|ann| {
            ann.args
                .first()
                .cloned()
                .unwrap_or_else(|| "unspecified".to_string())
        })
}
/// Extracts the source from a `command-user-controlled` annotation on `line`.
///
/// Returns `None` when the line has no parseable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_command_user_controlled_source(line: &str) -> Option<String> {
    parse_python_comment(line)
        .filter(|ann| ann.kind == "command-user-controlled")
        .map(|ann| {
            ann.args
                .first()
                .cloned()
                .unwrap_or_else(|| "unspecified".to_string())
        })
}
#[cfg(test)]
mod tests {
    //! Unit tests for the command-injection predictor: end-to-end `predict`
    //! scenarios, weight sign invariants, lexicon matchers, and annotation
    //! extraction.
    use super::*;

    // An all-literal argv list is the strongest benign signal; the literal
    // argv[0] must not be double-counted, hence exactly one reason.
    #[test]
    fn argv_list_all_literals_predicts_benign_info() {
        let evidence = Evidence {
            argv_list_all_literals: true,
            argv0_origin: Some(Argv0Origin::Literal),
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessNoShell,
            CommandArgKind::StaticList,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.alternative_branch.label, BranchLabel::RealBug);
        assert_eq!(p.alternative_branch.severity, Severity::Low);
        assert_eq!(p.reasons.len(), 1);
    }

    // `shell=True` plus a parameter-sourced first argument: both negative
    // weights apply and their sum is what the reasons must add up to.
    #[test]
    fn shell_true_with_param_interpolation_predicts_realbug_critical() {
        let evidence = Evidence {
            kw_shell_true: true,
            first_arg_origin: Some(FirstArgOrigin::Parameter {
                name: "name".to_string(),
            }),
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessShell,
            CommandArgKind::Interpolated,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Critical);
        // Derive the expectation from the constants so the failure message
        // stays accurate if the weights are ever retuned.
        let expected = W_KW_SHELL_TRUE + W_ARGV0_IS_PARAMETER;
        let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
        assert!(
            (total - expected).abs() < 1e-6,
            "expected {expected}, got {total}"
        );
    }

    #[test]
    fn shell_true_static_literal_predicts_realbug_low() {
        let evidence = Evidence {
            kw_shell_true: true,
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessShell,
            CommandArgKind::StaticLiteral,
            Some("git status"),
        );
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Low);
        assert_eq!(p.alternative_branch.label, BranchLabel::Benign);
        assert_eq!(p.alternative_branch.severity, Severity::Info);
    }

    #[test]
    fn mixed_list_literal_argv0_predicts_benign_info() {
        let evidence = Evidence {
            argv0_origin: Some(Argv0Origin::Literal),
            argv_list_all_literals: false,
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessNoShell,
            CommandArgKind::MixedListLiteralArgv0,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.alternative_branch.severity, Severity::Low);
    }

    // The static annotation overrides even strongly negative evidence.
    #[test]
    fn command_static_annotation_collapses_to_benign() {
        let evidence = Evidence {
            kw_shell_true: true,
            first_arg_origin: Some(FirstArgOrigin::RequestSource),
            command_static_annotation: Some("allowlisted-by-caller".to_string()),
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessShell,
            CommandArgKind::Interpolated,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.resolutions.len(), 1);
        assert!(matches!(
            p.resolutions[0].kind,
            ResolutionKind::SourceAnnotation { .. }
        ));
    }

    // The user-controlled annotation overrides even strongly positive evidence.
    #[test]
    fn command_user_controlled_annotation_collapses_to_realbug() {
        let evidence = Evidence {
            argv_list_all_literals: true,
            argv0_origin: Some(Argv0Origin::Literal),
            command_user_controlled_annotation: Some("env-var".to_string()),
            ..Default::default()
        };
        let p = predict(
            &evidence,
            CommandApi::PySubprocessNoShell,
            CommandArgKind::StaticList,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Low);
    }

    // A zero score is not strictly positive, so no evidence means RealBug.
    #[test]
    fn empty_evidence_tiebreaks_realbug() {
        let p = predict(
            &Evidence::empty(),
            CommandApi::PyOsSystem,
            CommandArgKind::Unknown,
            None,
        );
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::High);
    }

    // Sign invariants are checked on constants on purpose, hence the allow.
    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn handler_scope_weight_is_negative() {
        assert!(W_ENCLOSING_HANDLER < 0.0);
        assert!(W_KW_SHELL_TRUE < 0.0);
        assert!(W_FIRST_ARG_REQUEST_SOURCE < 0.0);
        assert!(W_ARGV0_IS_PARAMETER < 0.0);
    }

    // Sign invariants are checked on constants on purpose, hence the allow.
    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn benign_signal_weights_are_positive() {
        assert!(W_ARGV_LIST_ALL_LITERALS > 0.0);
        assert!(W_ARGV0_IS_LITERAL > 0.0);
        assert!(W_FIRST_ARG_CONFIG_SOURCE > 0.0);
        assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
    }

    #[test]
    fn handler_lexicon_matches_camel_verb_prefix() {
        assert!(matches_handler_function("getUserById"));
        assert!(matches_handler_function("postOrder"));
        assert!(matches_handler_function("deleteAccount"));
        assert!(!matches_handler_function("get_user_by_id"));
        assert!(!matches_handler_function("getfoo"));
    }

    #[test]
    fn handler_lexicon_matches_substring() {
        assert!(matches_handler_function("user_route"));
        assert!(matches_handler_function("loginHandler"));
        assert!(matches_handler_function("upload_endpoint"));
        assert!(!matches_handler_function("calculate_total"));
    }

    #[test]
    fn request_object_lexicon() {
        assert!(matches_request_object("request.GET[\"foo\"]"));
        assert!(matches_request_object("req.body.cmd"));
        assert!(matches_request_object("flask.request.args"));
        assert!(!matches_request_object("settings.BASE_DIR"));
    }

    #[test]
    fn config_object_lexicon() {
        assert!(matches_config_object("os.environ.get(\"X\")"));
        assert!(matches_config_object("settings.BASE_DIR"));
        assert!(matches_config_object("config.DEBUG"));
        assert!(!matches_config_object("request.args.get(\"x\")"));
    }

    #[test]
    fn test_function_lexicon() {
        assert!(matches_test_function("test_subprocess_run"));
        assert!(matches_test_function("subprocess_test"));
        assert!(matches_test_function("setup_fixture"));
        assert!(!matches_test_function("run_command"));
    }

    #[test]
    fn extract_command_static_with_reason() {
        assert_eq!(
            extract_command_static_reason(
                "subprocess.run(...)  # repotoire: command-static[validated]"
            ),
            Some("validated".to_string())
        );
    }

    #[test]
    fn extract_command_static_without_reason() {
        assert_eq!(
            extract_command_static_reason("subprocess.run(...)  # repotoire: command-static"),
            Some("unspecified".to_string())
        );
    }

    #[test]
    fn extract_command_user_controlled_with_source() {
        assert_eq!(
            extract_command_user_controlled_source(
                "subprocess.run(...)  # repotoire: command-user-controlled[GET-request]"
            ),
            Some("GET-request".to_string())
        );
    }

    #[test]
    fn extract_command_static_ignores_other_kinds() {
        assert_eq!(
            extract_command_static_reason("os.path.join(...)  # repotoire: internal-path[ok]"),
            None
        );
    }

    #[test]
    fn extract_command_user_controlled_ignores_other_kinds() {
        assert_eq!(
            extract_command_user_controlled_source(
                "os.path.join(...)  # repotoire: user-controlled[GET]"
            ),
            None
        );
    }
}