from __future__ import annotations
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Optional
@dataclass
class I2IMessage:
sender: str
recipient: str
msg_type: str
content: str
priority: str = "P2"
timestamp: float = field(default_factory=time.time)
metadata: dict[str, Any] = field(default_factory=dict)
class I2IProtocol:
_PRIORITY_RANK = {"P0": 0, "P1": 1, "P2": 2}
@staticmethod
def format_message(
sender: str,
recipient: str,
msg_type: str,
content: str,
priority: str = "P2",
) -> I2IMessage:
return I2IMessage(
sender=sender,
recipient=recipient,
msg_type=msg_type,
content=content,
priority=priority,
)
@classmethod
def parse_message(cls, raw: str) -> I2IMessage:
if not raw.startswith("[I2I:"):
raise ValueError("Message must start with '[I2I:'")
type_end = raw.find("]", 5)
if type_end == -1:
raise ValueError("Missing closing ']' for message type")
msg_type = raw[5:type_end].strip()
remainder = raw[type_end + 1 :].strip()
arrow = remainder.find("->")
if arrow == -1:
raise ValueError("Missing '->' separator")
sender = remainder[:arrow].strip()
after_arrow = remainder[arrow + 2 :].strip()
em_dash = after_arrow.find("—")
if em_dash == -1:
em_dash = after_arrow.find("- ")
if em_dash == -1:
raise ValueError("Missing '—' content separator")
recipient = after_arrow[:em_dash].strip()
content = after_arrow[em_dash + 1 :].strip()
else:
recipient = after_arrow[:em_dash].strip()
content = after_arrow[em_dash + 1 :].strip()
return I2IMessage(
sender=sender,
recipient=recipient,
msg_type=msg_type,
content=content,
)
@staticmethod
def serialize(message: I2IMessage) -> str:
return json.dumps(asdict(message), separators=(",", ":"))
@classmethod
def deserialize(cls, raw: str) -> I2IMessage:
try:
payload: dict[str, Any] = json.loads(raw)
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON: {exc}") from exc
required = {"sender", "recipient", "msg_type", "content"}
missing = required - set(payload.keys())
if missing:
raise ValueError(f"Missing required fields: {missing}")
return I2IMessage(
sender=payload["sender"],
recipient=payload["recipient"],
msg_type=payload["msg_type"],
content=payload["content"],
priority=payload.get("priority", "P2"),
timestamp=payload.get("timestamp", time.time()),
metadata=payload.get("metadata", {}),
)
@classmethod
def validate(cls, message: I2IMessage) -> bool:
return all(
[
isinstance(message.sender, str) and message.sender.strip() != "",
isinstance(message.recipient, str) and message.recipient.strip() != "",
isinstance(message.msg_type, str) and message.msg_type.strip() != "",
isinstance(message.content, str) and message.content.strip() != "",
]
)
@classmethod
def priority_gate(cls, message: I2IMessage, min_priority: str = "P2") -> bool:
msg_rank = cls._PRIORITY_RANK.get(message.priority, 99)
threshold_rank = cls._PRIORITY_RANK.get(min_priority, 99)
return msg_rank <= threshold_rank
class TrustRouter:
def __init__(self) -> None:
self._trust: dict[str, float] = {}
def add_trust(self, agent_id: str, trust_level: float) -> None:
if not 0.0 <= trust_level <= 1.0:
raise ValueError("trust_level must be between 0.0 and 1.0")
self._trust[agent_id] = trust_level
def get_trust(self, agent_id: str) -> float:
return self._trust.get(agent_id, 0.0)
def route(self, message: I2IMessage, available_agents: list[str]) -> Optional[str]:
if not available_agents:
return None
best_agent: Optional[str] = None
best_score = -1.0
for agent in available_agents:
score = self.get_trust(agent)
if score > best_score:
best_score = score
best_agent = agent
if best_score <= 0.0:
return None
return best_agent
def is_trusted(self, agent_id: str, min_trust: float = 0.3) -> bool:
return self.get_trust(agent_id) >= min_trust
def stats(self) -> dict[str, Any]:
if not self._trust:
return {
"count": 0,
"average_trust": 0.0,
"max_trust": 0.0,
"min_trust": 0.0,
}
values = list(self._trust.values())
return {
"count": len(values),
"average_trust": sum(values) / len(values),
"max_trust": max(values),
"min_trust": min(values),
}