use crate::Technology;
use serde::{Deserialize, Serialize};
/// Operating-system context inferred for a scanned host.
///
/// Produced by `detect_os` from HTTP headers, or by `assess_single` from a
/// packaging suffix embedded in a technology's version string.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OsContext {
/// Distribution or platform family that was recognised.
pub distro: Distro,
/// Optional release hint such as `"Debian 11"` or `"EL7"`; `None` when no
/// release number could be extracted.
pub version_hint: Option<String>,
/// Kind of evidence that produced this context.
pub source: OsSource,
}
/// Distribution / platform families the detector can report.
///
/// Serialized with snake_case variant names. Marked `#[non_exhaustive]`, so
/// code outside this crate must include a wildcard arm when matching.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Distro {
Debian,
Ubuntu,
Rhel,
CentOs,
Rocky,
Alma,
AmazonLinux,
Suse,
OpenSuse,
Alpine,
Arch,
Fedora,
FreeBsd,
Windows,
/// Placeholder for a platform that could not be narrowed down. The
/// header parsers in this file skip table entries mapped to this variant
/// (for example the generic `unix` pattern).
Unknown,
}
impl Distro {
    /// Returns `true` for distributions this module treats as backporting
    /// security fixes without changing the upstream version number: Debian,
    /// Ubuntu, RHEL, CentOS, Rocky, Alma, Amazon Linux, SUSE and openSUSE.
    #[must_use]
    pub const fn backports_patches(self) -> bool {
        // Exhaustive on purpose: adding a variant forces a classification here.
        match self {
            Self::Debian
            | Self::Ubuntu
            | Self::Rhel
            | Self::CentOs
            | Self::Rocky
            | Self::Alma
            | Self::AmazonLinux
            | Self::Suse
            | Self::OpenSuse => true,
            Self::Alpine
            | Self::Arch
            | Self::Fedora
            | Self::FreeBsd
            | Self::Windows
            | Self::Unknown => false,
        }
    }

    /// Returns `true` for the distributions this module classifies as
    /// "rolling" (reported versions are assumed to track upstream): Arch,
    /// Fedora and Alpine.
    #[must_use]
    pub const fn is_rolling(self) -> bool {
        match self {
            Self::Arch | Self::Fedora | Self::Alpine => true,
            Self::Debian
            | Self::Ubuntu
            | Self::Rhel
            | Self::CentOs
            | Self::Rocky
            | Self::Alma
            | Self::AmazonLinux
            | Self::Suse
            | Self::OpenSuse
            | Self::FreeBsd
            | Self::Windows
            | Self::Unknown => false,
        }
    }
}
/// Kind of evidence an [`OsContext`] was derived from.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OsSource {
/// Recognised in a `Server` header value (parenthetical or free text).
ServerHeader,
/// Set by the version-suffix parser for Debian (`+deb`, `~deb`, `.deb`)
/// and Ubuntu markers.
PoweredByHeader,
/// Not constructed anywhere in this file.
Fingerprint,
/// Set by the version-suffix parser for `.el<N>`, `amzn` and `alpine`
/// markers.
VersionPattern,
}
/// How far a reported version string can be trusted for CVE correlation.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so
/// `Unreliable < Suspect < Likely < NoVersion`.
/// NOTE(review): `NoVersion` therefore compares as the greatest value —
/// confirm callers that sort or take `max` expect that.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum VersionReliability {
/// A backporting distribution was detected.
Unreliable,
/// No OS context, or a platform that is neither backporting nor rolling.
Suspect,
/// A rolling-release distribution was detected.
Likely,
/// The technology carried no version string at all.
NoVersion,
}
/// Reliability verdict for one detected technology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionAssessment {
/// Name of the technology, copied from the input `Technology`.
pub technology: String,
/// Version string as reported; `None` when none was extracted.
pub reported_version: Option<String>,
/// OS context the verdict is based on, if any was found.
pub os_context: Option<OsContext>,
/// `true` when the detected distribution is one that backports patches.
pub backport_likely: bool,
/// Trust level assigned to the reported version.
pub reliability: VersionReliability,
/// Human-readable explanation of the verdict.
pub advisory: String,
}
/// Assesses the reported version of every technology against the OS context
/// detected from `headers`.
///
/// The OS is detected once and shared by all entries. The result holds
/// exactly one assessment per input technology, in input order. Technologies
/// without a version get a `NoVersion` assessment; the rest are delegated to
/// `assess_single`.
#[must_use]
pub fn assess_headers<K: AsRef<str>, V: AsRef<str>>(
    technologies: &[Technology],
    headers: &[(K, V)],
) -> Vec<VersionAssessment> {
    let detected_os = detect_os(headers);
    let mut results = Vec::with_capacity(technologies.len());

    for tech in technologies {
        let assessment = if let Some(ver) = tech.version.as_deref() {
            assess_single(&tech.name, ver, &detected_os)
        } else {
            VersionAssessment {
                technology: tech.name.clone(),
                reported_version: None,
                os_context: detected_os.clone(),
                backport_likely: false,
                reliability: VersionReliability::NoVersion,
                advisory: "no version extracted — cannot assess".to_string(),
            }
        };
        results.push(assessment);
    }

    results
}
/// Adjusts each technology's `confidence` in place according to how reliable
/// its reported version is.
///
/// * `Unreliable` — lowers confidence by 20, but never below a floor of 20.
///   A confidence that is already under the floor is left untouched rather
///   than being raised to it.
/// * `Likely` — raises confidence by 5, capped at 100.
/// * `Suspect` / `NoVersion` — confidence is unchanged.
///
/// `technologies` is the list to update; `headers` are the response headers
/// used for OS detection.
pub fn assess<K: AsRef<str>, V: AsRef<str>>(
    technologies: &mut Vec<Technology>,
    headers: &[(K, V)],
) {
    let assessments = assess_headers(technologies, headers);

    // `assess_headers` yields one assessment per technology in input order,
    // so pair them positionally. Looking assessments up by name would hand
    // every duplicate-named entry the verdict of the first one.
    for (tech, assessment) in technologies.iter_mut().zip(&assessments) {
        tech.confidence = match assessment.reliability {
            VersionReliability::Unreliable => tech
                .confidence
                .saturating_sub(20)
                .max(20)
                // An "unreliable" verdict must never increase confidence.
                .min(tech.confidence),
            // Saturating add: the sum cannot overflow before it is clamped.
            VersionReliability::Likely => tech.confidence.saturating_add(5).min(100),
            VersionReliability::Suspect | VersionReliability::NoVersion => tech.confidence,
        };
    }
}
#[must_use]
pub fn detect_os<K: AsRef<str>, V: AsRef<str>>(headers: &[(K, V)]) -> Option<OsContext> {
if let Some(ctx) = headers
.iter()
.filter(|(k, _)| k.as_ref().eq_ignore_ascii_case("server"))
.find_map(|(_, v)| parse_os_from_parenthetical(v.as_ref()))
{
return Some(ctx);
}
if let Some(ctx) = headers
.iter()
.filter(|(k, _)| k.as_ref().eq_ignore_ascii_case("x-powered-by"))
.find_map(|(_, v)| parse_os_from_version_suffix(v.as_ref()))
{
return Some(ctx);
}
if let Some(ctx) = headers
.iter()
.filter(|(k, _)| k.as_ref().eq_ignore_ascii_case("server"))
.find_map(|(_, v)| parse_os_from_server_string(v.as_ref()))
{
return Some(ctx);
}
None
}
/// Builds the assessment for one technology that reported a version.
///
/// With a header-derived OS context the verdict depends on the distro:
/// backporting → `Unreliable`, rolling → `Likely`, anything else → `Suspect`.
/// Without one, the version string itself is checked for a packaging suffix
/// of a backporting distro (→ `Unreliable`, with that inline context
/// attached); otherwise the verdict is `Suspect` with no OS context.
fn assess_single(tech_name: &str, version: &str, os: &Option<OsContext>) -> VersionAssessment {
    let ctx = match os {
        Some(ctx) => ctx,
        None => {
            // No OS from headers: fall back to evidence inside the version.
            let packaged = parse_os_from_version_suffix(version)
                .filter(|inline_ctx| inline_ctx.distro.backports_patches());

            return match packaged {
                Some(inline_ctx) => {
                    let advisory = format!(
                        "{tech_name} {version} — version string contains \
                         {:?} packaging suffix, confirming backported patches. \
                         Upstream version number is meaningless for CVE assessment.",
                        inline_ctx.distro
                    );
                    VersionAssessment {
                        technology: tech_name.to_string(),
                        reported_version: Some(version.to_string()),
                        os_context: Some(inline_ctx),
                        backport_likely: true,
                        reliability: VersionReliability::Unreliable,
                        advisory,
                    }
                }
                None => VersionAssessment {
                    technology: tech_name.to_string(),
                    reported_version: Some(version.to_string()),
                    os_context: None,
                    backport_likely: false,
                    reliability: VersionReliability::Suspect,
                    advisory: format!(
                        "{tech_name} {version} — no OS context detected. \
                         Version may be accurate or may be backported. \
                         Treat CVE matches with moderate confidence."
                    ),
                },
            };
        }
    };

    let distro = ctx.distro;
    let (backport_likely, reliability, advisory) = if distro.backports_patches() {
        let distro_name = format!("{:?}", distro);
        (
            true,
            VersionReliability::Unreliable,
            format!(
                "{tech_name} {version} reported on {distro_name} — \
                 this distribution backports security patches without \
                 changing the upstream version number. The actual \
                 vulnerability surface may be entirely different from \
                 what CVE databases report for version {version}. \
                 Do NOT use this version for CVE correlation without \
                 behavioral verification or package-level inspection \
                 (e.g. dpkg -l, rpm -q)."
            ),
        )
    } else if distro.is_rolling() {
        (
            false,
            VersionReliability::Likely,
            format!(
                "{tech_name} {version} on {:?} (rolling release) — \
                 version number likely tracks upstream. \
                 CVE correlation is more reliable but verify with \
                 package manager.",
                distro
            ),
        )
    } else {
        (
            false,
            VersionReliability::Suspect,
            format!(
                "{tech_name} {version} on {:?} — \
                 patch strategy is uncertain for this platform. \
                 Treat CVE matches with moderate confidence.",
                distro
            ),
        )
    };

    VersionAssessment {
        technology: tech_name.to_string(),
        reported_version: Some(version.to_string()),
        os_context: Some(ctx.clone()),
        backport_likely,
        reliability,
        advisory,
    }
}
/// Lowercase substrings searched for in header text, paired with the distro
/// they indicate.
///
/// The table is scanned top to bottom and the first match wins, so a pattern
/// that contains another pattern must come first: `opensuse` has to precede
/// `suse`, otherwise `Distro::OpenSuse` can never be reported.
const DISTRO_PATTERNS: &[(&str, Distro)] = &[
    ("ubuntu", Distro::Ubuntu),
    ("debian", Distro::Debian),
    ("red hat", Distro::Rhel),
    // Hyphenated spelling used by older Apache banners: `(Red-Hat/Linux)`.
    ("red-hat", Distro::Rhel),
    ("redhat", Distro::Rhel),
    ("rhel", Distro::Rhel),
    ("centos", Distro::CentOs),
    ("rocky", Distro::Rocky),
    ("almalinux", Distro::Alma),
    ("amzn", Distro::AmazonLinux),
    ("amazon", Distro::AmazonLinux),
    ("opensuse", Distro::OpenSuse),
    ("suse", Distro::Suse),
    ("alpine", Distro::Alpine),
    ("arch", Distro::Arch),
    ("fedora", Distro::Fedora),
    ("freebsd", Distro::FreeBsd),
    ("win32", Distro::Windows),
    ("win64", Distro::Windows),
    ("windows", Distro::Windows),
    // Too generic to identify a distro; the parsers skip `Unknown` entries.
    ("unix", Distro::Unknown),
];
/// Looks for a distro name inside the parenthesised groups of a `Server`
/// header value, e.g. `Apache/2.4.41 (Ubuntu)`.
///
/// Every `( … )` group is inspected from left to right, so a banner such as
/// `Apache/1.3.27 (Unix) (Ubuntu)` is still recognised even though its first
/// group is not a known distro. Each closing parenthesis is searched for
/// *after* its opening one, so a stray `)` earlier in the string does not
/// abort the scan. Table entries mapped to `Distro::Unknown` are ignored.
///
/// Returns the first match with `source` set to `ServerHeader`, or `None`
/// when no group names a known distro.
fn parse_os_from_parenthetical(server: &str) -> Option<OsContext> {
    let mut rest = server;

    while let Some(open) = rest.find('(') {
        let after_open = &rest[open + 1..];
        // No closing parenthesis left: later groups cannot be closed either.
        let close = after_open.find(')')?;
        let content = &after_open[..close];
        let lower = content.to_ascii_lowercase();

        let hit = DISTRO_PATTERNS
            .iter()
            .find(|&&(pattern, distro)| distro != Distro::Unknown && lower.contains(pattern));

        if let Some(&(pattern, distro)) = hit {
            return Some(OsContext {
                distro,
                version_hint: extract_distro_version(content, pattern),
                source: OsSource::ServerHeader,
            });
        }

        rest = &after_open[close + 1..];
    }

    None
}
/// Infers the OS from a packaging suffix embedded in a version-bearing
/// string (an `X-Powered-By` value or a bare version).
///
/// Matching is ASCII case-insensitive and checks, in order:
/// 1. `ubuntu` → Ubuntu;
/// 2. `+deb`, `~deb` or `.deb` → Debian, with a `Debian <N>` hint when the
///    release number can be extracted;
/// 3. `.el` → RHEL, with an `EL<N>` hint when digits follow;
/// 4. `amzn` → Amazon Linux;
/// 5. `alpine` → Alpine.
///
/// Ubuntu is tested before Debian because Ubuntu package versions can also
/// carry a `deb` marker (for example `…+ubuntu20.04.1+deb.sury.org+1`), which
/// would otherwise be reported as Debian.
///
/// NOTE(review): `source` is `PoweredByHeader` for Ubuntu/Debian and
/// `VersionPattern` for the rest regardless of where `value` came from —
/// confirm whether callers rely on that distinction.
fn parse_os_from_version_suffix(value: &str) -> Option<OsContext> {
    let lower = value.to_ascii_lowercase();

    if lower.contains("ubuntu") {
        return Some(OsContext {
            distro: Distro::Ubuntu,
            version_hint: None,
            source: OsSource::PoweredByHeader,
        });
    }

    if ["+deb", "~deb", ".deb"]
        .iter()
        .any(|marker| lower.contains(*marker))
    {
        return Some(OsContext {
            distro: Distro::Debian,
            version_hint: extract_deb_version(&lower),
            source: OsSource::PoweredByHeader,
        });
    }

    if let Some(idx) = lower.find(".el") {
        // Digits directly after `.el` are the Enterprise Linux major release.
        let digits: String = lower[idx + 3..]
            .chars()
            .take_while(char::is_ascii_digit)
            .collect();
        let version_hint = (!digits.is_empty()).then(|| format!("EL{digits}"));
        return Some(OsContext {
            distro: Distro::Rhel,
            version_hint,
            source: OsSource::VersionPattern,
        });
    }

    // `amzn` also covers the dotted `.amzn` form.
    if lower.contains("amzn") {
        return Some(OsContext {
            distro: Distro::AmazonLinux,
            version_hint: None,
            source: OsSource::VersionPattern,
        });
    }

    if lower.contains("alpine") {
        return Some(OsContext {
            distro: Distro::Alpine,
            version_hint: None,
            source: OsSource::VersionPattern,
        });
    }

    None
}
/// Last-resort scan of a whole `Server` header value for a distro name.
///
/// A pattern only counts when it is not embedded in a longer word, i.e. it is
/// not directly preceded or followed by an ASCII letter. This keeps product
/// names from being mistaken for distros — `AmazonS3` is not Amazon Linux and
/// `Elasticsearch` is not Arch — while `nginx/1.25.0 Arch` or `amzn2` still
/// match. Table entries mapped to `Distro::Unknown` are ignored.
///
/// Returns the first table hit with no version hint and `source` set to
/// `ServerHeader`, or `None`.
fn parse_os_from_server_string(server: &str) -> Option<OsContext> {
    let lower = server.to_ascii_lowercase();

    DISTRO_PATTERNS
        .iter()
        .find(|&&(pattern, distro)| {
            distro != Distro::Unknown && contains_distro_token(&lower, pattern)
        })
        .map(|&(_, distro)| OsContext {
            distro,
            version_hint: None,
            source: OsSource::ServerHeader,
        })
}

/// Returns `true` when `needle` occurs in `haystack` without an ASCII letter
/// immediately before or after the occurrence.
fn contains_distro_token(haystack: &str, needle: &str) -> bool {
    let bytes = haystack.as_bytes();
    let is_boundary = |byte: Option<&u8>| byte.map_or(true, |b| !b.is_ascii_alphabetic());

    haystack.match_indices(needle).any(|(start, matched)| {
        let before = start.checked_sub(1).and_then(|i| bytes.get(i));
        let after = bytes.get(start + matched.len());
        is_boundary(before) && is_boundary(after)
    })
}
/// Extracts a release number that follows a distro name.
///
/// `content` is the text to search (original casing) and `pattern` is the
/// distro name that was matched in it; the name is located ASCII
/// case-insensitively. The scan starts *after* the name, so digits that
/// belong to the name itself or to earlier text are not mistaken for a
/// release: `Win64` yields `None`, and `x86_64 Debian 11` yields `11`. If the
/// name cannot be located, the whole of `content` is scanned.
///
/// The version is the first run of ASCII digits, `.` and `-` that begins with
/// a digit; trailing separators are trimmed. Returns `None` when there is no
/// digit to start from.
fn extract_distro_version(content: &str, pattern: &str) -> Option<String> {
    // ASCII lowercasing keeps byte offsets, so an index found in the
    // lowercased copy is valid in `content` as well.
    let lower = content.to_ascii_lowercase();
    let tail = lower
        .find(&pattern.to_ascii_lowercase())
        .and_then(|idx| content.get(idx + pattern.len()..))
        .unwrap_or(content);

    let version: String = tail
        .chars()
        .skip_while(|c| !c.is_ascii_digit())
        .take_while(|c| c.is_ascii_digit() || *c == '.' || *c == '-')
        .collect();

    let trimmed = version.trim_end_matches(|c: char| !c.is_ascii_digit());
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Extracts the Debian release number from an already-lowercased version
/// string.
///
/// Every occurrence of `deb` is examined, not just the first, and both the
/// `deb<N>` and `debian<N>` spellings are accepted. An unrelated earlier
/// `deb` (as in `mod_debug`) or a marker without a number (as in
/// `deb.sury.org`) therefore does not hide a later `+deb11u1`.
///
/// Returns `Some("Debian <N>")` for the first occurrence followed by digits,
/// or `None` when there is none.
fn extract_deb_version(lower: &str) -> Option<String> {
    lower.match_indices("deb").find_map(|(idx, marker)| {
        let rest = &lower[idx + marker.len()..];
        let rest = rest.strip_prefix("ian").unwrap_or(rest);
        let digits: String = rest.chars().take_while(char::is_ascii_digit).collect();
        (!digits.is_empty()).then(|| format!("Debian {digits}"))
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TechCategory;

    /// Builds a one-entry header list in the owned `(String, String)` shape.
    fn single_header(name: &str, value: &str) -> Vec<(String, String)> {
        vec![(name.to_owned(), value.to_owned())]
    }

    #[test]
    fn detect_ubuntu_from_server_header() {
        let ctx = detect_os(&single_header("Server", "Apache/2.4.41 (Ubuntu)"))
            .expect("OS should be detected from the Server parenthetical");
        assert_eq!(ctx.distro, Distro::Ubuntu);
        assert!(ctx.distro.backports_patches());
    }

    #[test]
    fn detect_debian_from_php_version() {
        let ctx = detect_os(&single_header("X-Powered-By", "PHP/7.4.33-1+deb11u1"))
            .expect("OS should be detected from the +deb suffix");
        assert_eq!(ctx.distro, Distro::Debian);
        assert_eq!(ctx.version_hint.as_deref(), Some("Debian 11"));
    }

    #[test]
    fn detect_rhel_from_el_suffix() {
        let ctx = detect_os(&single_header("X-Powered-By", "PHP/5.4.16-48.el7"))
            .expect("OS should be detected from the .el suffix");
        assert_eq!(ctx.distro, Distro::Rhel);
        assert_eq!(ctx.version_hint.as_deref(), Some("EL7"));
    }

    #[test]
    fn no_os_from_clean_headers() {
        assert!(detect_os(&single_header("Server", "nginx/1.21.0")).is_none());
    }

    #[test]
    fn assess_backported_version() {
        let apache = Technology {
            name: "Apache".to_string(),
            version: Some("2.4.41".to_string()),
            category: TechCategory::Server,
            confidence: 95,
        };
        let assessments = assess_headers(
            &[apache],
            &single_header("Server", "Apache/2.4.41 (Ubuntu)"),
        );
        assert_eq!(assessments.len(), 1);

        let verdict = &assessments[0];
        assert!(verdict.backport_likely);
        assert_eq!(verdict.reliability, VersionReliability::Unreliable);
        assert!(verdict.advisory.contains("backport"));
        assert!(verdict.advisory.contains("Do NOT"));
    }

    #[test]
    fn assess_rolling_release() {
        let nginx = Technology {
            name: "nginx".to_string(),
            version: Some("1.25.0".to_string()),
            category: TechCategory::Server,
            confidence: 95,
        };
        let assessments =
            assess_headers(&[nginx], &single_header("Server", "nginx/1.25.0 Arch"));

        let verdict = &assessments[0];
        assert!(!verdict.backport_likely);
        assert_eq!(verdict.reliability, VersionReliability::Likely);
    }

    #[test]
    fn assess_no_version_technology() {
        let cloudflare = Technology {
            name: "Cloudflare".to_string(),
            version: None,
            category: TechCategory::Cdn,
            confidence: 95,
        };
        let no_headers: Vec<(String, String)> = Vec::new();
        let assessments = assess_headers(&[cloudflare], &no_headers);
        assert_eq!(assessments[0].reliability, VersionReliability::NoVersion);
    }

    #[test]
    fn assess_inline_debian_version() {
        let php = Technology {
            name: "PHP".to_string(),
            version: Some("7.4.33-1+deb11u1".to_string()),
            category: TechCategory::Language,
            confidence: 85,
        };
        let assessments = assess_headers(&[php], &single_header("Server", "nginx/1.18.0"));

        let verdict = &assessments[0];
        assert!(verdict.backport_likely);
        assert_eq!(verdict.reliability, VersionReliability::Unreliable);
        let ctx = verdict
            .os_context
            .as_ref()
            .expect("inline Debian context should be attached");
        assert_eq!(ctx.distro, Distro::Debian);
    }

    #[test]
    fn distro_backport_classification() {
        let backporting = [
            Distro::Debian,
            Distro::Ubuntu,
            Distro::Rhel,
            Distro::CentOs,
            Distro::AmazonLinux,
            Distro::Suse,
        ];
        for distro in backporting {
            assert!(distro.backports_patches());
        }

        for distro in [Distro::Arch, Distro::Fedora, Distro::Alpine] {
            assert!(!distro.backports_patches());
        }

        assert!(Distro::Arch.is_rolling());
        assert!(Distro::Fedora.is_rolling());
        assert!(!Distro::Debian.is_rolling());
    }
}