use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
// Heuristic weights summed by `predict`. Positive values push the score
// toward `BranchLabel::Benign`, negative values toward `BranchLabel::RealBug`;
// a total of `0.0` or more predicts Benign.
/// Added when the first argument is a string literal (`FirstArgOrigin::Literal`).
const W_FIRST_ARG_LITERAL: f32 = 0.40;
/// Added when the first argument comes from a config/env source
/// (`FirstArgOrigin::ConfigSource`).
const W_FIRST_ARG_CONFIG_SOURCE: f32 = 0.30;
/// Added (negative) when the first argument comes from a request object
/// (`FirstArgOrigin::RequestSource`).
const W_FIRST_ARG_REQUEST_SOURCE: f32 = -0.50;
/// Added (negative) when the first argument is a function parameter
/// (`FirstArgOrigin::Parameter`).
const W_FIRST_ARG_IS_PARAMETER: f32 = -0.30;
/// Added when `Evidence::basename_applied` is set.
const W_BASENAME_APPLIED: f32 = 0.20;
/// Added when the enclosing function name matches `TEST_FUNCTION_SUBSTRINGS`.
const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// Substrings searched for by `matches_request_object`.
///
/// The matcher lowercases the candidate text before searching, so every
/// entry here must itself be lowercase (an entry containing an uppercase
/// letter could never match).
const REQUEST_OBJECT_SUBSTRINGS: &[&str] = &[
"request.get",
"request.post",
"request.args",
"request.form",
"request.values",
"request.files",
"request.json",
"request.data",
"request.body",
"request.params",
"request.query",
"req.params",
"req.query",
"req.body",
"input(",
"sys.argv",
"raw_input(",
];
/// Substrings searched for by `matches_config_object`.
///
/// The matcher lowercases the candidate text before searching, so every
/// entry here must itself be lowercase — which is why the last-but-one entry
/// is spelled `pathlib.path.home` (presumably targeting Python's
/// `pathlib.Path.home`; confirm against the caller).
const CONFIG_OBJECT_SUBSTRINGS: &[&str] = &[
"settings.",
"config.",
"os.environ",
"os.getenv",
"os.path.expanduser",
"pathlib.path.home",
"tempfile.gettempdir",
];
/// Substrings searched for by `matches_test_function` in the lowercased
/// name of the enclosing function. Entries must be lowercase for the same
/// reason.
const TEST_FUNCTION_SUBSTRINGS: &[&str] =
&["test_", "_test", "fixture", "setup", "teardown", "conftest"];
/// Classification of where the first argument of the flagged call comes
/// from. `predict` turns each variant into at most one weighted reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum FirstArgOrigin {
/// A string literal; scored with `W_FIRST_ARG_LITERAL`.
Literal,
/// A config/env source; scored with `W_FIRST_ARG_CONFIG_SOURCE`.
ConfigSource,
/// A request object; scored with `W_FIRST_ARG_REQUEST_SOURCE`.
RequestSource,
/// A function parameter called `name`; scored with
/// `W_FIRST_ARG_IS_PARAMETER`.
Parameter { name: String },
/// Origin could not be classified; `predict` adds no weight and no reason.
Unknown,
}
/// Facts collected about one flagged call site, consumed by `predict`.
///
/// `Default` yields "no evidence at all": every `Option` is `None` and
/// `basename_applied` is `false`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(super) struct Evidence {
    /// Name of the function containing the call. `predict` checks it against
    /// the test-function substrings.
    pub enclosing_function: Option<String>,
    /// Name of the class containing the call. Carried along for callers;
    /// `predict` does not read it.
    pub enclosing_class: Option<String>,
    /// Where the first argument comes from, when it has been classified.
    pub first_arg_origin: Option<FirstArgOrigin>,
    /// Whether `os.path.basename` is applied to the argument.
    pub basename_applied: bool,
    /// Reason text of an `internal-path[...]` annotation. When present,
    /// `predict` returns `Benign` without scoring anything else.
    pub internal_path_annotation: Option<String>,
    /// Source text of a `user-controlled[...]` annotation. When present (and
    /// no `internal-path` annotation is), `predict` returns `RealBug`
    /// without scoring anything else.
    pub user_controlled_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor: an `Evidence` with every field at its
    /// `Default` value (no signals, no annotations).
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Result of `predict`: the chosen branch plus the fully described
/// opposite branch, so the other interpretation is never lost.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// Branch the predictor chose.
pub predicted: BranchLabel,
/// The opposite branch (label, severity, title, description, suggested
/// fix), as assembled by `build_prediction`.
pub alternative_branch: AlternativeBranch,
/// Severity of `predicted`: `High` for `RealBug`, `Info` for `Benign`.
pub predicted_severity: Severity,
/// Signals that contributed to the prediction, each with its weight.
pub reasons: Vec<PredictionReason>,
/// Holds one entry when a source annotation decided the outcome; empty
/// when the prediction came from the weighted heuristics.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(evidence: &Evidence, api_label: &str) -> Prediction {
if let Some(reason) = &evidence.internal_path_annotation {
return collapse(
BranchLabel::Benign,
api_label,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: internal-path[{reason}]"),
},
description: format!(
"`internal-path[{reason}]` annotation declares this path-join \
as protected by caller-side validation; the finding collapses \
to Info."
),
example: Some(format!(
"{api_label}(...) # repotoire: internal-path[{reason}]"
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("internal-path[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as caller-validated ({reason}); not a path-traversal risk."
),
},
);
}
if let Some(source) = &evidence.user_controlled_annotation {
return collapse(
BranchLabel::RealBug,
api_label,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: user-controlled[{source}]"),
},
description: format!(
"`user-controlled[{source}]` annotation declares the path \
argument as attacker-controlled; the finding stays at High."
),
example: Some(format!(
"{api_label}(...) # repotoire: user-controlled[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("user-controlled[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as user-controlled (source: {source})."),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if let Some(origin) = &evidence.first_arg_origin {
match origin {
FirstArgOrigin::Literal => {
sum += W_FIRST_ARG_LITERAL;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "first arg is a string literal".to_string(),
},
weight: W_FIRST_ARG_LITERAL,
note: "First argument is a string literal; the base path is \
project-controlled, not attacker-controlled."
.to_string(),
});
}
FirstArgOrigin::ConfigSource => {
sum += W_FIRST_ARG_CONFIG_SOURCE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "first arg sourced from config module".to_string(),
},
weight: W_FIRST_ARG_CONFIG_SOURCE,
note: "First argument originates from a config/env source \
(`settings`, `os.environ`, `os.path.expanduser`); the \
project owns this value."
.to_string(),
});
}
FirstArgOrigin::RequestSource => {
sum += W_FIRST_ARG_REQUEST_SOURCE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "first arg from request object".to_string(),
},
weight: W_FIRST_ARG_REQUEST_SOURCE,
note: "First argument originates from a request object \
(`request.GET`, `request.args`, ...); attacker-controlled."
.to_string(),
});
}
FirstArgOrigin::Parameter { name } => {
sum += W_FIRST_ARG_IS_PARAMETER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::FirstArgIdentifier { name: name.clone() },
weight: W_FIRST_ARG_IS_PARAMETER,
note: format!(
"First argument is `{name}`, a function parameter; possibly \
user-controlled."
),
});
}
FirstArgOrigin::Unknown => {
}
}
}
if evidence.basename_applied {
sum += W_BASENAME_APPLIED;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "os.path.basename applied to argument".to_string(),
},
weight: W_BASENAME_APPLIED,
note: "`os.path.basename` strips `..` traversal sequences from \
user input; defensive idiom."
.to_string(),
});
}
if let Some(fn_name) = &evidence.enclosing_function {
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a test/fixture; \
test code rarely the actionable security target."
),
});
}
}
let predicted = if sum >= 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(predicted, api_label, reasons, Vec::new())
}
/// Returns `true` when `text`, compared case-insensitively, contains any
/// entry of `REQUEST_OBJECT_SUBSTRINGS`.
pub(super) fn matches_request_object(text: &str) -> bool {
    let haystack = text.to_lowercase();
    for needle in REQUEST_OBJECT_SUBSTRINGS {
        if haystack.contains(needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when `text`, compared case-insensitively, contains any
/// entry of `CONFIG_OBJECT_SUBSTRINGS`.
pub(super) fn matches_config_object(text: &str) -> bool {
    let haystack = text.to_lowercase();
    for needle in CONFIG_OBJECT_SUBSTRINGS {
        if haystack.contains(needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when `name`, compared case-insensitively, contains any
/// entry of `TEST_FUNCTION_SUBSTRINGS`.
///
/// NOTE(review): this is a plain substring search, so a non-test name such
/// as `latest_file` also matches (it contains `test_`). Confirm whether that
/// is acceptable for a signal that pushes the score toward Benign.
fn matches_test_function(name: &str) -> bool {
    let haystack = name.to_lowercase();
    for needle in TEST_FUNCTION_SUBSTRINGS {
        if haystack.contains(needle) {
            return true;
        }
    }
    false
}
/// Builds a prediction that was decided by a single annotation: `label`
/// is the outcome, with exactly one reason and one resolution attached.
fn collapse(
    label: BranchLabel,
    api_label: &str,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let reasons = vec![reason];
    let resolutions = vec![resolution];
    build_prediction(label, api_label, reasons, resolutions)
}
/// Assembles a `Prediction` for `predicted`, deriving its severity and a
/// complete description of the opposite branch so the alternative reading
/// travels with the result.
fn build_prediction(
    predicted: BranchLabel,
    api_label: &str,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    // Everything about the alternative is keyed off the opposite label.
    let other = predicted.opposite();
    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other),
            title: title_for_branch(other, api_label),
            description: description_for_branch(other, api_label),
            suggested_fix: suggested_fix_for_branch(other, api_label),
        },
        predicted_severity: severity_for_branch(predicted),
        reasons,
        resolutions,
    }
}
/// Severity reported for a branch: `Benign` is `Info`, `RealBug` is `High`.
fn severity_for_branch(label: BranchLabel) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug => Severity::High,
    }
}
/// Short finding title for a branch, mentioning the flagged API.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => {
            format!("Internal path-join in {api_label} (informational)")
        }
        BranchLabel::RealBug => {
            format!("Path traversal via {api_label}")
        }
    }
}
/// Long-form finding description for a branch, mentioning the flagged API.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!(
            "The path argument to `{api_label}` appears to be internal/literal/\
             config-derived. The call site is carried as Info; the High-severity \
             interpretation is preserved in `alternative_branch` in case the \
             predictor is wrong."
        ),
        BranchLabel::RealBug => format!(
            "The path argument to `{api_label}` appears to originate from \
             user-controlled input. Concatenating untrusted path components \
             allows directory traversal (`..`), absolute-path overrides, and \
             arbitrary file read/write."
        ),
    }
}
/// Remediation advice for a branch. Always returns `Some`; `_api_label` is
/// accepted for signature parity with the sibling helpers but is unused.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let advice: &str = match label {
        BranchLabel::Benign => {
            "If this is intentional internal use, annotate \
             `# repotoire: internal-path[<reason>]` to collapse the finding to Info \
             definitively."
        }
        BranchLabel::RealBug => {
            "Validate the path component against an allowlist, or wrap with \
             `os.path.basename(...)` to strip `..` sequences. For file-serving \
             endpoints, use `flask.send_from_directory` or `pathlib.Path.resolve` \
             with a base-prefix check."
        }
    };
    Some(advice.to_owned())
}
/// Extracts the reason from an `internal-path` annotation on `line`.
///
/// Returns `None` when `parse_python_comment` finds no annotation or the
/// annotation is of a different kind. When the annotation carries no
/// arguments the reason is `"unspecified"`; otherwise it is the first
/// argument.
pub(super) fn extract_internal_path_reason(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind == "internal-path" {
        let reason = annotation
            .args
            .first()
            .cloned()
            .unwrap_or_else(|| "unspecified".to_string());
        Some(reason)
    } else {
        None
    }
}
/// Extracts the source from a `user-controlled` annotation on `line`.
///
/// Returns `None` when `parse_python_comment` finds no annotation or the
/// annotation is of a different kind. When the annotation carries no
/// arguments the source is `"unspecified"`; otherwise it is the first
/// argument.
pub(super) fn extract_user_controlled_source(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind == "user-controlled" {
        let source = annotation
            .args
            .first()
            .cloned()
            .unwrap_or_else(|| "unspecified".to_string());
        Some(source)
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the predictor: heuristic scoring, annotation
    //! overrides, alternative-branch wiring, annotation extraction and the
    //! substring matchers.

    use super::*;

    /// Config-sourced first argument with a non-test enclosing function:
    /// +0.30 alone is enough for Benign.
    #[test]
    fn click_utils_489_config_source_alone_predicts_benign() {
        let evidence = Evidence {
            enclosing_function: Some("get_app_dir".to_string()),
            enclosing_class: None,
            first_arg_origin: Some(FirstArgOrigin::ConfigSource),
            basename_applied: false,
            internal_path_annotation: None,
            user_controlled_annotation: None,
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(
            p.predicted,
            BranchLabel::Benign,
            "config-source alone leans Benign at +0.30; full canonical case \
             requires combined first-arg + param signals (deferred to v1)"
        );
        assert_eq!(p.predicted_severity, Severity::Info);
    }

    /// A literal first argument is Benign and is the only weight recorded.
    #[test]
    fn pure_literal_first_arg_predicts_benign() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Literal),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
        assert!((total - W_FIRST_ARG_LITERAL).abs() < 1e-6);
    }

    /// A request-derived first argument is a RealBug at High.
    #[test]
    fn request_source_predicts_realbug() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::RequestSource),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::High);
    }

    /// A bare function parameter is a RealBug at High.
    #[test]
    fn parameter_arg_predicts_realbug() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Parameter {
                name: "name".to_string(),
            }),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::High);
    }

    /// Config-sourced first argument with no other evidence is Benign.
    #[test]
    fn config_source_alone_predicts_benign() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::ConfigSource),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
    }

    /// With no evidence the score is exactly 0.0, which counts as Benign.
    #[test]
    fn no_signals_predicts_benign_via_tiebreak() {
        let evidence = Evidence::empty();
        let p = predict(&evidence, "os.path.join");
        assert_eq!(
            p.predicted,
            BranchLabel::Benign,
            "0.0 sum tiebreaks Benign/Info when no risk evidence fires"
        );
    }

    /// `Unknown` origin contributes nothing, so the tiebreak applies.
    #[test]
    fn unknown_origin_alone_predicts_benign() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Unknown),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
    }

    /// Parameter (-0.30) plus basename (+0.20) stays negative: RealBug.
    #[test]
    fn parameter_with_basename_still_realbug_at_minus_010() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Parameter {
                name: "name".to_string(),
            }),
            basename_applied: true,
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::RealBug);
    }

    /// Adding the test-function weight (+0.15) tips the same case to Benign.
    #[test]
    fn parameter_with_basename_in_test_function_flips_to_benign() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Parameter {
                name: "name".to_string(),
            }),
            basename_applied: true,
            enclosing_function: Some("test_path_join_safety".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
    }

    /// An `internal-path` annotation overrides even a request-sourced
    /// argument and records a matching resolution.
    #[test]
    fn internal_path_annotation_collapses_to_benign() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::RequestSource),
            internal_path_annotation: Some("validated-by-caller".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.resolutions.len(), 1);
        assert_eq!(p.resolutions[0].collapses_to, BranchLabel::Benign);
        match &p.resolutions[0].kind {
            ResolutionKind::SourceAnnotation { syntax } => {
                assert!(syntax.contains("internal-path"));
                assert!(syntax.contains("validated-by-caller"));
            }
            other => panic!("expected SourceAnnotation, got {other:?}"),
        }
    }

    /// A `user-controlled` annotation overrides even a literal argument.
    #[test]
    fn user_controlled_annotation_collapses_to_realbug() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Literal),
            user_controlled_annotation: Some("GET-request".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::High);
        assert_eq!(p.resolutions.len(), 1);
        assert_eq!(p.resolutions[0].collapses_to, BranchLabel::RealBug);
    }

    /// When both annotations are present, `internal-path` is checked first.
    #[test]
    fn both_annotations_present_internal_path_wins_documented_order() {
        let evidence = Evidence {
            internal_path_annotation: Some("a".to_string()),
            user_controlled_annotation: Some("b".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
    }

    /// A RealBug prediction carries the Benign/Info alternative.
    #[test]
    fn realbug_prediction_carries_benign_alternative() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Parameter {
                name: "name".to_string(),
            }),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.alternative_branch.label, BranchLabel::Benign);
        assert_eq!(p.alternative_branch.severity, Severity::Info);
    }

    /// A Benign prediction carries the RealBug/High alternative.
    #[test]
    fn benign_prediction_carries_realbug_alternative() {
        let evidence = Evidence {
            first_arg_origin: Some(FirstArgOrigin::Literal),
            ..Default::default()
        };
        let p = predict(&evidence, "os.path.join");
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.alternative_branch.label, BranchLabel::RealBug);
        assert_eq!(p.alternative_branch.severity, Severity::High);
    }

    /// Reason extraction: explicit argument, missing argument, wrong kind,
    /// and a comment that is not an annotation at all.
    #[test]
    fn extract_internal_path_reason_works() {
        assert_eq!(
            extract_internal_path_reason(
                "open(p) # repotoire: internal-path[validated-by-caller]"
            ),
            Some("validated-by-caller".to_string())
        );
        assert_eq!(
            extract_internal_path_reason("# repotoire: internal-path"),
            Some("unspecified".to_string())
        );
        assert_eq!(
            extract_internal_path_reason("open(p) # repotoire: user-controlled[x]"),
            None,
            "wrong kind returns None"
        );
        assert_eq!(extract_internal_path_reason("# noqa"), None);
    }

    /// Source extraction: explicit argument, missing argument, wrong kind.
    #[test]
    fn extract_user_controlled_source_works() {
        assert_eq!(
            extract_user_controlled_source("open(p) # repotoire: user-controlled[GET]"),
            Some("GET".to_string())
        );
        assert_eq!(
            extract_user_controlled_source("# repotoire: user-controlled"),
            Some("unspecified".to_string())
        );
        assert_eq!(
            extract_user_controlled_source("# repotoire: internal-path[x]"),
            None
        );
    }

    /// Request matcher is case-insensitive and needs the dotted form, so a
    /// bare identifier containing "request" does not match.
    #[test]
    fn request_object_substrings_match_common_frameworks() {
        assert!(matches_request_object("request.GET['file']"));
        assert!(matches_request_object("flask.request.args['x']"));
        assert!(matches_request_object("req.params.id"));
        assert!(matches_request_object("REQUEST.POST['x']"));
        assert!(!matches_request_object("config.BASE_DIR"));
        assert!(!matches_request_object("my_request_id"));
    }

    /// Config matcher recognises settings/config/env idioms and rejects a
    /// request expression.
    #[test]
    fn config_object_substrings_match_common_idioms() {
        assert!(matches_config_object("settings.BASE_DIR"));
        assert!(matches_config_object("config.TEMPLATE_DIR"));
        assert!(matches_config_object("os.environ.get('HOME')"));
        assert!(matches_config_object("os.getenv('HOME')"));
        assert!(matches_config_object("os.path.expanduser('~')"));
        assert!(!matches_config_object("request.GET['x']"));
    }

    /// Test-function matcher is case-insensitive and accepts typical test
    /// scaffolding names while rejecting ordinary function names.
    #[test]
    fn test_function_substrings_match_test_scaffolding() {
        assert!(matches_test_function("test_path_join_safety"));
        assert!(matches_test_function("setUp"));
        assert!(matches_test_function("tmp_dir_fixture"));
        assert!(!matches_test_function("get_app_dir"));
        assert!(!matches_test_function("render_template"));
    }
}