import os
import tempfile
import shutil
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from python.analyzer import ProteusAnalyzer
from python.ml_detector import ProteusMLDetector
from python.yara_engine import ProteusYaraEngine
from python.config import ConfigManager
import proteus
# Process-wide analysis engines. Both start as None and are populated by
# `lifespan` at startup; they remain None when loading fails, and request
# handlers check for that before using them.
ml_detector: Optional[ProteusMLDetector] = None
yara_engine: Optional[ProteusYaraEngine] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Populate the optional ML and YARA engines at startup.

    Each engine is loaded independently and on a best-effort basis: any
    failure is printed and the corresponding module global is left as None
    so the server still starts. Control is yielded to the application once
    both attempts have finished; a message is printed on shutdown.
    """
    global ml_detector, yara_engine

    print("[PROTEUS] Starting server...")

    # --- ML detector -----------------------------------------------------
    try:
        detector = ProteusMLDetector()
        detector.load_model()
        ml_detector = detector
        print("[+] ML models loaded successfully")
    except Exception as e:
        print(f"[!] ML models not available: {e}")
        ml_detector = None

    # --- YARA engine -----------------------------------------------------
    try:
        engine = ProteusYaraEngine()
        if not engine.load_rules():
            print("[!] YARA rules not available")
            yara_engine = None
        else:
            # Publish the engine before reporting so a failure while
            # reporting is handled by the except branch below.
            yara_engine = engine
            print(
                f"[+] YARA engine loaded: {yara_engine.get_stats()['rule_files']} rules"
            )
    except Exception as e:
        print(f"[!] YARA engine not available: {e}")
        yara_engine = None

    print("[+] Server ready at http://localhost:8000")
    yield
    print("[PROTEUS] Shutting down...")
# Application instance; `lifespan` runs the engine loading at startup.
app = FastAPI(title="PROTEUS API", version="0.2.0", lifespan=lifespan)

# CORS: any origin may call the API, but without credentials. A wildcard
# origin must not be combined with allow_credentials=True (the CORS spec and
# the FastAPI/Starlette docs both forbid it; Starlette would otherwise echo
# back whatever Origin the browser sent for credentialed requests). No
# endpoint visible in this file reads cookies or auth headers, so credentials
# are not needed for cross-origin callers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static assets for the web UI are served from the local "web" directory.
app.mount("/static", StaticFiles(directory="web"), name="static")
@app.get("/")
async def root():
    """Serve the web UI entry page from the local ``web`` directory."""
    index_page = "web/index.html"
    return FileResponse(index_page)
@app.get("/api/stats")
async def get_stats():
    """Report which optional engines are loaded, plus the API version.

    ``yara_info`` carries whatever the YARA engine's ``get_stats()`` returns,
    or an empty dict when no engine is available.
    """
    if yara_engine:
        yara_details = yara_engine.get_stats()
    else:
        yara_details = {}

    return {
        "ml_loaded": ml_detector is not None,
        "yara_loaded": yara_engine is not None,
        "yara_info": yara_details,
        "version": "0.2.0",
    }
def _flag_enabled(value: str) -> bool:
    """Return True when a form flag holds the string "true" (any letter case)."""
    return value.lower() == "true"


def _safe_upload_name(filename: Optional[str]) -> str:
    """Reduce a client-supplied filename to a bare basename.

    The upload's filename is attacker-controlled. Joining it to a directory
    unmodified lets an absolute path, a drive-qualified name or ``../``
    segments escape that directory. Everything up to the last ``/``, ``\\``
    or ``:`` is dropped, NUL bytes are removed, and a placeholder is used
    when nothing usable remains.
    """
    cleaned = (filename or "").replace("\x00", "")
    # Treat Windows separators and drive colons like "/" on every platform.
    cleaned = cleaned.replace("\\", "/").replace(":", "/")
    basename = cleaned.rsplit("/", 1)[-1].strip()
    if basename in ("", ".", ".."):
        return "upload.bin"
    return basename


@app.post("/api/scan")
async def scan_file(
    file: UploadFile = File(...),
    ml: str = Form("false"),
    yara: str = Form("false"),
    strings: str = Form("false"),
    sandbox: str = Form("false"),
):
    """Analyze an uploaded file and return the combined scan results.

    The upload is written to a fresh temporary directory, run through the
    heuristic analyzer, and optionally through the ML detector, the YARA
    engine and string extraction. The temporary directory is removed before
    returning, whether or not the scan succeeded.

    Args:
        file: The uploaded sample.
        ml: "true" to include an ML prediction (only if a detector is loaded).
        yara: "true" to include YARA matches (only if an engine is loaded).
        strings: "true" to include extracted-string statistics.
        sandbox: "true" to request sandbox analysis; the Cuckoo backend is
            enabled only if the CUCKOO_ENABLED environment variable is also
            "true".

    Returns:
        A dict with keys ``filename``, ``heuristic``, ``sandbox``, ``ml``,
        ``yara`` and ``strings``. Optional sections are None when not
        requested or not available, or ``{"error": ...}`` when they failed.

    Raises:
        HTTPException: 403 when the sample could not be written because of a
            permission error or Windows error 225; 500 when the analysis or
            anything else fails.
    """
    # NOTE(review): the analyzer/detector calls below are made synchronously
    # from an async handler; if they block for long they will stall the event
    # loop. Consider a threadpool once their thread-safety is confirmed.
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp(prefix="proteus_scan_")
        # Never trust the client-supplied name as a path component.
        temp_file = Path(temp_dir) / _safe_upload_name(file.filename)
        file_content = await file.read()

        try:
            with open(temp_file, "wb") as f:
                f.write(file_content)
        except PermissionError as e:
            raise HTTPException(
                status_code=403,
                detail=f"Antivirus software blocked file access. Please add '{temp_dir}' to Windows Defender exclusions.",
            ) from e
        except OSError as e:
            # `winerror` only exists on Windows; reading it directly raises
            # AttributeError elsewhere and hides the real error.
            if getattr(e, "winerror", None) == 225:
                raise HTTPException(
                    status_code=403,
                    detail="Windows Defender blocked this file. Please disable Real-Time Protection or add Proteus directory to exclusions.",
                ) from e
            raise

        use_sandbox = _flag_enabled(sandbox)
        config = ConfigManager.create_proteus_config()
        cuckoo_enabled = (
            os.getenv("CUCKOO_ENABLED", "false").lower() == "true" and use_sandbox
        )
        cuckoo_url = os.getenv("CUCKOO_URL", config.cuckoo_url)
        cuckoo_token = os.getenv("CUCKOO_API_TOKEN", config.cuckoo_api_token)
        analyzer = ProteusAnalyzer(
            cuckoo_enabled=cuckoo_enabled,
            cuckoo_url=cuckoo_url,
            cuckoo_api_token=cuckoo_token,
        )

        try:
            result = analyzer.analyze_single(str(temp_file), use_sandbox=use_sandbox)
        except Exception as e:
            print(f"[!] Analysis error: {type(e).__name__}: {e}")
            import traceback

            traceback.print_exc()
            raise HTTPException(
                status_code=500, detail=f"Analysis failed: {str(e)}"
            ) from e

        response = {
            # Echo the name exactly as the client sent it.
            "filename": file.filename,
            "heuristic": {
                "type": result["type"],
                "entropy": result["entropy"],
                "score": result["score"],
                "verdict": result["verdict"],
                "indicators": result["indicators"],
                "packer": result["packer"],
            },
            "sandbox": result.get("sandbox"),
            "ml": None,
            "yara": None,
            "strings": None,
        }

        # Each optional stage is best-effort: a failure is reported inside
        # its own section instead of failing the whole scan.
        if _flag_enabled(ml) and ml_detector:
            try:
                response["ml"] = ml_detector.predict(str(temp_file))
            except Exception as e:
                response["ml"] = {"error": str(e)}

        if _flag_enabled(yara) and yara_engine:
            try:
                response["yara"] = yara_engine.scan_file(str(temp_file))
            except Exception as e:
                response["yara"] = {"error": str(e)}

        if _flag_enabled(strings):
            try:
                string_result = proteus.extract_strings_from_file(str(temp_file))
                response["strings"] = {
                    "total": string_result.total_strings,
                    "encoded": string_result.encoded_strings,
                    "urls": string_result.urls[:10],
                    "ips": string_result.ips[:10],
                    "suspicious": string_result.suspicious_strings[:20],
                }
            except Exception as e:
                response["strings"] = {"error": str(e)}

        return response
    except HTTPException:
        # Already carries the intended status code; do not rewrap as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if temp_dir and Path(temp_dir).exists():
            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                print(f"[!] Failed to clean up temp directory: {e}")
@app.post("/api/export")
async def export_report(format: str = Form(...), data: str = Form(...)):
    """Return previously produced scan data as a downloadable report.

    Args:
        format: "json" or "html".
        data: The scan result serialized as a JSON string.

    Returns:
        An attachment response whose filename embeds the current timestamp:
        the parsed data as JSON, or an HTML page rendered by
        ``generate_html_report``.

    Raises:
        HTTPException: 400 when ``data`` is not valid JSON, when ``format``
            is unsupported, or when an HTML report is requested for data
            that is not a JSON object; 500 for any other failure.
    """
    try:
        scan_data = json.loads(data)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if format == "json":
            filename = f"proteus_report_{timestamp}.json"
            return JSONResponse(
                content=scan_data,
                media_type="application/json",
                headers={"Content-Disposition": f"attachment; filename={filename}"},
            )
        elif format == "html":
            # The HTML renderer reads keys from a mapping; reject lists,
            # strings and numbers as a client error instead of a 500.
            if not isinstance(scan_data, dict):
                raise HTTPException(
                    status_code=400, detail="Report data must be a JSON object"
                )
            html = generate_html_report(scan_data)
            filename = f"proteus_report_{timestamp}.html"
            return Response(
                content=html.encode("utf-8"),
                media_type="text/html",
                headers={"Content-Disposition": f"attachment; filename={filename}"},
            )
        else:
            raise HTTPException(status_code=400, detail="Unsupported format")
    except HTTPException:
        # Without this clause the generic handler below would turn the
        # deliberate 400 responses into 500s.
        raise
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail="Invalid JSON data") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def _as_dict(value) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_float(value, default: float = 0.0) -> float:
    """Coerce *value* to float, falling back to *default* when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def generate_html_report(data: dict) -> str:
    """Render scan results as a standalone HTML report.

    ``data`` is read with the keys ``filename``, ``heuristic``, ``ml`` and
    ``yara``. The content arrives from the client, so every interpolated
    value is HTML-escaped, and missing, null or wrongly typed fields fall
    back to defaults instead of raising.

    Args:
        data: The scan result mapping.

    Returns:
        The complete HTML document as a string.
    """
    # Local import: the module has no top-level `html` import, and callers in
    # this file use `html` as a variable name.
    from html import escape

    def text(value) -> str:
        """Stringify and HTML-escape one interpolated value."""
        return escape(str(value))

    data = _as_dict(data)
    heuristic = _as_dict(data.get("heuristic"))

    raw_verdict = heuristic.get("verdict", "UNKNOWN")
    # The color is picked from the raw verdict and is one of two fixed
    # literals, so it is safe to place inside the <style> block.
    verdict_color = "red" if raw_verdict == "MALICIOUS" else "green"

    filename = text(data.get("filename", "Unknown"))
    verdict = text(raw_verdict)
    file_type = text(heuristic.get("type", "Unknown"))
    entropy = _as_float(heuristic.get("entropy"))
    score = _as_float(heuristic.get("score"))

    parts = [
        f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PROTEUS Analysis Report - {filename}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; background: #f5f5f5; }}
.container {{ background: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
h1 {{ color: #333; border-bottom: 2px solid #667eea; padding-bottom: 10px; }}
h2 {{ color: #555; margin-top: 30px; }}
.verdict {{ font-size: 24px; font-weight: bold; color: {verdict_color}; }}
.score {{ font-size: 18px; color: #666; }}
.section {{ margin: 20px 0; padding: 15px; background: #f9f9f9; border-radius: 5px; }}
.indicator {{ padding: 8px; margin: 5px 0; background: #fff3cd; border-left: 4px solid #ffc107; }}
table {{ width: 100%; border-collapse: collapse; margin: 10px 0; }}
th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background: #667eea; color: white; }}
.meta {{ color: #666; font-size: 12px; margin-top: 30px; }}
</style>
</head>
<body>
<div class="container">
<h1>PROTEUS Malware Analysis Report</h1>
<div class="section">
<h2>File Information</h2>
<table>
<tr><th>Filename</th><td>{filename}</td></tr>
<tr><th>Type</th><td>{file_type}</td></tr>
<tr><th>Entropy</th><td>{entropy:.2f}</td></tr>
<tr><th>Verdict</th><td><span class="verdict">{verdict}</span></td></tr>
<tr><th>Threat Score</th><td><span class="score">{score:.2f}/100</span></td></tr>
</table>
</div>
"""
    ]

    indicators = heuristic.get("indicators")
    if not isinstance(indicators, (list, tuple)):
        indicators = []
    if indicators:
        parts.append(
            """
<div class="section">
<h2>Suspicious Indicators</h2>
"""
        )
        parts.extend(
            f' <div class="indicator">{text(ind)}</div>\n' for ind in indicators
        )
        parts.append(" </div>\n")

    packer = _as_dict(heuristic.get("packer"))
    if packer.get("detected"):
        packer_name = text(packer.get("name", "Unknown"))
        packer_confidence = _as_float(packer.get("confidence")) * 100
        parts.append(
            f"""
<div class="section">
<h2>Packer Detection</h2>
<table>
<tr><th>Packer Name</th><td>{packer_name}</td></tr>
<tr><th>Confidence</th><td>{packer_confidence:.1f}%</td></tr>
</table>
</div>
"""
        )

    ml = _as_dict(data.get("ml"))
    if ml and not ml.get("error"):
        prediction = text(str(ml.get("prediction") or "Unknown").upper())
        ml_confidence = _as_float(ml.get("confidence")) * 100
        anomaly = "Yes" if ml.get("is_anomaly") else "No"
        parts.append(
            f"""
<div class="section">
<h2>Machine Learning Analysis</h2>
<table>
<tr><th>Prediction</th><td>{prediction}</td></tr>
<tr><th>Confidence</th><td>{ml_confidence:.1f}%</td></tr>
<tr><th>Anomaly Detected</th><td>{anomaly}</td></tr>
</table>
</div>
"""
        )

    yara = _as_dict(data.get("yara"))
    if _as_float(yara.get("match_count")) > 0:
        match_count = text(yara.get("match_count"))
        parts.append(
            f"""
<div class="section">
<h2>YARA Matches ({match_count})</h2>
<table>
<tr><th>Rule</th><th>Description</th></tr>
"""
        )
        matches = yara.get("matches")
        if not isinstance(matches, (list, tuple)):
            matches = []
        for match in matches:
            match = _as_dict(match)
            rule = text(match.get("rule", "Unknown"))
            desc = text(_as_dict(match.get("meta")).get("description", "N/A"))
            parts.append(f""" <tr><td>{rule}</td><td>{desc}</td></tr>\n""")
        parts.append(
            """ </table>
</div>
"""
        )

    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    parts.append(
        f"""
<div class="meta">
<p>Generated by PROTEUS v0.2.0 - Advanced Zero-Day Static Analysis Engine</p>
<p>Report generated: {generated_at}</p>
</div>
</div>
</body>
</html>"""
    )
    return "".join(parts)
def main():
    """Print the startup banner and antivirus guidance, then run the server.

    The guidance names the current working directory as the folder to add to
    the Windows Defender exclusions. The uvicorn server is started on port
    8000, bound to all interfaces.
    """
    banner = """
===============================================
PROTEUS v0.2.0
Web Interface Starting...
===============================================
"""
    print(banner)

    working_dir = Path.cwd()
    notices = (
        "\n[!] ANTIVIRUS WARNING:",
        "If you encounter 'virus detected' errors, add this directory to Windows Defender exclusions:",
        f" {working_dir}",
        "\nSteps:",
        " 1. Open Windows Security > Virus & threat protection",
        " 2. Click 'Manage settings'",
        " 3. Scroll to 'Exclusions' and click 'Add or remove exclusions'",
        f" 4. Add folder: {working_dir}",
        "\nStarting server on http://localhost:8000\n",
    )
    for notice in notices:
        print(notice)

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
# Start the server only when this file is executed as a script, not when it
# is imported (e.g. by an ASGI runner that loads `app` directly).
if __name__ == "__main__":
    main()