import csv
import json
import os
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from bench.config import RULE_CWE_MAP
@dataclass
class CWEAnalysis:
    """Aggregated benchmark metrics for one CWE directory.

    Instances are created with the two identifying fields and then filled
    in by ``analyze_cwe``; every metric starts at zero / empty.
    """

    # --- identity -------------------------------------------------------
    cwe_id: str                    # e.g. "CWE-78", or the directory name as fallback
    cwe_dir_name: str              # final path component of the analysed directory

    # --- section-based classification -----------------------------------
    files_analyzed: int = 0        # C files containing a bad and/or good section
    tp_count: int = 0              # violations located in a bad section
    fp_count: int = 0              # violations located in a good section
    tp_rate_pct: float = 0.0       # tp / (tp + fp) as a percentage, one decimal

    # --- exact flaw-line detection --------------------------------------
    flaw_lines_total: int = 0      # lines carrying a FLAW marker
    flaw_lines_detected: int = 0   # exact flaw-line detections inside bad sections
    flaw_detection_rate_pct: float = 0.0

    # rule id -> {"tp", "fp", "flaw", "is_cwe_matched"} counters
    rule_breakdown: dict[str, dict[str, int]] = field(default_factory=dict)

    # --- CWE-matched rules versus noise ---------------------------------
    cwe_matched_tp: int = 0        # bad-section violations from rules mapped to this CWE
    cwe_matched_fp: int = 0        # good-section violations from rules mapped to this CWE
    noise_count: int = 0           # bad/good-section violations from unmapped rules
    noise_ratio: float = 0.0       # noise / (matched + noise) as a percentage

    # --- per-file detection ---------------------------------------------
    per_file_detected: int = 0     # files with at least one CWE-matched true positive
    per_file_total: int = 0        # files that have a bad section
    per_file_rate: float = 0.0

    # --- flaw hits with a one-line tolerance ----------------------------
    flaw_hit_detected: int = 0     # CWE-matched detections on or next to a flaw line
    flaw_hit_total: int = 0
    flaw_hit_rate: float = 0.0

    cwe_rules: set[str] = field(default_factory=set)     # rule ids mapped to this CWE
    violations: list[dict] = field(default_factory=list)  # per-violation records (optional)
# Section delimiters.  They are matched with flexible whitespace so that
# spacing variants such as "#endif  /* OMITBAD */" or "# ifndef OMITBAD"
# are recognised as well as the canonical single-space form.
_OMITBAD_OPEN_RE = re.compile(r'#\s*ifndef\s+OMITBAD')
_OMITBAD_CLOSE_RE = re.compile(r'#\s*endif\s*/\*\s*OMITBAD\s*\*/')
_OMITGOOD_OPEN_RE = re.compile(r'#\s*ifndef\s+OMITGOOD')
_OMITGOOD_CLOSE_RE = re.compile(r'#\s*endif\s*/\*\s*OMITGOOD\s*\*/')


def parse_c_file_sections(filepath: str | Path) -> dict:
    """Classify the lines of a C test file into bad, good and flaw lines.

    A "bad" section runs from an ``#ifndef OMITBAD`` line (inclusive) to the
    matching ``#endif /* OMITBAD */`` line (exclusive); "good" sections are
    delimited the same way with ``OMITGOOD``.  Any line containing ``FLAW:``
    is recorded as a flaw line regardless of the section it is in.

    After the scan, ``_reclassify_helpers`` is given the chance to add lines
    of functions defined outside both sections to one of them.

    Args:
        filepath: Path of the C source file.  It is read as UTF-8 and
            undecodable bytes are dropped.

    Returns:
        A dict with the keys ``'bad_lines'``, ``'good_lines'`` and
        ``'flaw_lines'``, each mapping to a set of 1-based line numbers.
    """
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.readlines()

    bad_lines: set[int] = set()
    good_lines: set[int] = set()
    flaw_lines: set[int] = set()

    in_bad = False
    in_good = False
    for lineno, line in enumerate(lines, start=1):
        # 'POTENTIAL FLAW:' contains 'FLAW:', so one test covers both markers.
        if 'FLAW:' in line:
            flaw_lines.add(lineno)

        # Update the section state first: the opening directive belongs to
        # its section, the closing directive does not.
        if _OMITBAD_OPEN_RE.search(line):
            in_bad, in_good = True, False
        elif _OMITBAD_CLOSE_RE.search(line):
            in_bad = False
        elif _OMITGOOD_OPEN_RE.search(line):
            in_good, in_bad = True, False
        elif _OMITGOOD_CLOSE_RE.search(line):
            in_good = False

        if in_bad:
            bad_lines.add(lineno)
        elif in_good:
            good_lines.add(lineno)

    result = {
        'bad_lines': bad_lines,
        'good_lines': good_lines,
        'flaw_lines': flaw_lines,
    }
    _reclassify_helpers(lines, result)
    return result
_FUNC_DEF_RE = re.compile(
r'^(?:static\s+)?' r'(?:[\w*\s]+\s+)' r'(\w+)' r'\s*\([^)]*\)\s*$' )
def _parse_function_ranges(lines: list[str]) -> dict[str, tuple[int, int]]:
functions = {}
i = 0
while i < len(lines):
line = lines[i]
if line.startswith('#') or (line and line[0] in ' \t'):
i += 1
continue
m = _FUNC_DEF_RE.match(line.rstrip())
if m:
func_name = m.group(1)
brace_line = None
for j in range(i, min(i + 3, len(lines))):
if '{' in lines[j]:
brace_line = j
break
if brace_line is not None:
depth = 0
start = i + 1 end = start
for j in range(brace_line, len(lines)):
for ch in lines[j]:
if ch == '{':
depth += 1
elif ch == '}':
depth -= 1
if depth == 0:
end = j + 1 functions[func_name] = (start, end)
i = j + 1
break
if depth == 0 and func_name in functions:
break
else:
i += 1
continue
i += 1
return functions
def _reclassify_helpers(lines: list[str], result: dict) -> None:
    """Assign functions defined outside the bad/good sections to a section.

    A function whose header line is in neither ``result['bad_lines']`` nor
    ``result['good_lines']`` is attributed to the bad section when its name
    is mentioned only on bad lines, and to the good section when it is
    mentioned only on good lines.  Functions mentioned from both sections,
    or from neither, are left unclassified.

    Args:
        lines: The source file split into lines.
        result: Section dict as built by ``parse_c_file_sections``; its
            ``'bad_lines'`` / ``'good_lines'`` sets are updated in place.
    """
    bad_lines = result['bad_lines']
    good_lines = result['good_lines']

    # Candidates: functions whose header lies outside both sections.
    candidates = {
        name: span
        for name, span in _parse_function_ranges(lines).items()
        if span[0] not in bad_lines and span[0] not in good_lines
    }
    if not candidates:
        return

    # Whole-identifier match, so "foo" does not hit "foo_bar" or "my_foo".
    name_patterns = {
        name: re.compile(r'(?<![a-zA-Z0-9_])' + re.escape(name) + r'(?![a-zA-Z0-9_])')
        for name in candidates
    }

    used_by_bad = set()
    used_by_good = set()
    for lineno, text in enumerate(lines, start=1):
        on_bad = lineno in bad_lines
        on_good = lineno in good_lines
        if not (on_bad or on_good):
            continue
        for name, pattern in name_patterns.items():
            if not pattern.search(text):
                continue
            if on_bad:
                used_by_bad.add(name)
            if on_good:
                used_by_good.add(name)

    # The sets are only extended after the scan above, so a reclassified
    # helper never influences the classification of another one.
    for name, (first, last) in candidates.items():
        from_bad = name in used_by_bad
        from_good = name in used_by_good
        if from_bad and not from_good:
            bad_lines.update(range(first, last + 1))
        elif from_good and not from_bad:
            good_lines.update(range(first, last + 1))
def parse_sqc_csv(csv_path: str | Path) -> dict:
violations = defaultdict(lambda: defaultdict(list))
with open(csv_path, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
title = row['Title']
title = re.sub(r'\s+version:\S+$', '', title)
match = re.match(r'([A-Z0-9-]+):(.+):(\d+)', title)
if match:
rule_id = match.group(1)
filepath = match.group(2)
line_num = int(match.group(3))
filename = os.path.basename(filepath)
violations[filename][line_num].append((rule_id, filepath))
return violations
def _normalize_cwe_id(raw: str) -> str:
    """Bring a CWE identifier into the ``CWE-<suffix>`` form.

    The input is stripped and upper-cased.  ``CWE-...`` is returned as is,
    ``CWE...`` gets the dash inserted, a purely numeric string gets the
    ``CWE-`` prefix, and anything else is returned unchanged (upper-cased).
    """
    token = raw.strip().upper()
    if token.startswith("CWE"):
        # Already dashed, or dash missing right after the prefix.
        return token if token.startswith("CWE-") else f"CWE-{token[3:]}"
    return f"CWE-{token}" if token.isdigit() else token
def _extract_cwe_from_dir(dir_path: str) -> str | None:
dirname = Path(dir_path).name
m = re.match(r'(CWE)(\d+)', dirname)
if m:
return f"CWE-{m.group(2)}"
return None
def _hits_flaw_line(line_num: int, flaw_lines: set) -> bool:
    """Return True if *line_num* or one of its direct neighbours is a flaw line."""
    # Same probe order as before: the line itself, then above, then below.
    return any(line_num + offset in flaw_lines for offset in (0, -1, 1))
def _load_cwe_rules(cwe_id: str) -> set:
    """Return the rule ids mapped to *cwe_id* in the ``RULE_CWE_MAP`` JSON file.

    The file is expected to hold a ``"cwe_to_rules"`` object keyed by CWE id.
    Loading is best-effort: a missing file, or any error while reading or
    interpreting it, yields an empty set.
    """
    rules = set()
    if RULE_CWE_MAP.exists():
        try:
            with open(RULE_CWE_MAP) as fh:
                mapping = json.load(fh)
            rules = set(mapping.get("cwe_to_rules", {}).get(cwe_id, []))
        except Exception:
            # Deliberately tolerant: an unusable map means "no known rules".
            rules = set()
    return rules
def _find_search_dirs(cwe_dir: Path) -> list[Path]:
    """Return the ``s*`` sub-directories of *cwe_dir*, or ``[cwe_dir]`` if there are none."""
    # Filter before deciding: a plain file matching ``s*`` must not switch
    # the sub-directory layout off just because it sorts first.
    subdirs = sorted(p for p in cwe_dir.glob('s*') if p.is_dir())
    return subdirs if subdirs else [cwe_dir]


def _percentage(part: int, whole: int) -> float:
    """Return ``part / whole`` as a percentage with one decimal, 0.0 when *whole* is 0."""
    return round(part / whole * 100, 1) if whole > 0 else 0.0


def _nearest_flaw_line(line_num: int, flaw_lines: set) -> int | None:
    """Return the flaw line hit by *line_num*: itself, else the line above, else below."""
    for candidate in (line_num, line_num - 1, line_num + 1):
        if candidate in flaw_lines:
            return candidate
    return None


def analyze_cwe(csv_path: str | Path, cwe_dir: str | Path,
                cwe_scan_id: int | None = None) -> CWEAnalysis:
    """Score the violations of a CSV report against the C files of one CWE directory.

    Every ``*.c`` file in the ``s*`` sub-directories of *cwe_dir* (or in
    *cwe_dir* itself when it has none) is split into bad/good/flaw lines.
    Violations reported for a file of the same base name are then classified:
    inside a bad section they are true positives, inside a good section false
    positives, elsewhere ``"unknown"`` (not counted).  Files with neither a
    bad nor a good section are skipped.

    Flaw-line metrics count distinct flaw lines, so each rate stays within
    0-100 % even when several violations land on the same flaw.

    Args:
        csv_path: CSV report understood by ``parse_sqc_csv``.
        cwe_dir: Directory holding the test files of one CWE.
        cwe_scan_id: When given, a record is appended to
            ``CWEAnalysis.violations`` for every violation of an analysed
            file, tagged with this id.

    Returns:
        The populated ``CWEAnalysis``.
    """
    cwe_dir = Path(cwe_dir)
    csv_path = Path(csv_path)
    cwe_dir_name = cwe_dir.name
    cwe_id = _extract_cwe_from_dir(str(cwe_dir)) or cwe_dir_name
    cwe_rules = _load_cwe_rules(cwe_id)

    analysis = CWEAnalysis(cwe_id=cwe_id, cwe_dir_name=cwe_dir_name)
    analysis.cwe_rules = cwe_rules
    violations_dict = parse_sqc_csv(csv_path)

    rule_tp = defaultdict(int)
    rule_fp = defaultdict(int)
    rule_flaw = defaultdict(int)
    files_with_bad_section = 0
    files_detected = 0

    for search_dir in _find_search_dirs(cwe_dir):
        if not search_dir.is_dir():  # e.g. cwe_dir itself does not exist
            continue
        for c_file in sorted(search_dir.glob('*.c')):
            sections = parse_c_file_sections(c_file)
            bad_lines = sections['bad_lines']
            good_lines = sections['good_lines']
            flaw_lines = sections['flaw_lines']
            if not bad_lines and not good_lines:
                continue

            analysis.files_analyzed += 1
            analysis.flaw_lines_total += len(flaw_lines)
            if bad_lines:
                files_with_bad_section += 1

            file_has_cwe_tp = False
            exact_flaws = set()    # flaw lines with a bad-section violation exactly on them
            matched_flaws = set()  # flaw lines hit (+/- 1 line) by a CWE-matched true positive

            # Violations are matched to the file by base name only.
            for line_num, rule_entries in violations_dict.get(c_file.name, {}).items():
                in_bad = line_num in bad_lines
                in_good = line_num in good_lines
                # Same one-line tolerance as _hits_flaw_line, but keeping
                # track of *which* flaw line was hit.
                hit_line = _nearest_flaw_line(line_num, flaw_lines)
                on_flaw = hit_line is not None

                for rule_id, filepath in rule_entries:
                    is_matched = rule_id in cwe_rules
                    if in_bad:
                        classification = "tp"
                        analysis.tp_count += 1
                        rule_tp[rule_id] += 1
                        if line_num in flaw_lines:
                            exact_flaws.add(line_num)
                            rule_flaw[rule_id] += 1
                        if is_matched:
                            analysis.cwe_matched_tp += 1
                            file_has_cwe_tp = True
                            if on_flaw:
                                matched_flaws.add(hit_line)
                        else:
                            analysis.noise_count += 1
                    elif in_good:
                        classification = "fp"
                        analysis.fp_count += 1
                        rule_fp[rule_id] += 1
                        if is_matched:
                            analysis.cwe_matched_fp += 1
                        else:
                            analysis.noise_count += 1
                    else:
                        classification = "unknown"

                    if cwe_scan_id is not None:
                        analysis.violations.append({
                            "cwe_scan_id": cwe_scan_id,
                            "rule_id": rule_id,
                            "file_path": filepath,
                            "line": line_num,
                            "classification": classification,
                            "in_bad_section": int(in_bad),
                            "in_good_section": int(in_good),
                            "hits_flaw_line": int(on_flaw),
                            "is_cwe_matched": int(is_matched),
                        })

            analysis.flaw_lines_detected += len(exact_flaws)
            analysis.flaw_hit_detected += len(matched_flaws)
            if file_has_cwe_tp:
                files_detected += 1

    analysis.tp_rate_pct = _percentage(
        analysis.tp_count, analysis.tp_count + analysis.fp_count)
    analysis.flaw_detection_rate_pct = _percentage(
        analysis.flaw_lines_detected, analysis.flaw_lines_total)

    cwe_matched_total = analysis.cwe_matched_tp + analysis.cwe_matched_fp
    analysis.noise_ratio = _percentage(
        analysis.noise_count, cwe_matched_total + analysis.noise_count)

    analysis.per_file_detected = files_detected
    analysis.per_file_total = files_with_bad_section
    analysis.per_file_rate = _percentage(files_detected, files_with_bad_section)

    # The hit rate shares its denominator with the exact detection rate.
    analysis.flaw_hit_total = analysis.flaw_lines_total
    analysis.flaw_hit_rate = _percentage(
        analysis.flaw_hit_detected, analysis.flaw_hit_total)

    # Sorted so that the breakdown has a reproducible key order.
    for rule in sorted(set(rule_tp) | set(rule_fp) | set(rule_flaw)):
        analysis.rule_breakdown[rule] = {
            "tp": rule_tp.get(rule, 0),
            "fp": rule_fp.get(rule, 0),
            "flaw": rule_flaw.get(rule, 0),
            "is_cwe_matched": int(rule in cwe_rules),
        }
    return analysis