import hashlib
import json
import time
import os
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any
from dataclasses import dataclass, asdict
from pathlib import Path
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
try:
import hope_genome as hg
HOPE_GENOME_AVAILABLE = True
except ImportError:
HOPE_GENOME_AVAILABLE = False
@dataclass
class AuditEntry:
entry_id: str
timestamp: str
unix_time: int
event_type: str action: str
rule_violated: Optional[str]
denial_reason: Optional[str]
violation_count: int
content_hash: str signature: Optional[str] previous_hash: str
model: Optional[str]
session_id: str
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2, ensure_ascii=False)
def compute_hash(self) -> str:
content = f"{self.timestamp}|{self.event_type}|{self.action}|{self.rule_violated}|{self.previous_hash}"
return hashlib.sha256(content.encode()).hexdigest()
class BlockchainAudit:
def __init__(
self,
audit_dir: str = "audit_chain",
pinata_jwt: Optional[str] = None,
session_id: Optional[str] = None
):
self.audit_dir = Path(audit_dir)
self.audit_dir.mkdir(parents=True, exist_ok=True)
self.pinata_jwt = pinata_jwt or os.environ.get("PINATA_JWT")
self.session_id = session_id or self._generate_session_id()
self.chain: List[AuditEntry] = []
self.chain_file = self.audit_dir / "chain.json"
self._load_chain()
if not self.chain:
self._create_genesis()
def _generate_session_id(self) -> str:
timestamp = int(time.time() * 1000)
random_part = hashlib.sha256(os.urandom(32)).hexdigest()[:8]
return f"session_{timestamp}_{random_part}"
def _load_chain(self):
if self.chain_file.exists():
try:
with open(self.chain_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.chain = [AuditEntry(**entry) for entry in data]
except Exception as e:
print(f"[AUDIT] Warning: Could not load chain: {e}")
self.chain = []
def _save_chain(self):
with open(self.chain_file, 'w', encoding='utf-8') as f:
json.dump([asdict(entry) for entry in self.chain], f, indent=2, ensure_ascii=False)
def _create_genesis(self):
genesis = AuditEntry(
entry_id="GENESIS",
timestamp=datetime.now(timezone.utc).isoformat(),
unix_time=int(time.time()),
event_type="GENESIS",
action="Chain initialized",
rule_violated=None,
denial_reason=None,
violation_count=0,
content_hash="0" * 64,
signature=None,
previous_hash="0" * 64,
model=None,
session_id=self.session_id
)
genesis.content_hash = genesis.compute_hash()
self.chain.append(genesis)
self._save_chain()
print(f"[AUDIT] Genesis block created: {genesis.content_hash[:16]}...")
def _get_previous_hash(self) -> str:
if self.chain:
return self.chain[-1].content_hash
return "0" * 64
def log_denial(
self,
action: str,
rule_violated: str,
denial_reason: str,
violation_count: int,
signature: Optional[str] = None,
model: Optional[str] = None
) -> AuditEntry:
entry_id = f"DENIAL_{int(time.time() * 1000)}"
entry = AuditEntry(
entry_id=entry_id,
timestamp=datetime.now(timezone.utc).isoformat(),
unix_time=int(time.time()),
event_type="DENIAL",
action=action,
rule_violated=rule_violated,
denial_reason=denial_reason,
violation_count=violation_count,
content_hash="",
signature=signature,
previous_hash=self._get_previous_hash(),
model=model,
session_id=self.session_id
)
entry.content_hash = entry.compute_hash()
self.chain.append(entry)
self._save_chain()
entry_file = self.audit_dir / f"{entry_id}.json"
with open(entry_file, 'w', encoding='utf-8') as f:
f.write(entry.to_json())
print(f"[AUDIT] DENIAL logged: {entry.content_hash[:16]}... (violation #{violation_count})")
if self.pinata_jwt:
self._pin_to_ipfs(entry)
return entry
def log_violation(
self,
action: str,
rule_violated: str,
violation_count: int,
model: Optional[str] = None
) -> AuditEntry:
entry_id = f"VIOLATION_{int(time.time() * 1000)}"
entry = AuditEntry(
entry_id=entry_id,
timestamp=datetime.now(timezone.utc).isoformat(),
unix_time=int(time.time()),
event_type="VIOLATION",
action=action,
rule_violated=rule_violated,
denial_reason=f"Violation #{violation_count}",
violation_count=violation_count,
content_hash="",
signature=None,
previous_hash=self._get_previous_hash(),
model=model,
session_id=self.session_id
)
entry.content_hash = entry.compute_hash()
self.chain.append(entry)
self._save_chain()
print(f"[AUDIT] VIOLATION logged: {entry.content_hash[:16]}... (#{violation_count}/10)")
return entry
def log_hard_reset(
self,
reason: str,
final_violation_count: int,
model: Optional[str] = None
) -> AuditEntry:
entry_id = f"HARD_RESET_{int(time.time() * 1000)}"
entry = AuditEntry(
entry_id=entry_id,
timestamp=datetime.now(timezone.utc).isoformat(),
unix_time=int(time.time()),
event_type="HARD_RESET",
action="FORCED CONTEXT CLEAR",
rule_violated="MULTIPLE",
denial_reason=reason,
violation_count=final_violation_count,
content_hash="",
signature=None,
previous_hash=self._get_previous_hash(),
model=model,
session_id=self.session_id
)
entry.content_hash = entry.compute_hash()
self.chain.append(entry)
self._save_chain()
print(f"[AUDIT] [!] HARD RESET logged: {entry.content_hash[:16]}...")
print(f"[AUDIT] [!] AI reached {final_violation_count} violations - CONTEXT CLEARED!")
return entry
def log_action_approved(
self,
action: str,
model: Optional[str] = None
) -> AuditEntry:
entry_id = f"APPROVED_{int(time.time() * 1000)}"
entry = AuditEntry(
entry_id=entry_id,
timestamp=datetime.now(timezone.utc).isoformat(),
unix_time=int(time.time()),
event_type="ACTION_APPROVED",
action=action,
rule_violated=None,
denial_reason=None,
violation_count=0,
content_hash="",
signature=None,
previous_hash=self._get_previous_hash(),
model=model,
session_id=self.session_id
)
entry.content_hash = entry.compute_hash()
self.chain.append(entry)
self._save_chain()
return entry
def _pin_to_ipfs(self, entry: AuditEntry) -> Optional[str]:
if not REQUESTS_AVAILABLE or not self.pinata_jwt:
return None
try:
url = "https://api.pinata.cloud/pinning/pinJSONToIPFS"
headers = {
"Authorization": f"Bearer {self.pinata_jwt}",
"Content-Type": "application/json"
}
payload = {
"pinataContent": asdict(entry),
"pinataMetadata": {
"name": entry.entry_id,
"keyvalues": {
"event_type": entry.event_type,
"session_id": entry.session_id
}
}
}
response = requests.post(url, headers=headers, json=payload, timeout=30)
if response.status_code == 200:
ipfs_hash = response.json().get("IpfsHash")
print(f"[AUDIT] Pinned to IPFS: {ipfs_hash}")
return ipfs_hash
else:
print(f"[AUDIT] IPFS pin failed: {response.status_code}")
return None
except Exception as e:
print(f"[AUDIT] IPFS error: {e}")
return None
def verify_chain(self) -> bool:
print("[AUDIT] Verifying chain integrity...")
if not self.chain:
print("[AUDIT] Chain is empty!")
return False
for i, entry in enumerate(self.chain):
computed_hash = entry.compute_hash()
if computed_hash != entry.content_hash:
print(f"[AUDIT] [X] TAMPERING DETECTED at entry {i}: {entry.entry_id}")
print(f"[AUDIT] Expected: {entry.content_hash[:32]}...")
print(f"[AUDIT] Computed: {computed_hash[:32]}...")
return False
if i > 0:
expected_prev = self.chain[i-1].content_hash
if entry.previous_hash != expected_prev:
print(f"[AUDIT] [X] CHAIN BREAK at entry {i}: {entry.entry_id}")
return False
print(f"[AUDIT] [OK] Chain verified: {len(self.chain)} entries, all valid!")
return True
def get_stats(self) -> Dict[str, Any]:
stats = {
"total_entries": len(self.chain),
"denials": 0,
"violations": 0,
"hard_resets": 0,
"approved": 0,
"first_entry": None,
"last_entry": None,
"session_id": self.session_id
}
for entry in self.chain:
if entry.event_type == "DENIAL":
stats["denials"] += 1
elif entry.event_type == "VIOLATION":
stats["violations"] += 1
elif entry.event_type == "HARD_RESET":
stats["hard_resets"] += 1
elif entry.event_type == "ACTION_APPROVED":
stats["approved"] += 1
if self.chain:
stats["first_entry"] = self.chain[0].timestamp
stats["last_entry"] = self.chain[-1].timestamp
return stats
def export_for_git(self) -> str:
stats = self.get_stats()
summary = f"""# Hope Genome Audit Chain Export
## Session: {self.session_id}
## Statistics:
- Total Entries: {stats['total_entries']}
- Denials: {stats['denials']}
- Violations: {stats['violations']}
- Hard Resets: {stats['hard_resets']}
- Approved Actions: {stats['approved']}
## Time Range:
- First: {stats['first_entry']}
- Last: {stats['last_entry']}
## Chain Hash: {self.chain[-1].content_hash if self.chain else 'N/A'}
## Verification:
Chain integrity: {'[OK] VALID' if self.verify_chain() else '[X] INVALID'}
---
Generated by Hope Genome Blockchain Audit
VAS SZIGORA - {datetime.now().strftime('%Y.%m.%d')}
"""
export_file = self.audit_dir / "AUDIT_EXPORT.md"
with open(export_file, 'w', encoding='utf-8') as f:
f.write(summary)
return summary
class WatchdogAuditIntegration:
def __init__(
self,
watchdog, audit: Optional[BlockchainAudit] = None,
model_name: Optional[str] = None
):
self.watchdog = watchdog
self.audit = audit or BlockchainAudit()
self.model_name = model_name
def verify_and_log(self, action) -> dict:
result = self.watchdog.verify_action(action)
if result.hard_reset_required:
entry = self.audit.log_hard_reset(
reason="10 consecutive violations reached",
final_violation_count=10,
model=self.model_name
)
elif not result.approved:
entry = self.audit.log_denial(
action=str(action),
rule_violated=result.denial_proof.violated_rule if result.denial_proof else "Unknown",
denial_reason=result.denial_proof.denial_reason if result.denial_proof else "Unknown",
violation_count=result.denial_proof.violation_count if result.denial_proof else 0,
signature=result.denial_proof.signature_hex() if result.denial_proof else None,
model=self.model_name
)
else:
entry = self.audit.log_action_approved(
action=str(action),
model=self.model_name
)
return {
"watchdog_result": result,
"audit_entry": entry,
"chain_hash": entry.content_hash
}
def demo():
print("""
+==============================================================+
| HOPE GENOME - BLOCKCHAIN AUDIT DEMO |
| |
| Ingyenes, publikusan verifikalhato, immutable audit! |
| |
| Mate Robert + Claude |
| VAS SZIGORA - 2026.01.01. |
+==============================================================+
""")
audit = BlockchainAudit(audit_dir="demo_audit_chain")
print("\n[DEMO] Simulating Watchdog events...\n")
audit.log_denial(
action="delete_file('/etc/passwd')",
rule_violated="No system file access",
denial_reason="Attempted to delete critical system file",
violation_count=1,
model="TinyLlama"
)
time.sleep(0.1)
audit.log_denial(
action="execute_command('rm -rf /')",
rule_violated="No destructive commands",
denial_reason="Attempted destructive system command",
violation_count=2,
model="TinyLlama"
)
time.sleep(0.1)
audit.log_violation(
action="network_request('http://malware.com')",
rule_violated="No external network access",
violation_count=3,
model="TinyLlama"
)
time.sleep(0.1)
audit.log_action_approved(
action="calculate(2 + 2)",
model="TinyLlama"
)
print("\n" + "="*60)
is_valid = audit.verify_chain()
print("\n" + "="*60)
stats = audit.get_stats()
print(f"""
[AUDIT STATISTICS]
Total Entries: {stats['total_entries']}
Denials: {stats['denials']}
Violations: {stats['violations']}
Hard Resets: {stats['hard_resets']}
Approved: {stats['approved']}
Session: {stats['session_id']}
""")
print("\n" + "="*60)
print("[DEMO] Exporting for Git commit...")
export = audit.export_for_git()
print(export)
print("\n[DEMO] Demo complete! Check 'demo_audit_chain/' folder.")
print("[DEMO] To commit: git add demo_audit_chain/ && git commit -m 'Audit log update'")
if __name__ == "__main__":
demo()