import json
import re
import sys
import hashlib
from typing import Any
# Anonymization state shared by the get_anonymous_* helpers in this module.
# Each map remembers the replacement already issued for an original value, so
# repeated occurrences of the same value anonymize to the same result. Each
# counter holds how many replacements of that kind have been issued so far.
_id_counter = 0
_id_map: dict[str, str] = {}  # original id -> replacement id
_username_counter = 0
_username_map: dict[str, str] = {}  # original username -> "user<N>"
_instance_counter = 0
_instance_map: dict[str, str] = {}  # original domain -> "instance<N>.example"
def get_anonymous_id(original_id: str) -> str:
    """Return the stable replacement ID for *original_id*.

    The first time an ID is seen, the module-level counter is bumped and the
    replacement ``str(100000 + counter)`` is recorded; later calls with the
    same ID return the recorded replacement.
    """
    global _id_counter
    try:
        return _id_map[original_id]
    except KeyError:
        # First sighting: allocate the next sequential replacement.
        _id_counter += 1
        replacement = str(100000 + _id_counter)
        _id_map[original_id] = replacement
        return replacement
def get_anonymous_username(original: str) -> str:
    """Return the stable ``user<N>`` alias for *original*.

    A new alias is allocated (and the username counter incremented) only the
    first time a given value is seen; afterwards the recorded alias is reused.
    """
    global _username_counter
    alias = _username_map.get(original)
    if alias is not None:
        return alias
    _username_counter += 1
    alias = f"user{_username_counter}"
    _username_map[original] = alias
    return alias
def get_anonymous_instance(original: str) -> str:
    """Return the stable ``instance<N>.example`` alias for *original*.

    The instance counter is incremented and a new alias recorded only for
    values that have not been seen before.
    """
    global _instance_counter
    if original in _instance_map:
        return _instance_map[original]
    _instance_counter += 1
    alias = f"instance{_instance_counter}.example"
    _instance_map[original] = alias
    return alias
def anonymize_instance_url(url: str) -> str:
    """Replace the scheme+host of every http(s) URL found in *url*.

    Each match of the host pattern is rewritten to
    ``https://<anonymous instance>`` (the scheme is always emitted as
    ``https``). Text outside the matched scheme+host span -- port, path,
    query, surrounding text -- is left as it was.
    """
    host_pattern = re.compile(r'https?://([a-zA-Z0-9][-a-zA-Z0-9]*\.)+[a-zA-Z]{2,}')

    def swap_host(match: re.Match) -> str:
        # Drop the scheme so the bare domain is what gets mapped.
        bare_domain = re.sub(r'^https?://', '', match.group(0))
        return f"https://{get_anonymous_instance(bare_domain)}"

    return host_pattern.sub(swap_host, url)
def anonymize_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return a copy of *headers* with identifying values replaced.

    Header names are matched case-insensitively; the original key spelling is
    kept in the result.

    * ``Content-Length`` is dropped.
    * ``Authorization`` becomes a fixed placeholder (a ``Bearer`` prefix is
      preserved).
    * ``Cookie`` becomes a fixed placeholder session cookie.
    * ``Set-Cookie`` has every ``=value`` run (up to the next ``;``) replaced
      with ``=anonymous`` -- this includes attribute values, not only the
      cookie value itself.
    * ``Host``, ``Origin`` and ``Referer`` have their instance host replaced.
    * Everything else is copied through unchanged.
    """
    scheme = "https://"
    result = {}
    for key, value in headers.items():
        key_lower = key.lower()
        if key_lower == "content-length":
            continue
        elif key_lower == "authorization":
            if value.startswith("Bearer "):
                result[key] = "Bearer anonymous_token_xxx"
            else:
                result[key] = "anonymous_auth"
        elif key_lower == "cookie":
            result[key] = "_mastodon_session=anonymous_session"
        elif key_lower == "set-cookie":
            result[key] = re.sub(r'=([^;]+)', '=anonymous', value)
        elif key_lower == "host":
            # A Host header is "host[:port]" with no scheme, so the URL
            # pattern in anonymize_instance_url would never match it and the
            # real hostname would leak. Add a scheme for matching, then strip
            # it again; the result always starts with the scheme because the
            # replacement itself begins with "https://".
            anonymized = anonymize_instance_url(f"{scheme}{value}")
            result[key] = anonymized[len(scheme):]
        elif key_lower in ("origin", "referer"):
            result[key] = anonymize_instance_url(value)
        else:
            result[key] = value
    return result
def anonymize_json_value(value: Any, depth: int = 0) -> Any:
    """Anonymize an arbitrary decoded JSON value.

    Strings go through ``anonymize_string_value``, lists are processed
    element by element, and dicts are handed to ``anonymize_json_object``.
    Any other value (numbers, booleans, ``None``) is returned unchanged.

    *depth* tracks the recursion level; once it exceeds 50 the value is
    returned exactly as received, without further processing.
    """
    if depth > 50:
        return value
    if isinstance(value, str):
        return anonymize_string_value(value)
    if isinstance(value, list):
        return [anonymize_json_value(element, depth + 1) for element in value]
    if isinstance(value, dict):
        return anonymize_json_object(value, depth + 1)
    return value
def anonymize_string_value(value: str) -> str:
    """Anonymize a free-standing string when it looks like a URL or address.

    * Strings shorter than five characters are returned unchanged.
    * Strings starting with ``http://`` or ``https://`` are passed to
      ``anonymize_instance_url``.
    * Strings containing a ``.`` and exactly one ``@`` are treated as
      ``user@domain`` and both halves are replaced.
    * Anything else is returned unchanged.
    """
    if len(value) < 5:
        return value
    if value.startswith(("http://", "https://")):
        return anonymize_instance_url(value)
    # Exactly one "@" means splitting on it yields precisely two parts.
    if "." in value and value.count("@") == 1:
        local_part, _, domain = value.partition("@")
        return f"{get_anonymous_username(local_part)}@{get_anonymous_instance(domain)}"
    return value
def anonymize_json_object(obj: dict, depth: int = 0) -> dict:
    """Return an anonymized copy of a decoded JSON object.

    Fields are handled by key name:

    * ``id``, ``username``, ``acct``, ``display_name`` and ``email`` string
      values are replaced with stable aliases from the ``get_anonymous_*``
      helpers.
    * ``uri`` / ``url`` string values have their instance host replaced.
    * ``note``, ``access_token`` and ``token`` string values become fixed
      placeholders; ``content`` goes through ``anonymize_html_content``.
    * ``avatar`` / ``header`` (and their ``_static`` variants) are replaced
      with fixed placeholder URLs regardless of the original value's type.
    * Timestamp fields are copied through untouched.
    * ``media_attachments`` list entries go through
      ``anonymize_media_attachment``.
    * Every other field (including nested objects such as ``account`` or
      ``reblog``) is processed recursively by ``anonymize_json_value``.

    *depth* is forwarded to the recursive calls so the depth guard in
    ``anonymize_json_value`` applies.
    """
    result = {}
    for key, value in obj.items():
        is_text = isinstance(value, str)
        if key == "id" and is_text:
            result[key] = get_anonymous_id(value)
        elif key in ("uri", "url") and is_text:
            result[key] = anonymize_instance_url(value)
        elif key == "username" and is_text:
            result[key] = get_anonymous_username(value)
        elif key == "acct" and is_text:
            # "user@domain" for remote accounts, bare "user" for local ones.
            user, at, domain = value.partition("@")
            if at:
                result[key] = f"{get_anonymous_username(user)}@{get_anonymous_instance(domain)}"
            else:
                result[key] = get_anonymous_username(value)
        elif key == "display_name" and is_text:
            result[key] = f"Anonymous User {get_anonymous_username(value)}"
        elif key == "email" and is_text:
            result[key] = f"{get_anonymous_username(value.split('@')[0])}@example.com"
        elif key == "note" and is_text:
            result[key] = "<p>This is an anonymized user bio.</p>"
        elif key == "content" and is_text:
            result[key] = anonymize_html_content(value)
        elif key in ("avatar", "avatar_static"):
            result[key] = "https://example.com/avatars/original/missing.png"
        elif key in ("header", "header_static"):
            result[key] = "https://example.com/headers/original/missing.png"
        elif key == "access_token" and is_text:
            result[key] = "anonymous_access_token_xxx"
        elif key == "token" and is_text:
            result[key] = "anonymous_token_xxx"
        elif key in ("created_at", "updated_at", "edited_at", "last_status_at"):
            result[key] = value
        elif key == "media_attachments" and isinstance(value, list):
            # Only dict entries can be treated as attachments; anything else
            # falls back to the generic handler instead of raising.
            result[key] = [
                anonymize_media_attachment(item)
                if isinstance(item, dict)
                else anonymize_json_value(item, depth)
                for item in value
            ]
        else:
            result[key] = anonymize_json_value(value, depth)
    return result
def anonymize_media_attachment(attachment: dict) -> dict:
    """Return a shallow copy of *attachment* with media URLs and ID replaced.

    ``url``, ``preview_url`` and ``remote_url`` are overwritten with fixed
    placeholder URLs whenever the key is present (whatever its value), and
    ``id`` is stringified and mapped through ``get_anonymous_id``. All other
    fields are copied as they are; the input dict is not modified.
    """
    placeholders = {
        "url": "https://example.com/media/original/anonymized.jpg",
        "preview_url": "https://example.com/media/small/anonymized.jpg",
        "remote_url": "https://example.com/media/original/anonymized.jpg",
    }
    anonymized = dict(attachment)
    for field, placeholder in placeholders.items():
        if field in anonymized:
            anonymized[field] = placeholder
    if "id" in anonymized:
        anonymized["id"] = get_anonymous_id(str(anonymized["id"]))
    return anonymized
def anonymize_html_content(html: str) -> str:
    """Anonymize mentions and link targets inside an HTML fragment.

    Two passes are made, in this order: the text of every ``@<span>…</span>``
    mention is replaced with an anonymous username, then the value of every
    ``href="…"`` attribute is passed through ``anonymize_instance_url``.
    All other markup and text is left untouched.
    """

    def swap_mention(match: re.Match) -> str:
        return f'@<span>{get_anonymous_username(match.group(1))}</span>'

    def swap_href(match: re.Match) -> str:
        return f'href="{anonymize_instance_url(match.group(1))}"'

    without_mentions = re.sub(r'@<span>([^<]+)</span>', swap_mention, html)
    return re.sub(r'href="([^"]+)"', swap_href, without_mentions)
def _anonymize_body(body: Any) -> Any:
    """Anonymize one request/response body, preserving its outer type.

    A string is parsed as JSON, anonymized and re-serialized; if it is not
    valid JSON it is returned unchanged. An already-decoded dict or list is
    anonymized directly. Any other value (e.g. ``None``) is returned as is.
    """
    if isinstance(body, str):
        try:
            parsed = json.loads(body)
        except json.JSONDecodeError:
            # NOTE(review): non-JSON bodies pass through verbatim and are
            # therefore not anonymized.
            return body
        return json.dumps(anonymize_json_value(parsed))
    if isinstance(body, (dict, list)):
        return anonymize_json_value(body)
    return body


def anonymize_exchange(exchange: dict) -> dict:
    """Return an anonymized copy of one recorded request/response exchange.

    The result always has a ``timestamp`` (defaulting to
    ``2025-01-01T00:00:00Z``). ``request`` and ``response`` are included only
    when present in *exchange*; their headers go through
    ``anonymize_headers`` and their bodies through JSON anonymization.

    A request body is emitted only when it is non-empty. A response body is
    emitted whenever the ``body`` key exists, even if empty or null; a null
    or already-decoded body no longer raises ``TypeError``.
    """
    result = {
        "timestamp": exchange.get("timestamp", "2025-01-01T00:00:00Z"),
    }

    if "request" in exchange:
        req = exchange["request"]
        result["request"] = {
            "method": req.get("method", "GET"),
            "path": req.get("path", "/"),
            # `or {}` also covers an explicit null headers value.
            "headers": anonymize_headers(req.get("headers") or {}),
        }
        if req.get("body"):
            result["request"]["body"] = _anonymize_body(req["body"])

    if "response" in exchange:
        resp = exchange["response"]
        result["response"] = {
            "status": resp.get("status", 200),
            "headers": anonymize_headers(resp.get("headers") or {}),
        }
        if "body" in resp:
            result["response"]["body"] = _anonymize_body(resp["body"])

    return result
def main():
    """Command-line entry point: anonymize a JSONL traffic recording.

    Usage: ``anonymize_traffic.py <input.jsonl> [output.jsonl]``. Each
    non-blank input line is parsed as one JSON exchange, anonymized, and
    written as one JSON line to the output file (or stdout when no output
    path is given). Lines that are not valid JSON, or are not JSON objects,
    are skipped with a warning on stderr. A summary of how many IDs,
    usernames and instances were replaced is printed to stderr at the end.

    Exits with status 1 when no input path is supplied.
    """
    if len(sys.argv) < 2:
        print("Usage: python anonymize_traffic.py <input.jsonl> [output.jsonl]", file=sys.stderr)
        print("\nReads recorded traffic and outputs anonymized version.", file=sys.stderr)
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else None

    # Open the input first so a bad input path does not truncate an existing
    # output file. UTF-8 is explicit because the platform default encoding
    # varies and would mis-decode non-ASCII content on some systems.
    with open(input_file, "r", encoding="utf-8") as source:
        output = open(output_file, "w", encoding="utf-8") if output_file else sys.stdout
        try:
            for line in source:
                line = line.strip()
                if not line:
                    continue
                try:
                    exchange = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: Skipping invalid JSON line: {e}", file=sys.stderr)
                    continue
                if not isinstance(exchange, dict):
                    # Valid JSON but not an exchange object (e.g. a bare list).
                    print("Warning: Skipping non-object JSON line", file=sys.stderr)
                    continue
                print(json.dumps(anonymize_exchange(exchange)), file=output)
        finally:
            # Never close stdout; only close a file this function opened.
            if output_file:
                output.close()

    print("\nAnonymization complete!", file=sys.stderr)
    print(f" - Anonymized {_id_counter} IDs", file=sys.stderr)
    print(f" - Anonymized {_username_counter} usernames", file=sys.stderr)
    print(f" - Anonymized {_instance_counter} instances", file=sys.stderr)


if __name__ == "__main__":
    main()