use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use uuid::Uuid;
pub const I2I_VERSION: &str = "I2I/1.0";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum I2IVerb {
Announce,
Request,
Response,
Notify,
ConstraintCheck,
ConstraintResult,
TutorJump,
EpisodePush,
Disconnect,
}
impl fmt::Display for I2IVerb {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = serde_json::to_string(self).unwrap_or_default();
write!(f, "{}", s.trim_matches('"'))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ComponentKind {
Kernel,
Tui,
Os,
Agent,
Unknown,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InstanceId {
pub kind: ComponentKind,
pub name: String,
pub host: String,
}
impl InstanceId {
pub fn new(kind: ComponentKind, name: &str, host: &str) -> Self {
Self { kind, name: name.to_string(), host: host.to_string() }
}
pub fn local_kernel(name: &str) -> Self {
Self::new(ComponentKind::Kernel, name, "localhost")
}
pub fn local_tui(name: &str) -> Self {
Self::new(ComponentKind::Tui, name, "localhost")
}
}
impl fmt::Display for InstanceId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let kind = serde_json::to_string(&self.kind).unwrap_or_default();
write!(f, "{}/{}@{}", kind.trim_matches('"'), self.name, self.host)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct I2IMessage {
pub version: String,
pub verb: I2IVerb,
pub target: String,
pub from: InstanceId,
pub to: InstanceId,
pub nonce: Uuid,
pub timestamp: DateTime<Utc>,
pub payload: serde_json::Value,
pub in_reply_to: Option<Uuid>,
}
impl I2IMessage {
pub fn new(
verb: I2IVerb,
target: &str,
from: InstanceId,
to: InstanceId,
payload: serde_json::Value,
) -> Self {
Self {
version: I2I_VERSION.to_string(),
verb,
target: target.to_string(),
from,
to,
nonce: Uuid::new_v4(),
timestamp: Utc::now(),
payload,
in_reply_to: None,
}
}
pub fn reply(
original: &I2IMessage,
verb: I2IVerb,
payload: serde_json::Value,
) -> Self {
Self {
version: I2I_VERSION.to_string(),
verb,
target: original.from.to_string(),
from: original.to.clone(),
to: original.from.clone(),
nonce: Uuid::new_v4(),
timestamp: Utc::now(),
payload,
in_reply_to: Some(original.nonce),
}
}
pub fn to_wire(&self) -> String {
format!(
"{} {} {}\nFrom: {}\nTo: {}\nNonce: {}\nTimestamp: {}\n\n{}\n",
self.version,
self.verb,
self.target,
self.from,
self.to,
self.nonce,
self.timestamp.to_rfc3339(),
serde_json::to_string_pretty(&self.payload).unwrap_or_default(),
)
}
pub fn from_wire(s: &str) -> Result<Self, I2IParseError> {
let mut lines = s.lines();
let first = lines.next().ok_or(I2IParseError::MissingHeader)?;
let parts: Vec<&str> = first.splitn(3, ' ').collect();
if parts.len() < 3 {
return Err(I2IParseError::MalformedHeader);
}
let _version = parts[0].to_string();
let verb: I2IVerb = serde_json::from_str(&format!("\"{}\"", parts[1]))
.map_err(|_| I2IParseError::UnknownVerb(parts[1].to_string()))?;
let target = parts[2].to_string();
let mut from_str = String::new();
let mut to_str = String::new();
let mut nonce_str = String::new();
let mut ts_str = String::new();
for line in lines.by_ref() {
if line.is_empty() {
break;
}
if let Some(v) = line.strip_prefix("From: ") { from_str = v.to_string(); }
else if let Some(v) = line.strip_prefix("To: ") { to_str = v.to_string(); }
else if let Some(v) = line.strip_prefix("Nonce: ") { nonce_str = v.to_string(); }
else if let Some(v) = line.strip_prefix("Timestamp: ") { ts_str = v.to_string(); }
}
let body: String = lines.collect::<Vec<_>>().join("\n");
let payload: serde_json::Value = if body.trim().is_empty() {
serde_json::Value::Null
} else {
serde_json::from_str(&body).map_err(|_| I2IParseError::InvalidPayload)?
};
let nonce = Uuid::parse_str(&nonce_str).unwrap_or_else(|_| Uuid::new_v4());
let timestamp = ts_str.parse::<DateTime<Utc>>().unwrap_or_else(|_| Utc::now());
let from = parse_instance_id(&from_str);
let to = parse_instance_id(&to_str);
Ok(Self {
version: I2I_VERSION.to_string(),
verb,
target,
from,
to,
nonce,
timestamp,
payload,
in_reply_to: None,
})
}
}
fn parse_instance_id(s: &str) -> InstanceId {
let (kind_name, host) = s.split_once('@').unwrap_or((s, "localhost"));
let (kind_str, name) = kind_name.split_once('/').unwrap_or(("unknown", kind_name));
let kind = serde_json::from_str(&format!("\"{}\"", kind_str))
.unwrap_or(ComponentKind::Unknown);
InstanceId { kind, name: name.to_string(), host: host.to_string() }
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum I2IParseError {
#[error("Missing I2I header line")]
MissingHeader,
#[error("Malformed I2I header (expected: I2I/1.0 VERB TARGET)")]
MalformedHeader,
#[error("Unknown I2I verb: {0}")]
UnknownVerb(String),
#[error("Invalid JSON payload")]
InvalidPayload,
}
pub type MessageHandler = Arc<dyn Fn(I2IMessage) -> Option<I2IMessage> + Send + Sync>;
pub struct I2IServer {
bind_addr: String,
handler: MessageHandler,
}
impl I2IServer {
pub fn new(handler: MessageHandler) -> Self {
Self {
bind_addr: "0.0.0.0:7272".to_string(),
handler,
}
}
pub fn with_addr(addr: impl Into<String>, handler: MessageHandler) -> Self {
Self {
bind_addr: addr.into(),
handler,
}
}
pub async fn serve(self) -> anyhow::Result<()> {
let listener = TcpListener::bind(&self.bind_addr).await?;
tracing::info!("I2I server listening on {}", self.bind_addr);
let handler = self.handler.clone();
loop {
match listener.accept().await {
Ok((stream, peer)) => {
tracing::debug!("I2I: accepted connection from {}", peer);
let h = handler.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, h).await {
tracing::warn!("I2I connection error from {}: {}", peer, e);
}
});
}
Err(e) => {
tracing::error!("I2I accept error: {}", e);
}
}
}
}
}
async fn handle_connection(
stream: TcpStream,
handler: MessageHandler,
) -> anyhow::Result<()> {
let (reader_half, mut writer_half) = stream.into_split();
let mut lines = BufReader::new(reader_half).lines();
let mut buf = String::new();
let mut blank_seen = false;
let mut body_lines: Vec<String> = Vec::new();
while let Some(line) = lines.next_line().await? {
if !blank_seen {
if line.is_empty() {
blank_seen = true;
buf.push('\n');
} else {
buf.push_str(&line);
buf.push('\n');
}
} else {
if line.is_empty() {
let full = format!("{}\n{}", buf, body_lines.join("\n"));
blank_seen = false;
match I2IMessage::from_wire(&full) {
Ok(msg) => {
tracing::debug!("I2I rx: {:?} → {}", msg.verb, msg.target);
if let Some(reply) = handler(msg) {
let wire = reply.to_wire();
writer_half.write_all(wire.as_bytes()).await?;
writer_half.write_all(b"\n").await?;
}
}
Err(e) => {
tracing::warn!("I2I parse error: {}", e);
}
}
buf.clear();
body_lines.clear();
} else {
body_lines.push(line);
}
}
}
Ok(())
}
pub fn default_kernel_handler(kernel_id: InstanceId) -> MessageHandler {
Arc::new(move |msg: I2IMessage| -> Option<I2IMessage> {
match &msg.verb {
I2IVerb::TutorJump => {
let anchor = msg
.payload
.get("anchor")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
tracing::info!("I2I TUTOR_JUMP for anchor '{}'", anchor);
let reply = I2IMessage {
version: I2I_VERSION.to_string(),
verb: I2IVerb::Response,
target: msg.from.to_string(),
from: kernel_id.clone(),
to: msg.from.clone(),
nonce: Uuid::new_v4(),
timestamp: Utc::now(),
payload: serde_json::json!({
"anchor": anchor,
"status": "queued",
"note": "TUTOR jump enqueued — tile will be injected into next prompt context"
}),
in_reply_to: Some(msg.nonce),
};
Some(reply)
}
I2IVerb::ConstraintCheck => {
let command = msg
.payload
.get("command")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
tracing::info!("I2I CONSTRAINT_CHECK for command '{}'", command);
let result = if command.starts_with('@') || command.starts_with("delete") {
"Deny"
} else {
"Allow"
};
let reply = I2IMessage {
version: I2I_VERSION.to_string(),
verb: I2IVerb::ConstraintResult,
target: msg.from.to_string(),
from: kernel_id.clone(),
to: msg.from.clone(),
nonce: Uuid::new_v4(),
timestamp: Utc::now(),
payload: serde_json::json!({
"command": command,
"result": result
}),
in_reply_to: Some(msg.nonce),
};
Some(reply)
}
I2IVerb::Announce => {
tracing::info!("I2I: instance announced: {}", msg.from);
None
}
I2IVerb::Disconnect => {
tracing::info!("I2I: instance disconnected: {}", msg.from);
None
}
_ => None,
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_roundtrip_wire() {
let msg = I2IMessage::new(
I2IVerb::Notify,
"room/convergence-station",
InstanceId::local_kernel("kernel-1"),
InstanceId::local_tui("tui-1"),
json!({ "event": "constraint_updated", "room": "convergence-station" }),
);
let wire = msg.to_wire();
assert!(wire.starts_with("I2I/1.0 NOTIFY"));
let parsed = I2IMessage::from_wire(&wire).expect("should parse");
assert_eq!(parsed.verb, I2IVerb::Notify);
assert_eq!(parsed.target, "room/convergence-station");
}
#[test]
fn test_reply_links_nonce() {
let req = I2IMessage::new(
I2IVerb::Request,
"constraint-check",
InstanceId::local_tui("tui-1"),
InstanceId::local_kernel("kernel-1"),
json!({ "command": "look" }),
);
let resp = I2IMessage::reply(&req, I2IVerb::Response, json!({ "result": "Allow" }));
assert_eq!(resp.in_reply_to, Some(req.nonce));
}
#[test]
fn test_instance_id_display() {
let id = InstanceId::local_kernel("alpha");
assert_eq!(id.to_string(), "kernel/alpha@localhost");
}
}