import argparse
import json
import os
import re
import sys
import textwrap
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote, urljoin

import requests
from bs4 import BeautifulSoup
# Root of the wiki; links scraped from pages are resolved against it.
BASE_URL = "https://wiki.sei.cmu.edu"
# Prefix of the wiki space whose index and category pages are requested.
WIKI_BASE = f"{BASE_URL}/confluence/display/c"
# Default minimum number of seconds between two HTTP requests.
DEFAULT_DELAY = 3.0
# User-Agent header sent with every request made by the scraper.
USER_AGENT = "CERT-C-Scraper/1.0 (Educational Purpose)"
# Default root directory for generated TOML files and code examples.
BASE_OUTPUT_DIR = "src/rules/cert_c"
# Category code -> (category number, category name); filled in by main()
# from WikiScraper.discover_categories() before any category is scraped.
CATEGORIES: Dict[str, Tuple[str, str]] = {}
@dataclass
class ItemMetadata:
    """Metadata collected for a single wiki item (rule or recommendation).

    The list-valued fields default to a fresh empty list per instance.
    Passing ``None`` for any of them is still accepted and is normalised
    to an empty list, so existing callers keep working.
    """

    # Item identifier, e.g. "ARR30-C".
    id: str
    # "rule" or "recommendation" (the values main() passes in).
    item_type: str
    # Three-letter category code taken from the identifier, e.g. "ARR".
    category: str
    # Two-digit number taken from the identifier, as an int.
    number: int
    # Page title with the identifier prefix removed.
    title: str
    # Values read from the page's risk table, when one is present.
    severity: Optional[str] = None
    likelihood: Optional[str] = None
    priority: Optional[str] = None
    level: Optional[str] = None
    # URL the item was scraped from.
    wiki_url: str = ""
    cert_version: Optional[str] = None
    # Date text as found on the page (e.g. "Nov 06, 2019"), not parsed.
    last_modified: Optional[str] = None
    # "CWE-<n>" identifiers found in the page content.
    cwe: List[str] = field(default_factory=list)
    # Identifiers of other items mentioned in the page's "related" section.
    related_rules: List[str] = field(default_factory=list)
    related_recommendations: List[str] = field(default_factory=list)
    # First paragraphs of the page body, separated by blank lines.
    description: str = ""

    def __post_init__(self):
        """Replace an explicitly passed ``None`` list field with an empty list."""
        for list_field in ("cwe", "related_rules", "related_recommendations"):
            if getattr(self, list_field) is None:
                setattr(self, list_field, [])
class WikiScraper:
    """Rate-limited HTTP scraper for the wiki rooted at ``WIKI_BASE``.

    All requests go through one ``requests.Session`` carrying the
    ``USER_AGENT`` header, and consecutive requests are spaced at least
    ``delay`` seconds apart.
    """

    def __init__(self, delay: float = DEFAULT_DELAY):
        """Create the HTTP session.

        Args:
            delay: Minimum number of seconds between two requests.
        """
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})
        self.delay = delay
        # 0 means "no request made yet", so the first request is not delayed.
        self.last_request_time = 0

    def _rate_limit(self):
        """Sleep until ``self.delay`` seconds have passed since the last request."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_request_time = time.time()

    def fetch_page(self, url: str) -> Optional[BeautifulSoup]:
        """Download *url* and return it parsed, or None on any request error.

        Errors (including HTTP error statuses) are printed, not raised.
        """
        self._rate_limit()
        try:
            print(f" Fetching: {url}")
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except requests.RequestException as e:
            print(f" ✗ Error fetching {url}: {e}")
            return None

    def discover_categories(self) -> Dict[str, Tuple[str, str]]:
        """Read the category list from the main wiki page.

        Returns:
            Mapping of category code to ``(number, name)`` built from link
            texts shaped like ``Rule 06. Arrays (ARR)`` or ``Rec. 06. ...``.
            The first link seen for a code wins. Empty when the main page
            cannot be fetched.
        """
        print("Discovering categories from main wiki page...")
        soup = self.fetch_page(WIKI_BASE)
        if not soup:
            print(" ✗ Failed to fetch main wiki page, using empty categories")
            return {}

        categories = {}
        for link in soup.find_all('a', href=True):
            link_text = link.get_text(strip=True)
            match = re.match(r'(?:Rule|Rec\.)\s+(\d+)\.\s+([^(]+)\s+\((\w+)\)', link_text)
            if not match:
                continue
            cat_num, cat_name, cat_code = match.groups()
            cat_name = cat_name.strip()
            if cat_code not in categories:
                categories[cat_code] = (cat_num, cat_name)
                print(f" Found category: {cat_code} = {cat_num}. {cat_name}")
        print(f"✓ Discovered {len(categories)} categories")
        return categories

    def get_category_items(self, category_code: str, item_type: str) -> List[Tuple[str, str, str]]:
        """List the items linked from one category page.

        Args:
            category_code: Key into the module-level ``CATEGORIES`` mapping.
            item_type: ``"rule"`` selects the "Rule" page; any other value
                selects the "Rec." page.

        Returns:
            ``(item_id, title, url)`` tuples, one per distinct item id, in
            page order. Empty when the category page cannot be fetched.

        Raises:
            KeyError: If *category_code* is not in ``CATEGORIES``.
        """
        cat_num, cat_name = CATEGORIES[category_code]
        prefix = "Rule" if item_type == "rule" else "Rec."
        cat_url = f"{WIKI_BASE}/{prefix}+{cat_num}.+{cat_name.replace(' ', '+')}+({category_code})"
        soup = self.fetch_page(cat_url)
        if not soup:
            return []

        items = []
        seen_ids = set()
        item_pattern = re.compile(rf'{re.escape(category_code)}\d{{2}}-C')
        for link in soup.find_all('a', href=True):
            text = link.get_text(strip=True)
            match = item_pattern.match(text)
            if not match:
                continue
            item_id = match.group(0)
            # The same item can be linked more than once on a page; keeping
            # only the first link avoids fetching and counting it twice.
            if item_id in seen_ids:
                continue
            seen_ids.add(item_id)
            title = text[len(item_id):].strip('. ')
            # urljoin resolves every kind of relative href and leaves
            # absolute URLs untouched.
            item_url = urljoin(BASE_URL, link['href'])
            items.append((item_id, title, item_url))
            print(f" Found: {item_id} - {title}")
        return items

    @staticmethod
    def _extract_last_modified(soup) -> Optional[str]:
        """Return the page's "last modified" date text, or None if absent."""
        byline = re.search(
            r'last\s+modified\s+by\s+[^\n]+\s+on\s+([A-Z][a-z]+\s+\d{1,2},\s+\d{4})',
            soup.get_text(), re.IGNORECASE)
        if byline:
            return byline.group(1)
        # Fall back to the metadata banner when the byline is missing.
        page_meta = soup.find('div', id='page-metadata-banner') or soup.find('div', class_='page-metadata')
        if page_meta:
            banner = re.search(
                r'(?:Last\s+Modified|Updated):\s*([A-Za-z]+\s+\d{1,2},\s+\d{4})',
                page_meta.get_text(), re.IGNORECASE)
            if banner:
                return banner.group(1)
        return None

    @staticmethod
    def _extract_description(content) -> str:
        """Join the first three non-empty top-level paragraphs of *content*."""
        desc_parts = []
        for elem in content.find_all(['p', 'div'], recursive=False):
            text = elem.get_text(strip=True)
            # Blocks starting with "Rule"/"Rec" are skipped.
            if text and not text.startswith(('Rule', 'Rec')):
                desc_parts.append(text)
                if len(desc_parts) >= 3:
                    break
        return '\n\n'.join(desc_parts)

    @staticmethod
    def _apply_risk_metrics(item, content) -> None:
        """Copy severity/likelihood/priority/level from risk tables onto *item*."""
        for table in content.find_all('table'):
            headers = [th.get_text(strip=True).lower() for th in table.find_all('th')]
            if 'severity' not in headers and 'likelihood' not in headers:
                continue
            rows = table.find_all('tr')
            if len(rows) < 2:
                continue
            # The second row holds the values; columns are matched to the
            # header cells by position.
            cells = rows[1].find_all(['td', 'th'])
            for header, cell in zip(headers, cells):
                if header in ('severity', 'likelihood', 'priority', 'level'):
                    setattr(item, header, cell.get_text(strip=True))

    @staticmethod
    def _collect_related(item, content) -> None:
        """Fill *item*'s related lists from the first "related" section."""
        rule_pattern = re.compile(r'\b([A-Z]{3}\d{2}-C)\b')
        for heading in content.find_all(['h2', 'h3', 'h4']):
            if 'related' not in heading.get_text(strip=True).lower():
                continue
            next_section = heading.find_next_sibling()
            if not next_section:
                continue
            section_text = next_section.get_text()
            for match in rule_pattern.finditer(section_text):
                related_id = match.group(1)
                if related_id == item.id:
                    continue
                related_num = int(related_id[3:5])
                # Look at 50 characters either side of the id for a hint.
                start = max(0, match.start() - 50)
                end = min(len(section_text), match.end() + 50)
                context = section_text[start:end].lower()
                # Ids numbered below 30, or mentioned near the word
                # "recommendation", are filed as recommendations
                # (presumably mirroring the wiki's numbering scheme).
                if 'recommendation' in context or related_num < 30:
                    bucket = item.related_recommendations
                else:
                    bucket = item.related_rules
                if related_id not in bucket:
                    bucket.append(related_id)
            # Only the first related section that has content is scanned.
            break

    def parse_item_page(self, item_id: str, item_url: str, item_type: str) -> Optional[Tuple[ItemMetadata, List[Tuple[str, str]], List[Tuple[str, str]]]]:
        """Fetch one item page and extract its metadata and code examples.

        Args:
            item_id: Identifier shaped like ``ARR30-C``.
            item_url: URL of the item's wiki page.
            item_type: Stored unchanged on the returned metadata.

        Returns:
            ``(metadata, non_compliant, compliant)`` where the two lists
            hold ``(example_name, code)`` tuples, or None when *item_id* is
            malformed or the page cannot be fetched. When the page has no
            ``main-content`` div, both lists are empty.
        """
        # Validate first so a malformed id does not cost a rate-limited request.
        id_match = re.match(r'^([A-Z]{3})(\d{2})-C$', item_id)
        if not id_match:
            return None
        soup = self.fetch_page(item_url)
        if not soup:
            return None

        category, number = id_match.groups()
        item = ItemMetadata(
            id=item_id,
            item_type=item_type,
            category=category,
            number=int(number),
            title="",
            wiki_url=item_url,
        )

        title_elem = soup.find('h1', id='title-text')
        if title_elem:
            title_text = title_elem.get_text(strip=True)
            item.title = re.sub(rf'^{re.escape(item_id)}\.?\s*', '', title_text)

        item.last_modified = self._extract_last_modified(soup)
        item.cert_version = "2016 Edition (Wiki)"

        content = soup.find('div', id='main-content')
        if not content:
            return (item, [], [])

        item.description = self._extract_description(content)
        self._apply_risk_metrics(item, content)

        # Searched in the raw HTML, so ids that only appear in link targets
        # are picked up as well.
        for cwe_number in re.findall(r'CWE-(\d+)', str(content)):
            cwe_id = f"CWE-{cwe_number}"
            if cwe_id not in item.cwe:
                item.cwe.append(cwe_id)

        self._collect_related(item, content)

        non_compliant, compliant = extract_code_examples(content, item_id)
        return (item, non_compliant, compliant)
# Translation table used by sanitize_code(): one entry per character that
# is replaced (mapped to a string) or removed (mapped to None).
_INVISIBLE_CHAR_TABLE = str.maketrans({
    '\u00a0': ' ',   # no-break space -> ordinary space
    '\u200b': None,  # zero-width space
    '\u200c': None,  # zero-width non-joiner
    '\u200d': None,  # zero-width joiner
    '\ufeff': None,  # zero-width no-break space / byte-order mark
})


def sanitize_code(code: str) -> str:
    """Return *code* with invisible Unicode characters normalised.

    No-break spaces become ordinary spaces; zero-width characters
    (U+200B, U+200C, U+200D, U+FEFF) are removed. All other characters,
    including tabs and newlines, are left as they are.
    """
    return code.translate(_INVISIBLE_CHAR_TABLE)
# Boilerplate removed from a heading to obtain the example's own label.
_EXAMPLE_LABEL_RE = re.compile(
    r'non[-\s]?compliant code example|compliant solution', re.IGNORECASE)
# Marks a heading as introducing non-compliant code.
_NONCOMPLIANT_RE = re.compile(r'non[-\s]?compliant', re.IGNORECASE)
# Heading tags that start (and therefore also end) an example section.
_EXAMPLE_HEADING_TAGS = ['h2', 'h3', 'h4', 'h5']


def _collect_code_blocks(heading, base_name, fallback_prefix, examples):
    """Append each code block between *heading* and the next heading to *examples*."""
    taken = {name for name, _ in examples}
    code_count = 0
    current = heading.find_next_sibling()
    while current and current.name not in _EXAMPLE_HEADING_TAGS:
        is_code = current.name == 'pre' or (
            current.name == 'div' and 'code' in current.get('class', []))
        if is_code:
            code = sanitize_code(current.get_text())
            if code.strip():
                code_count += 1
                stem = base_name or f"{fallback_prefix}_{len(examples) + 1}"
                # Second and later blocks under one heading get a counter.
                if code_count > 1:
                    stem = f"{stem}_{code_count}"
                # Names become file names later on, so a repeated name
                # would silently overwrite an earlier example.
                example_name, bump = stem, 1
                while example_name in taken:
                    bump += 1
                    example_name = f"{stem}_{bump}"
                taken.add(example_name)
                examples.append((example_name, code.strip()))
        current = current.find_next_sibling()


def extract_code_examples(content, item_id: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
    """Collect the code examples that follow example headings in *content*.

    A heading (h2-h5) whose text contains "noncompliant" / "non-compliant"
    starts a non-compliant section; any other heading containing
    "compliant" starts a compliant one. Every ``<pre>`` element or
    ``<div>`` with class ``code`` that is a following sibling of the
    heading, up to the next heading, is taken as one example.

    Args:
        content: Parsed HTML element to search (the code relies on its
            ``find_all`` method and on ``find_next_sibling``, ``name``,
            ``get`` and ``get_text`` of the elements it yields).
        item_id: Accepted for interface compatibility; not used here.

    Returns:
        ``(non_compliant, compliant)``; each a list of
        ``(example_name, code)`` tuples with surrounding whitespace
        stripped from the code. Names come from the heading's own label
        when it has one, otherwise from a running counter, and are unique
        within each list.
    """
    non_compliant = []
    compliant = []
    for heading in content.find_all(_EXAMPLE_HEADING_TAGS):
        heading_text = heading.get_text(strip=True)
        # Classify on the "noncompliant" marker alone: a compliant heading
        # may legitimately contain "non" elsewhere in its text.
        if _NONCOMPLIANT_RE.search(heading_text):
            fallback_prefix, examples = 'noncompliant', non_compliant
        elif 'compliant' in heading_text.lower():
            fallback_prefix, examples = 'compliant', compliant
        else:
            continue
        label = _EXAMPLE_LABEL_RE.sub('', heading_text).strip(' :-')
        # Labels shorter than three characters are treated as absent; an
        # empty sanitised label also falls back to the counter-based name.
        base_name = sanitize_filename(label) if len(label) >= 3 else ''
        _collect_code_blocks(heading, base_name, fallback_prefix, examples)
    return non_compliant, compliant
def sanitize_filename(name: str) -> str:
    """Turn free text into a lowercase, underscore-separated file name stem.

    Characters other than word characters, whitespace and hyphens are
    dropped; runs of whitespace/hyphens become a single underscore; the
    result is lowercased and limited to 60 characters. Leading and
    trailing underscores are removed, including one exposed by the
    truncation. The result may be empty.
    """
    name = re.sub(r'[^\w\s-]', '', name)
    name = re.sub(r'[-\s]+', '_', name)
    name = name.strip('_').lower()
    # Cutting at 60 characters can land right after a separator.
    return name[:60].rstrip('_')
def _write_examples(examples, target_dir, item_id, status, label):
    """Write each (name, code) pair to ``target_dir/wiki_<name>.c`` with a header."""
    for example_name, code in examples:
        filepath = target_dir / f"wiki_{example_name}.c"
        header = (
            "/*\n"
            f"* Rule: {item_id}\n"
            "* Source: wiki\n"
            f"* Status: {status}\n"
            "*/\n"
        )
        # Explicit UTF-8: the scraped code can contain non-ASCII
        # characters, which the platform default encoding may reject.
        filepath.write_text(header + code, encoding="utf-8")
        print(f" ✓ Saved {label} example: {filepath}")


def save_code_examples(item_id: str, category: str, non_compliant: List[Tuple[str, str]], compliant: List[Tuple[str, str]], output_dir: str):
    """Write scraped examples under ``<output_dir>/<category>/<item_id>/tests``.

    Non-compliant examples go to ``tests/fail`` and compliant ones to
    ``tests/pass``, each as ``wiki_<example_name>.c`` preceded by a comment
    header naming the item and the expected status. Both directories are
    created whenever there is at least one example of either kind; nothing
    is created or written when both lists are empty.

    Args:
        item_id: Item identifier written into each header.
        category: Directory name directly below *output_dir*.
        non_compliant: ``(example_name, code)`` pairs expected to fail.
        compliant: ``(example_name, code)`` pairs expected to pass.
        output_dir: Root output directory.
    """
    if not non_compliant and not compliant:
        return

    tests_dir = Path(output_dir) / category / item_id / "tests"
    fail_dir = tests_dir / "fail"
    pass_dir = tests_dir / "pass"
    fail_dir.mkdir(parents=True, exist_ok=True)
    pass_dir.mkdir(parents=True, exist_ok=True)

    _write_examples(
        non_compliant, fail_dir, item_id,
        f"FAIL - Should trigger {item_id} violation", "non-compliant")
    _write_examples(
        compliant, pass_dir, item_id,
        "PASS - Compliant solution", "compliant")
def wrap_description(text: str, width: int = 80) -> str:
    """Re-flow *text* so that no line is longer than *width* characters.

    The text is split into paragraphs at blank lines. Inside each
    paragraph, whitespace runs (no-break spaces included) collapse to a
    single space before the paragraph is wrapped. The wrapped paragraphs
    are joined with a single newline. Empty input yields an empty string.
    """
    if not text:
        return ""
    normalized = text.replace('\xa0', ' ')
    return '\n'.join(
        textwrap.fill(' '.join(chunk.split()), width=width)
        for chunk in normalized.split('\n\n')
    )
def parse_existing_toml_date(toml_path: Path) -> Optional[str]:
    """Return the ``last_modified`` value stored in an existing TOML file.

    The file is searched with a regular expression rather than parsed as
    TOML; the first ``last_modified = "..."`` occurrence wins.

    Args:
        toml_path: Path of the TOML file to inspect.

    Returns:
        The quoted value, or None when the file cannot be read (a warning
        is printed) or holds no such entry.
    """
    try:
        # TOML files are UTF-8. errors='replace' keeps a file written in
        # another encoding readable; the date line itself is plain ASCII.
        content = Path(toml_path).read_text(encoding='utf-8', errors='replace')
    except OSError as e:
        print(f" ⚠ Could not read existing TOML: {e}")
        return None
    match = re.search(r'last_modified\s*=\s*"([^"]+)"', content)
    return match.group(1) if match else None
# Accepted date layouts: abbreviated ("Nov 06, 2019") and full
# ("November 6, 2019") month names.
_WIKI_DATE_FORMATS = ("%b %d, %Y", "%B %d, %Y")


def _parse_wiki_date(value):
    """Parse *value* with the first matching wiki date format, else return None."""
    for fmt in _WIKI_DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except (ValueError, TypeError):
            continue
    return None


def compare_dates(date1: Optional[str], date2: Optional[str]) -> int:
    """Three-way comparison of two date strings.

    Args:
        date1: First date, e.g. ``"Nov 06, 2019"`` or ``"November 6, 2019"``.
        date2: Second date in either of the same layouts.

    Returns:
        -1 if *date1* is earlier than *date2*, 1 if it is later, 0 if both
        fall on the same day. When either value is missing the result is 0.
        When either value cannot be parsed, the raw strings are compared:
        0 if they are identical, otherwise 1.
    """
    if not date1 or not date2:
        return 0
    d1 = _parse_wiki_date(date1)
    d2 = _parse_wiki_date(date2)
    if d1 is None or d2 is None:
        # Unparseable input: differing strings count as "date1 is newer".
        return 0 if date1 == date2 else 1
    return (d1 > d2) - (d1 < d2)
def _toml_escape(value, multiline: bool = False) -> str:
    """Escape *value* so it is valid inside a TOML basic string."""
    pieces = []
    for ch in str(value):
        code = ord(ch)
        if ch == '\\':
            pieces.append('\\\\')
        elif ch == '"':
            pieces.append('\\"')
        elif ch == '\t' or (multiline and ch == '\n'):
            # Tabs are always allowed raw; newlines only in """...""" strings.
            pieces.append(ch)
        elif code < 0x20 or code == 0x7F:
            pieces.append(f'\\u{code:04X}')
        else:
            pieces.append(ch)
    return ''.join(pieces)


def generate_toml_metadata(item: "ItemMetadata", output_path: Path, force: bool = False):
    """Write the TOML metadata file for *item* to *output_path*.

    An existing file is kept unless *force* is set or the wiki's
    ``last_modified`` date is newer than the one stored in the file. When
    either date is unavailable the existing file is also kept. The reason
    for skipping or regenerating is printed.

    String values are escaped for TOML, so backslashes and double quotes
    in scraped text cannot produce an invalid file. Missing severity,
    likelihood, priority, level, version and date values are written as
    ``"Unknown"``.

    Args:
        item: Metadata to serialise.
        output_path: Destination file; its directory must already exist.
        force: Overwrite an existing file without comparing dates.
    """
    if output_path.exists() and not force:
        existing_date = parse_existing_toml_date(output_path)
        wiki_date = item.last_modified
        if not (existing_date and wiki_date):
            print(f" ⊙ TOML exists: {output_path} (use --force to overwrite)")
            return
        if compare_dates(wiki_date, existing_date) <= 0:
            print(f" ⊙ TOML up-to-date: {output_path} (wiki: {wiki_date}, local: {existing_date})")
            return
        print(f" ↻ Wiki updated: {wiki_date} > {existing_date}, regenerating...")

    # An empty description needs no wrapping and is written as "".
    wrapped_desc = wrap_description(item.description, width=80) if item.description else ""

    def quoted(value) -> str:
        return f'"{_toml_escape(value)}"'

    toml_lines = [
        "[metadata]",
        f'id = {quoted(item.id)}',
        f'type = {quoted(item.item_type)}',
        f'category = {quoted(item.category)}',
        f'number = {item.number}',
        f'title = {quoted(item.title)}',
    ]
    if wrapped_desc:
        toml_lines += [
            'description = """',
            _toml_escape(wrapped_desc, multiline=True),
            '"""',
        ]
    else:
        toml_lines.append('description = ""')

    for key in ('severity', 'likelihood', 'priority', 'level', 'cert_version', 'last_modified'):
        toml_lines.append(f'{key} = {quoted(getattr(item, key) or "Unknown")}')

    cwe_list = ', '.join(quoted(cwe) for cwe in item.cwe)
    toml_lines += [
        "",
        f'[rules.cert_c.{item.id}]',
        'enabled = false',
        "",
        "[references]",
        f'wiki = {quoted(item.wiki_url)}',
        f'cwe = [{cwe_list}]',
        "",  # makes the file end with a newline
    ]

    # TOML documents are UTF-8; do not depend on the platform default.
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(toml_lines))
def main() -> int:
    """Command-line entry point: scrape the wiki and write TOML plus examples.

    The run has three phases: discover the categories from the main wiki
    page, collect the item links from each selected category page, then
    parse every item page and write its TOML file and code examples below
    the output directory. A summary is printed at the end.

    Returns:
        1 when no categories could be discovered, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description='Scrape CERT C wiki and generate TOML metadata')
    parser.add_argument('--delay', type=float, default=DEFAULT_DELAY,
                        help=f'Delay between requests in seconds (default: {DEFAULT_DELAY})')
    parser.add_argument('--output', type=str, default=BASE_OUTPUT_DIR,
                        help=f'Output base directory (default: {BASE_OUTPUT_DIR})')
    parser.add_argument('--categories', type=str, nargs='+',
                        help='Specific categories to scrape (e.g., ARR MEM), default: all')
    parser.add_argument('--type', choices=['rule', 'rec', 'all'], default='all',
                        help='Scrape rules, recommendations, or both (default: all)')
    parser.add_argument('--force', action='store_true',
                        help='Force overwrite existing TOML files')
    args = parser.parse_args()

    print("=" * 60)
    print("CERT C Wiki Scraper - TOML Generation")
    print("=" * 60)
    print(f"Rate limit: {args.delay}s between requests")
    print(" (Note: No robots.txt found - using conservative default)")
    print(f"Output directory: {args.output}")
    print(f"Scraping: {args.type}")
    print(f"Force overwrite: {args.force}")
    print()

    scraper = WikiScraper(delay=args.delay)

    # WikiScraper.get_category_items() reads the module-level mapping.
    global CATEGORIES
    CATEGORIES = scraper.discover_categories()
    print()
    if not CATEGORIES:
        print("✗ No categories discovered from wiki. Exiting.")
        return 1

    categories_to_scrape = args.categories if args.categories else CATEGORIES.keys()

    # "--type rec" on the command line maps to the item type "recommendation".
    types_to_scrape = []
    if args.type in ['rule', 'all']:
        types_to_scrape.append('rule')
    if args.type in ['rec', 'all']:
        types_to_scrape.append('recommendation')

    # Phase 2: collect (item_type, category, item_id, title, url) tuples.
    all_items = []
    for item_type in types_to_scrape:
        print(f"\n{'='*60}")
        print(f"Collecting {item_type.upper()}S from category pages...")
        print(f"{'='*60}")
        for category_code in categories_to_scrape:
            if category_code not in CATEGORIES:
                print(f" ✗ Unknown category: {category_code}")
                continue
            print(f"\n[{category_code}] Fetching {item_type} category page...")
            items = scraper.get_category_items(category_code, item_type)
            print(f" ✓ Found {len(items)} {item_type}s")
            all_items.extend([(item_type, category_code, item_id, title, url)
                              for item_id, title, url in items])

    print(f"\n✓ Total items found: {len(all_items)}")
    print()
    print("=" * 60)
    print("Parsing individual pages and generating TOML...")
    print("=" * 60)

    # Phase 3: parse each page, write its outputs and record the outcome.
    parsed_items = []
    skipped_items = []
    new_items = []
    updated_items = []
    for i, (item_type, _category, item_id, title, url) in enumerate(all_items, 1):
        print(f"[{i}/{len(all_items)}] Parsing {item_type} {item_id}...")
        result = scraper.parse_item_page(item_id, url, item_type)
        if not result:
            print(f" ✗ Failed to parse {item_id}")
            continue

        item, non_compliant, compliant = result
        # Fall back to the link text when the page itself gave no title.
        if not item.title and title:
            item.title = title
        parsed_items.append(item)

        rule_dir = Path(args.output) / item.category / item.id
        rule_dir.mkdir(parents=True, exist_ok=True)
        toml_path = rule_dir / (item.id + '.toml')

        # Capture the state before generate_toml_metadata() may rewrite
        # the file, so the outcome can be classified afterwards.
        is_new = not toml_path.exists()
        existing_date = parse_existing_toml_date(toml_path) if not is_new else None
        generate_toml_metadata(item, toml_path, force=args.force)

        if is_new:
            new_items.append(item_id)
            print(f" ✓ Generated new TOML: {toml_path}")
        elif args.force:
            updated_items.append(item_id)
            print(f" ✓ Force updated TOML: {toml_path}")
        elif existing_date and item.last_modified and compare_dates(item.last_modified, existing_date) > 0:
            updated_items.append(item_id)
            print(f" ✓ Updated TOML: {toml_path}")
        else:
            skipped_items.append(item_id)

        if non_compliant or compliant:
            save_code_examples(item.id, item.category, non_compliant, compliant, args.output)

    print()
    print("=" * 60)
    print("✅ SCRAPING COMPLETE!")
    print("=" * 60)
    print(f"Scraped {len(parsed_items)} items")
    print(f" - New TOML files: {len(new_items)}")
    print(f" - Updated (wiki newer): {len(updated_items)}")
    print(f" - Unchanged (skipped): {len(skipped_items)}")
    rules_count = sum(1 for item in parsed_items if item.item_type == "rule")
    rec_count = sum(1 for item in parsed_items if item.item_type == "recommendation")
    print(f" - Rules: {rules_count}")
    print(f" - Recommendations: {rec_count}")
    print(f"Output directory: {args.output}")

    if new_items:
        print()
        print("New rules added:")
        for rule_id in new_items:
            print(f" - {rule_id}")
    if updated_items:
        print()
        print("Rules updated (wiki content newer):")
        for rule_id in updated_items:
            print(f" - {rule_id}")

    print()
    print("Next steps:")
    print("1. Review generated TOML files")
    print("2. Run: cargo build # Regenerates rules-all.toml")
    print("3. Implement rules as needed")
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the shell as the exit status (main()
    # returns 1 when no categories are discovered), instead of dropping it.
    sys.exit(main())