from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
from briefcase.semantic_conventions import cowork as conv
@dataclass
class RedactionResult:
    """Outcome of running the redaction patterns over a single string."""

    # The text exactly as it was passed in.
    original_value: str
    # The text after every match has been replaced by a marker.
    redacted_value: str
    # Total number of replacements performed across all patterns.
    redaction_count: int = 0
    # Labels of the patterns that matched at least once.
    pii_types_found: List[str] = field(default_factory=list)
_DEFAULT_PATTERNS: List[tuple] = [
("api_key", re.compile(
r"\b(?:sk-|bai_|api_|key_|AIza|AKIA|ya29\.|xox[bpoa]-)[A-Za-z0-9_-]{15,}\b"
)),
("credit_card", re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b")),
("ssn", re.compile(r"\b\d{3}-\d{2}-\d{4}\b|\b\d{3}\s\d{2}\s\d{4}\b")),
("email", re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b")),
("phone", re.compile(
r"(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}"
)),
("ip_address", re.compile(
r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
)),
]
class CoworkRedactionFilter:
    """Redact sensitive values from event attribute dictionaries.

    Attributes whose key is in the configured sensitive set are converted to
    strings and scrubbed. Depending on the key they are either replaced
    wholesale (prompt text), scrubbed as JSON (tool parameters), or scrubbed
    with the regex patterns. All other attributes pass through untouched.
    """

    def __init__(
        self,
        *,
        enabled: bool = True,
        sensitive_attrs: Optional[FrozenSet[str]] = None,
        extra_sensitive_attrs: Optional[Set[str]] = None,
        custom_patterns: Optional[Dict[str, str]] = None,
        redact_prompt_content: bool = True,
    ) -> None:
        """Configure the filter.

        Args:
            enabled: When false, the public methods return their input
                without redacting anything.
            sensitive_attrs: Attribute keys to redact. ``None`` *or an empty
                set* falls back to ``conv.SENSITIVE_ATTRIBUTES``.
            extra_sensitive_attrs: Additional keys unioned into the set above.
            custom_patterns: Mapping of label to regex source. Each entry is
                compiled and appended after the built-in patterns.
            redact_prompt_content: When true, the prompt-text attribute is
                replaced by a fixed marker instead of being pattern-scrubbed.

        Raises:
            re.error: If a custom pattern is not a valid regular expression.
        """
        self._enabled = enabled
        self._redact_prompt_content = redact_prompt_content

        # ``or`` (not ``is None``) is kept on purpose: an empty set still
        # yields the defaults, so an empty config value cannot turn redaction
        # off by accident.
        self._sensitive_attrs: FrozenSet[str] = sensitive_attrs or conv.SENSITIVE_ATTRIBUTES
        if extra_sensitive_attrs:
            self._sensitive_attrs = self._sensitive_attrs | frozenset(extra_sensitive_attrs)

        # Copy so per-instance custom patterns never leak into the module list.
        self._patterns: List[tuple] = list(_DEFAULT_PATTERNS)
        if custom_patterns:
            self._patterns.extend(
                (name, re.compile(pat)) for name, pat in custom_patterns.items()
            )

    @property
    def enabled(self) -> bool:
        """Whether redaction is currently active."""
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        self._enabled = value

    def redact_event(self, attrs: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict with the sensitive attributes of *attrs* redacted.

        The input mapping is never mutated. When the filter is disabled the
        result is a shallow copy of *attrs*.
        """
        if not self._enabled:
            return dict(attrs)
        return {
            key: self._redact_value(key, value) if key in self._sensitive_attrs else value
            for key, value in attrs.items()
        }

    def redact_string(self, text: str) -> "RedactionResult":
        """Scrub *text* with every configured pattern.

        Returns:
            A ``RedactionResult`` holding the original text, the scrubbed
            text, the number of replacements, and the labels that matched.
            When the filter is disabled the text is returned unchanged with a
            zero count.
        """
        if not self._enabled:
            return RedactionResult(original_value=text, redacted_value=text)
        redacted, count, found = self._apply_patterns(text)
        return RedactionResult(
            original_value=text,
            redacted_value=redacted,
            redaction_count=count,
            pii_types_found=found,
        )

    def _apply_patterns(self, text: str) -> Tuple[str, int, List[str]]:
        """Apply each pattern in order; return (scrubbed text, hits, labels)."""
        total = 0
        found: List[str] = []
        for pii_type, pattern in self._patterns:
            marker = f"[REDACTED_{pii_type.upper()}]"
            # A callable replacement inserts the marker verbatim. A plain
            # string would be parsed as a template, so a label containing a
            # backslash would raise or be mangled. ``subn`` also replaces and
            # counts in a single scan.
            text, hits = pattern.subn(lambda _match, _marker=marker: _marker, text)
            if hits:
                total += hits
                found.append(pii_type)
        return text, total, found

    def _redact_value(self, attr_name: str, value: Any) -> Any:
        """Redact one sensitive attribute value; the result is always a string."""
        text = value if isinstance(value, str) else str(value)
        if attr_name == conv.PROMPT_TEXT and self._redact_prompt_content:
            return "[REDACTED_PROMPT]"
        if attr_name == conv.TOOL_PARAMETERS:
            return self._redact_json_string(text)
        return self.redact_string(text).redacted_value

    def _redact_json_string(self, json_str: str) -> str:
        """Scrub the string values inside a JSON document.

        Input that does not parse as JSON is scrubbed as plain text instead.
        Parsed input is re-serialised with ``json.dumps``, so the original
        whitespace and formatting are not preserved.
        """
        try:
            parsed = json.loads(json_str)
        except (json.JSONDecodeError, TypeError):
            return self.redact_string(json_str).redacted_value
        return json.dumps(self._redact_json_value(parsed))

    def _redact_json_value(self, value: Any) -> Any:
        """Recursively scrub string values in a decoded JSON structure.

        Dict keys and non-string scalars are returned unchanged.
        """
        # NOTE(review): numbers are not scrubbed, so PII encoded as a JSON
        # number (e.g. a phone number stored as an int) passes through.
        # Confirm this is intended.
        if isinstance(value, str):
            return self.redact_string(value).redacted_value
        if isinstance(value, dict):
            return {k: self._redact_json_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self._redact_json_value(item) for item in value]
        return value