use std::path::Path;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use crate::{
config::AppConfig,
daemon::{RuntimeShutdownResponse, RuntimeStatusResponse},
http::{
AttachWorkspaceRequest, ClearAgentModelRequest, ControlPromptRequest, CreateAgentRequest,
DebugPromptRequest, DetachWorkspaceRequest, ExitWorkspaceRequest, SetAgentModelRequest,
},
model_catalog::BuiltInModelMetadata,
system::ExecutionSnapshot,
types::{
ActiveWorkspaceEntry, AgentListEntry, AgentSummary, BriefRecord,
ExternalTriggerStateSnapshot, OperatorNotificationRecord, ResolvedModelAvailability,
TaskRecord, TimerRecord, TranscriptEntry, TrustLevel, TurnTerminalRecord,
WaitingIntentRecord, WorkItemRecord, WorkspaceOccupancyRecord, WorktreeSession,
},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TuiNetworkPolicy {
pub connect_timeout: Duration,
pub read_idle_timeout: Duration,
pub stream_idle_timeout: Duration,
}
pub const TUI_LOCAL_NETWORK_POLICY: TuiNetworkPolicy = TuiNetworkPolicy {
connect_timeout: Duration::from_secs(2),
read_idle_timeout: Duration::from_secs(10),
stream_idle_timeout: Duration::from_secs(45),
};
pub const TUI_REMOTE_NETWORK_POLICY: TuiNetworkPolicy = TuiNetworkPolicy {
connect_timeout: Duration::from_secs(5),
read_idle_timeout: Duration::from_secs(10),
stream_idle_timeout: Duration::from_secs(45),
};
#[derive(Clone)]
pub struct LocalClient {
config: AppConfig,
http: reqwest::Client,
remote: Option<RemoteConnection>,
network: TuiNetworkPolicy,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteConnection {
base_url: String,
token: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceAttachResponse {
pub ok: bool,
pub agent_id: String,
pub workspace_id: String,
pub workspace_anchor: std::path::PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceExitResponse {
pub ok: bool,
pub agent_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceDetachResponse {
pub ok: bool,
pub agent_id: String,
pub workspace_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebugPromptResponse {
pub ok: bool,
pub agent_id: String,
pub dump: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelsResponse {
#[serde(default)]
pub available_models: Vec<BuiltInModelMetadata>,
#[serde(default)]
pub model_availability: Vec<ResolvedModelAvailability>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSessionSnapshot {
pub current_run_id: Option<String>,
pub pending_count: usize,
pub last_turn: Option<TurnTerminalRecord>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StateWorkspaceSnapshot {
#[serde(default)]
pub attached_workspaces: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_workspace_entry: Option<ActiveWorkspaceEntry>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_workspace_occupancy: Option<WorkspaceOccupancyRecord>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worktree_session: Option<WorktreeSession>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStateSnapshot {
pub agent: AgentSummary,
pub session: StateSessionSnapshot,
pub tasks: Vec<TaskRecord>,
#[serde(default)]
pub timers: Vec<TimerRecord>,
#[serde(default)]
pub work_items: Vec<WorkItemRecord>,
#[serde(default)]
pub waiting_intents: Vec<WaitingIntentRecord>,
#[serde(default)]
pub external_triggers: Vec<ExternalTriggerStateSnapshot>,
#[serde(default)]
pub operator_notifications: Vec<OperatorNotificationRecord>,
#[serde(default)]
pub workspace: StateWorkspaceSnapshot,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub execution: Option<ExecutionSnapshot>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlPromptResponse {
pub ok: bool,
pub agent_id: String,
pub message_id: String,
}
#[derive(Debug, Clone, Default)]
pub struct EventStreamRequest {
pub after_seq: Option<u64>,
pub limit: Option<usize>,
pub projection: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct EventPageRequest {
pub before_seq: Option<u64>,
pub after_seq: Option<u64>,
pub limit: Option<usize>,
pub order: Option<String>,
pub projection: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EventPageResponse {
pub events: Vec<StreamEventEnvelope>,
pub oldest_seq: Option<u64>,
pub newest_seq: Option<u64>,
pub has_older: bool,
pub has_newer: bool,
pub order: String,
pub limit: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StreamEventEnvelope {
pub id: String,
pub event_seq: u64,
pub ts: chrono::DateTime<Utc>,
pub agent_id: String,
#[serde(rename = "type")]
pub event_type: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub projection: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub provenance: Option<Value>,
pub payload: Value,
}
#[derive(Debug, Clone, PartialEq)]
pub struct AgentStreamEvent {
pub id: String,
pub event: String,
pub data: StreamEventEnvelope,
}
pub struct LocalEventStream {
transport: EventStreamTransport,
frame_buffer: Vec<u8>,
idle_timeout: Duration,
}
#[derive(Debug, Clone)]
pub struct LocalHttpError {
pub path: String,
pub status_code: u16,
pub message: String,
pub code: Option<String>,
pub hint: Option<String>,
}
impl LocalHttpError {
pub fn has_code(&self, expected: &str) -> bool {
self.code.as_deref() == Some(expected)
}
fn code_suffix(&self) -> String {
self.code
.as_deref()
.map(|code| format!(" [{code}]"))
.unwrap_or_default()
}
fn hint_suffix(&self) -> String {
self.hint
.as_deref()
.map(|hint| format!(" Hint: {hint}"))
.unwrap_or_default()
}
}
impl std::fmt::Display for LocalHttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} returned HTTP {}: {}{}{}",
self.path,
self.status_code,
self.message,
self.code_suffix(),
self.hint_suffix()
)
}
}
impl std::error::Error for LocalHttpError {}
enum EventStreamTransport {
Http(reqwest::Response),
#[cfg(unix)]
Unix(UnixEventStream),
}
#[cfg(unix)]
struct UnixEventStream {
stream: tokio::net::UnixStream,
body_buffer: Vec<u8>,
chunked: bool,
current_chunk_size: Option<usize>,
eof: bool,
}
async fn read_http_response_body(
mut response: reqwest::Response,
path: &str,
network: TuiNetworkPolicy,
) -> Result<Vec<u8>> {
let mut body = Vec::new();
loop {
let chunk = tokio::time::timeout(network.read_idle_timeout, response.chunk())
.await
.map_err(|_| {
anyhow!(
"read idle timeout after {} for {}",
format_duration(network.read_idle_timeout),
path
)
})?
.with_context(|| format!("failed to read response body for {path}"))?;
let Some(chunk) = chunk else {
return Ok(body);
};
body.extend_from_slice(&chunk);
}
}
#[cfg(unix)]
async fn read_unix_response_body(
stream: &mut tokio::net::UnixStream,
path: &str,
network: TuiNetworkPolicy,
) -> Result<Vec<u8>> {
use tokio::io::AsyncReadExt;
let mut body = Vec::new();
let mut buffer = [0_u8; 8192];
loop {
let read = tokio::time::timeout(network.read_idle_timeout, stream.read(&mut buffer))
.await
.map_err(|_| {
anyhow!(
"read idle timeout after {} for unix-socket response {}",
format_duration(network.read_idle_timeout),
path
)
})?
.with_context(|| format!("failed to read unix-socket response for {path}"))?;
if read == 0 {
return Ok(body);
}
body.extend_from_slice(&buffer[..read]);
}
}
impl LocalClient {
pub fn new(config: AppConfig) -> Result<Self> {
let http = reqwest::Client::builder()
.connect_timeout(TUI_LOCAL_NETWORK_POLICY.connect_timeout)
.build()
.context("failed to build local control client")?;
Ok(Self {
config,
http,
remote: None,
network: TUI_LOCAL_NETWORK_POLICY,
})
}
pub fn remote(
config: AppConfig,
base_url: impl Into<String>,
token: impl Into<String>,
) -> Result<Self> {
let base_url = normalize_remote_base_url(base_url.into())?;
let token = token.into();
if token.trim().is_empty() {
return Err(anyhow!("remote TUI token must not be empty"));
}
let http = reqwest::Client::builder()
.connect_timeout(TUI_REMOTE_NETWORK_POLICY.connect_timeout)
.build()
.context("failed to build remote control client")?;
Ok(Self {
config,
http,
remote: Some(RemoteConnection { base_url, token }),
network: TUI_REMOTE_NETWORK_POLICY,
})
}
pub fn connection_summary(&self) -> String {
match &self.remote {
Some(remote) => format!("remote {} auth=bearer", remote.base_url),
None => "local runtime".into(),
}
}
pub(crate) fn home_dir(&self) -> &Path {
&self.config.home_dir
}
pub(crate) fn default_agent_id(&self) -> &str {
&self.config.default_agent_id
}
pub(crate) fn remote_base_url(&self) -> Option<&str> {
self.remote.as_ref().map(|remote| remote.base_url.as_str())
}
pub async fn list_agent_entries(&self) -> Result<Vec<AgentListEntry>> {
self.get_json("/agents/list").await
}
pub async fn fetch_models(&self) -> Result<ModelsResponse> {
self.get_json("/models").await
}
pub async fn handshake(&self) -> Result<Value> {
self.get_json("/handshake").await
}
pub async fn runtime_status(&self) -> Result<RuntimeStatusResponse> {
self.get_control_json("/control/runtime/status").await
}
pub async fn runtime_readiness(&self) -> Result<RuntimeStatusResponse> {
self.get_control_json("/control/runtime/readiness").await
}
#[cfg(unix)]
pub async fn runtime_readiness_unix_only(&self) -> Result<RuntimeStatusResponse> {
let body = self
.send_unix(RequestSpec::get("/control/runtime/readiness"), true)
.await?;
serde_json::from_slice(&body).with_context(|| {
"failed to decode response body for GET /control/runtime/readiness over unix socket"
})
}
pub async fn runtime_shutdown(&self) -> Result<RuntimeShutdownResponse> {
self.post_control_json("/control/runtime/shutdown", &serde_json::json!({}))
.await
}
pub async fn agent_status(&self, agent_id: &str) -> Result<AgentSummary> {
self.get_json(&format!("/agents/{agent_id}/status")).await
}
pub async fn agent_state_snapshot(&self, agent_id: &str) -> Result<AgentStateSnapshot> {
self.get_json(&format!("/agents/{agent_id}/state")).await
}
pub async fn agent_briefs(&self, agent_id: &str, limit: usize) -> Result<Vec<BriefRecord>> {
self.get_json(&format!("/agents/{agent_id}/briefs?limit={limit}"))
.await
}
pub async fn agent_transcript(
&self,
agent_id: &str,
limit: usize,
) -> Result<Vec<TranscriptEntry>> {
self.get_json(&format!("/agents/{agent_id}/transcript?limit={limit}"))
.await
}
pub async fn agent_tasks(&self, agent_id: &str, limit: usize) -> Result<Vec<TaskRecord>> {
self.get_json(&format!("/agents/{agent_id}/tasks?limit={limit}"))
.await
}
pub async fn control_prompt(
&self,
agent_id: &str,
text: impl Into<String>,
) -> Result<ControlPromptResponse> {
self.post_control_json(
&format!("/control/agents/{agent_id}/prompt"),
&ControlPromptRequest { text: text.into() },
)
.await
}
pub async fn abort_current_run(&self, agent_id: &str, run_id: Option<String>) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/current-run/abort"),
&crate::http::AbortCurrentRunRequest {
run_id,
mode: Some("stop_after_abort".into()),
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn control_agent(
&self,
agent_id: &str,
action: crate::types::ControlAction,
) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/control"),
&crate::http::ControlRequest {
action,
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn create_agent(&self, agent_id: &str) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/create"),
&CreateAgentRequest {
trust: Some(TrustLevel::TrustedOperator),
template: None,
},
)
.await
}
pub async fn attach_workspace(
&self,
agent_id: &str,
path: impl Into<String>,
) -> Result<WorkspaceAttachResponse> {
self.post_control_json(
&format!("/control/agents/{agent_id}/workspace/attach"),
&AttachWorkspaceRequest {
path: path.into(),
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn exit_workspace(&self, agent_id: &str) -> Result<WorkspaceExitResponse> {
self.post_control_json(
&format!("/control/agents/{agent_id}/workspace/exit"),
&ExitWorkspaceRequest {
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn detach_workspace(
&self,
agent_id: &str,
workspace_id: impl Into<String>,
) -> Result<WorkspaceDetachResponse> {
self.post_control_json(
&format!("/control/agents/{agent_id}/workspace/detach"),
&DetachWorkspaceRequest {
workspace_id: workspace_id.into(),
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn debug_prompt(
&self,
agent_id: &str,
text: impl Into<String>,
trust: TrustLevel,
) -> Result<String> {
let response: DebugPromptResponse = self
.post_control_json(
&format!("/control/agents/{agent_id}/debug-prompt"),
&DebugPromptRequest {
text: text.into(),
trust: Some(trust),
},
)
.await?;
Ok(response.dump)
}
pub async fn set_agent_model_override(
&self,
agent_id: &str,
model: impl Into<String>,
reasoning_effort: Option<String>,
) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/model"),
&SetAgentModelRequest {
model: model.into(),
reasoning_effort,
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn clear_agent_model_override(&self, agent_id: &str) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/model/clear"),
&ClearAgentModelRequest {
trust: Some(TrustLevel::TrustedOperator),
},
)
.await
}
pub async fn list_skills(&self, agent_id: &str) -> Result<Value> {
let body = self
.send(
RequestSpec::get(&format!("/agents/{}/skills", agent_id)),
false,
)
.await?;
serde_json::from_slice(&body).with_context(|| {
format!(
"failed to decode response body for GET /agents/{}/skills",
agent_id
)
})
}
pub async fn install_skill(
&self,
agent_id: &str,
kind: crate::types::SkillInstallKind,
) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/skills/install"),
&crate::types::InstallSkillRequest { kind },
)
.await
}
pub async fn uninstall_skill(&self, agent_id: &str, name: &str) -> Result<Value> {
self.post_control_json(
&format!("/control/agents/{agent_id}/skills/uninstall"),
&crate::types::UninstallSkillRequest {
name: name.to_string(),
},
)
.await
}
pub async fn stream_agent_events(
&self,
agent_id: &str,
request: EventStreamRequest,
) -> Result<LocalEventStream> {
let path = event_stream_path(agent_id, &request)?;
#[cfg(unix)]
if self.remote.is_none() && self.config.socket_path.exists() {
let socket_error = match tokio::time::timeout(
self.network.connect_timeout,
self.stream_unix_events(&path, false),
)
.await
{
Ok(Ok(stream)) => {
return Ok(LocalEventStream {
transport: EventStreamTransport::Unix(stream),
frame_buffer: Vec::new(),
idle_timeout: self.network.stream_idle_timeout,
});
}
Ok(Err(err)) => err,
Err(_) => anyhow!("timed out opening event stream {}", path),
};
return self
.stream_http_events(&path, false)
.await
.with_context(|| {
format!("unix socket event stream failed before HTTP fallback: {socket_error}")
});
}
self.stream_http_events(&path, self.remote.is_some()).await
}
pub async fn agent_events_page(
&self,
agent_id: &str,
request: EventPageRequest,
) -> Result<EventPageResponse> {
let path = event_page_path(agent_id, &request)?;
self.get_json(&path).await
}
async fn get_json<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let body = self.send(RequestSpec::get(path), false).await?;
serde_json::from_slice(&body)
.with_context(|| format!("failed to decode response body for GET {}", path))
}
async fn get_control_json<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let body = self.send(RequestSpec::get(path), true).await?;
serde_json::from_slice(&body)
.with_context(|| format!("failed to decode response body for GET {}", path))
}
async fn post_control_json<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
payload: &B,
) -> Result<T> {
let body = self
.send(RequestSpec::post_json(path, payload)?, true)
.await?;
serde_json::from_slice(&body)
.with_context(|| format!("failed to decode response body for POST {}", path))
}
async fn send(&self, request: RequestSpec, include_control_auth: bool) -> Result<Vec<u8>> {
#[cfg(unix)]
if self.remote.is_none() && self.config.socket_path.exists() {
let socket_error = match self.send_unix(request.clone(), include_control_auth).await {
Ok(body) => return Ok(body),
Err(err) => err,
};
return self
.send_http(request, include_control_auth)
.await
.with_context(|| {
format!("unix socket request failed before HTTP fallback: {socket_error}")
});
}
self.send_http(request, include_control_auth).await
}
async fn send_http(&self, request: RequestSpec, include_control_auth: bool) -> Result<Vec<u8>> {
let response = tokio::time::timeout(
self.network.read_idle_timeout,
self.build_http_request(&request, include_control_auth)
.send(),
)
.await
.map_err(|_| {
anyhow!(
"timed out waiting for response headers for {}",
request.path
)
})?
.with_context(|| format!("failed to send {}", request.path))?;
let status = response.status();
let bytes = read_http_response_body(response, &request.path, self.network).await?;
decode_or_error(status.as_u16(), bytes, &request.path)
}
async fn stream_http_events(
&self,
path: &str,
include_control_auth: bool,
) -> Result<LocalEventStream> {
let request = RequestSpec::get(path);
let builder = self
.build_http_request(&request, include_control_auth)
.header(reqwest::header::ACCEPT, "text/event-stream");
let response = tokio::time::timeout(self.network.read_idle_timeout, builder.send())
.await
.map_err(|_| anyhow!("timed out opening event stream {}", path))?
.with_context(|| format!("failed to open event stream {}", path))?;
let status = response.status();
if !status.is_success() {
let bytes = read_http_response_body(response, path, self.network).await?;
decode_or_error(status.as_u16(), bytes, path)?;
unreachable!("decode_or_error returns Ok only for successful responses");
}
Ok(LocalEventStream {
transport: EventStreamTransport::Http(response),
frame_buffer: Vec::new(),
idle_timeout: self.network.stream_idle_timeout,
})
}
fn build_http_request(
&self,
request: &RequestSpec,
include_control_auth: bool,
) -> reqwest::RequestBuilder {
let mut builder = match request.method {
HttpMethod::Get => self.http.get(self.http_url_for(&request.path)),
HttpMethod::Post => self.http.post(self.http_url_for(&request.path)),
};
if let Some(remote) = &self.remote {
builder = builder.bearer_auth(&remote.token);
} else if include_control_auth {
if let Some(token) = &self.config.control_token {
builder = builder.bearer_auth(token);
}
}
if let Some(body) = request.body.clone() {
builder = builder
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body);
}
builder
}
fn http_url_for(&self, path: &str) -> String {
if let Some(remote) = &self.remote {
format!("{}{}", remote.base_url, path)
} else {
format!("http://{}{}", self.config.http_addr, path)
}
}
#[cfg(unix)]
async fn send_unix(&self, request: RequestSpec, include_control_auth: bool) -> Result<Vec<u8>> {
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
let mut stream = tokio::time::timeout(
self.network.connect_timeout,
UnixStream::connect(&self.config.socket_path),
)
.await
.map_err(|_| {
anyhow!(
"timed out connecting to Holon control socket {}",
self.config.socket_path.display()
)
})?
.with_context(|| {
format!(
"failed to connect to Holon control socket {}",
self.config.socket_path.display()
)
})?;
let method = match request.method {
HttpMethod::Get => "GET",
HttpMethod::Post => "POST",
};
let mut raw = format!(
"{method} {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nAccept: application/json\r\n",
request.path
);
if include_control_auth {
if let Some(token) = &self.config.control_token {
raw.push_str(&format!(
"Authorization: {}\r\n",
authorization_header_value(token)?
));
}
}
let body_len = request.body.as_ref().map(|body| body.len()).unwrap_or(0);
raw.push_str(&format!("Content-Length: {body_len}\r\n"));
if request.body.is_some() {
raw.push_str("Content-Type: application/json\r\n");
}
raw.push_str("\r\n");
tokio::time::timeout(self.network.read_idle_timeout, async {
stream.write_all(raw.as_bytes()).await?;
if let Some(body) = request.body.as_ref() {
stream.write_all(body).await?;
}
stream.flush().await
})
.await
.map_err(|_| anyhow!("timed out writing unix-socket request for {}", request.path))??;
let response = read_unix_response_body(&mut stream, &request.path, self.network).await?;
let parsed = parse_http_response(&response).with_context(|| {
format!("failed to parse unix-socket response for {}", request.path)
})?;
decode_or_error(parsed.status_code, parsed.body, &request.path)
}
#[cfg(unix)]
async fn stream_unix_events(
&self,
path: &str,
include_control_auth: bool,
) -> Result<UnixEventStream> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
validate_unix_request_target(path)?;
let mut stream = tokio::time::timeout(
self.network.connect_timeout,
UnixStream::connect(&self.config.socket_path),
)
.await
.map_err(|_| {
anyhow!(
"timed out connecting to Holon control socket {}",
self.config.socket_path.display()
)
})?
.with_context(|| {
format!(
"failed to connect to Holon control socket {}",
self.config.socket_path.display()
)
})?;
let mut raw = format!(
"GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: keep-alive\r\nAccept: text/event-stream\r\n",
);
if include_control_auth {
if let Some(token) = &self.config.control_token {
raw.push_str(&format!(
"Authorization: {}\r\n",
authorization_header_value(token)?
));
}
}
raw.push_str("Content-Length: 0\r\n\r\n");
stream.write_all(raw.as_bytes()).await?;
stream.flush().await?;
let mut response = read_unix_response_head(&mut stream)
.await
.with_context(|| format!("failed to parse unix-socket response for {path}"))?;
if !(200..300).contains(&response.status_code) {
let mut tail = Vec::new();
stream.read_to_end(&mut tail).await?;
response.body.extend_from_slice(&tail);
let body = if response.chunked {
decode_chunked_body(&response.body)?
} else {
response.body
};
decode_or_error(response.status_code, body, path)?;
unreachable!("decode_or_error returns Ok only for successful responses");
}
Ok(UnixEventStream {
stream,
body_buffer: response.body,
chunked: response.chunked,
current_chunk_size: None,
eof: false,
})
}
}
pub fn normalize_remote_base_url(value: String) -> Result<String> {
normalize_http_base_url(value, "remote connect URL")
}
pub fn normalize_control_base_url(value: String, label: &str) -> Result<String> {
normalize_http_base_url(value, label)
}
fn normalize_http_base_url(value: String, label: &str) -> Result<String> {
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(anyhow!("{label} must not be empty"));
}
let url =
reqwest::Url::parse(trimmed).with_context(|| format!("invalid {label} {trimmed:?}"))?;
match url.scheme() {
"http" | "https" => {}
scheme => return Err(anyhow!("{label} must use http or https, got {scheme}")),
}
let host = url
.host_str()
.ok_or_else(|| anyhow!("{label} must include a host"))?;
if url.query().is_some() {
return Err(anyhow!("{label} must not include a query string"));
}
if host == "0.0.0.0" || host == "::" || host.eq_ignore_ascii_case("[::]") {
return Err(anyhow!(
"{label} must use a client-reachable host, not {host}"
));
}
let mut normalized = url;
normalized.set_fragment(None);
if normalized.path() == "/" {
normalized.set_path("");
}
Ok(normalized.as_str().trim_end_matches('/').to_string())
}
#[derive(Clone)]
struct RequestSpec {
method: HttpMethod,
path: String,
body: Option<Vec<u8>>,
}
impl RequestSpec {
fn get(path: &str) -> Self {
Self {
method: HttpMethod::Get,
path: path.to_string(),
body: None,
}
}
fn post_json<B: Serialize>(path: &str, payload: &B) -> Result<Self> {
Ok(Self {
method: HttpMethod::Post,
path: path.to_string(),
body: Some(serde_json::to_vec(payload)?),
})
}
}
#[derive(Clone, Copy)]
enum HttpMethod {
Get,
Post,
}
impl LocalEventStream {
pub async fn next_event(&mut self) -> Result<AgentStreamEvent> {
loop {
while let Some(frame) = take_next_sse_frame(&mut self.frame_buffer)? {
if let Some(event) = parse_sse_frame(&frame)? {
return Ok(event);
}
}
let timeout = self.idle_timeout;
let chunk = tokio::time::timeout(timeout, self.read_body_chunk())
.await
.map_err(|_| {
anyhow!(
"event stream idle timeout after {}",
format_duration(timeout)
)
})??;
let chunk = chunk.ok_or_else(|| anyhow!("sse stream ended"))?;
self.frame_buffer.extend_from_slice(&chunk);
}
}
async fn read_body_chunk(&mut self) -> Result<Option<Vec<u8>>> {
match &mut self.transport {
EventStreamTransport::Http(response) => response
.chunk()
.await
.context("failed to read HTTP event stream chunk")
.map(|chunk| chunk.map(|bytes| bytes.to_vec())),
#[cfg(unix)]
EventStreamTransport::Unix(stream) => stream.read_body_chunk().await,
}
}
}
fn format_duration(duration: Duration) -> String {
if duration.subsec_nanos() == 0 {
format!("{}s", duration.as_secs())
} else {
format!("{duration:?}")
}
}
#[cfg(unix)]
impl UnixEventStream {
async fn read_body_chunk(&mut self) -> Result<Option<Vec<u8>>> {
use tokio::io::AsyncReadExt;
if self.eof {
return Ok(None);
}
if !self.chunked {
if !self.body_buffer.is_empty() {
return Ok(Some(std::mem::take(&mut self.body_buffer)));
}
let mut buffer = [0u8; 8192];
let read = self.stream.read(&mut buffer).await?;
if read == 0 {
self.eof = true;
return Ok(None);
}
return Ok(Some(buffer[..read].to_vec()));
}
loop {
if let Some(size) = self.current_chunk_size {
let required = size
.checked_add(2)
.ok_or_else(|| anyhow!("malformed chunked response: chunk size overflow"))?;
if self.body_buffer.len() < required {
if !self.read_more().await? {
return Err(anyhow!("malformed chunked response: truncated chunk"));
}
continue;
}
let data = self.body_buffer.drain(..size).collect::<Vec<_>>();
let terminator = self.body_buffer.drain(..2).collect::<Vec<_>>();
if terminator.as_slice() != b"\r\n" {
return Err(anyhow!(
"malformed chunked response: missing chunk terminator"
));
}
self.current_chunk_size = None;
return Ok(Some(data));
}
if let Some(size_line_end) = find_bytes(&self.body_buffer, b"\r\n") {
let size_line = std::str::from_utf8(&self.body_buffer[..size_line_end])
.context("malformed chunked response: non-utf8 size line")?;
let size_hex = size_line.split(';').next().unwrap_or_default().trim();
let size = usize::from_str_radix(size_hex, 16).with_context(|| {
format!("malformed chunked response: invalid size {size_hex}")
})?;
self.body_buffer.drain(..size_line_end + 2);
if size == 0 {
while self.body_buffer.len() < 2 {
if !self.read_more().await? {
break;
}
}
if self.body_buffer.starts_with(b"\r\n") {
self.body_buffer.drain(..2);
}
self.eof = true;
return Ok(None);
}
self.current_chunk_size = Some(size);
continue;
}
if !self.read_more().await? {
return Err(anyhow!(
"malformed chunked response: missing size line terminator"
));
}
}
}
async fn read_more(&mut self) -> Result<bool> {
use tokio::io::AsyncReadExt;
let mut buffer = [0u8; 8192];
let read = self.stream.read(&mut buffer).await?;
if read == 0 {
return Ok(false);
}
self.body_buffer.extend_from_slice(&buffer[..read]);
Ok(true)
}
}
struct ParsedHttpResponse {
status_code: u16,
body: Vec<u8>,
}
struct ParsedHttpResponseHead {
status_code: u16,
chunked: bool,
body: Vec<u8>,
}
#[derive(Debug, Deserialize)]
struct ErrorPayload {
#[serde(default)]
error: Option<String>,
#[serde(default)]
code: Option<String>,
#[serde(default)]
hint: Option<String>,
}
fn event_stream_path(agent_id: &str, request: &EventStreamRequest) -> Result<String> {
let mut url = reqwest::Url::parse("http://localhost")
.context("failed to initialize event stream URL builder")?;
url.path_segments_mut()
.map_err(|_| anyhow!("failed to build event stream path"))?
.extend(["agents", agent_id, "events", "stream"]);
{
let mut query = url.query_pairs_mut();
if let Some(limit) = request.limit {
query.append_pair("limit", &limit.to_string());
}
if let Some(after_seq) = request.after_seq {
query.append_pair("after_seq", &after_seq.to_string());
}
if let Some(projection) = request.projection.as_deref() {
query.append_pair("projection", projection);
}
}
let mut path = url.path().to_string();
if let Some(query) = url.query() {
path.push('?');
path.push_str(query);
}
Ok(path)
}
fn event_page_path(agent_id: &str, request: &EventPageRequest) -> Result<String> {
let mut url =
reqwest::Url::parse("http://localhost").context("failed to initialize event page URL")?;
url.path_segments_mut()
.map_err(|_| anyhow!("failed to build event page path"))?
.extend(["agents", agent_id, "events"]);
{
let mut query = url.query_pairs_mut();
if let Some(limit) = request.limit {
query.append_pair("limit", &limit.to_string());
}
if let Some(before_seq) = request.before_seq {
query.append_pair("before_seq", &before_seq.to_string());
}
if let Some(after_seq) = request.after_seq {
query.append_pair("after_seq", &after_seq.to_string());
}
if let Some(order) = request.order.as_deref() {
query.append_pair("order", order);
}
if let Some(projection) = request.projection.as_deref() {
query.append_pair("projection", projection);
}
}
let mut path = url.path().to_string();
if let Some(query) = url.query() {
path.push('?');
path.push_str(query);
}
Ok(path)
}
fn validate_unix_request_target(path: &str) -> Result<()> {
if !path.starts_with('/') {
return Err(anyhow!(
"invalid unix event stream request target: expected origin-form path"
));
}
if !path.is_ascii() {
return Err(anyhow!(
"invalid unix event stream request target: non-ASCII bytes are not allowed"
));
}
if path
.bytes()
.any(|byte| byte.is_ascii_control() || byte == b' ')
{
return Err(anyhow!(
"invalid unix event stream request target: control or space characters are not allowed"
));
}
Ok(())
}
fn validate_header_value(header_name: &str, value: &str) -> Result<()> {
reqwest::header::HeaderValue::from_str(value)
.with_context(|| format!("invalid {header_name} header value"))?;
Ok(())
}
fn authorization_header_value(token: &str) -> Result<String> {
let value = format!("Bearer {token}");
validate_header_value("Authorization", &value)?;
Ok(value)
}
fn find_bytes(buffer: &[u8], needle: &[u8]) -> Option<usize> {
if needle.is_empty() {
return Some(0);
}
buffer
.windows(needle.len())
.position(|window| window == needle)
}
fn take_next_sse_frame(buffer: &mut Vec<u8>) -> Result<Option<Vec<u8>>> {
let split = [b"\r\n\r\n".as_slice(), b"\n\n".as_slice()]
.into_iter()
.filter_map(|delimiter| find_bytes(buffer, delimiter).map(|index| (index, delimiter.len())))
.min_by_key(|(index, _)| *index);
let Some((index, delimiter_len)) = split else {
return Ok(None);
};
let frame = buffer[..index].to_vec();
buffer.drain(..index + delimiter_len);
Ok(Some(frame))
}
fn parse_sse_frame(frame: &[u8]) -> Result<Option<AgentStreamEvent>> {
let text = std::str::from_utf8(frame).context("malformed sse frame: non-utf8 payload")?;
let mut id = String::new();
let mut event = String::new();
let mut data_lines = Vec::new();
for line in text.lines() {
let trimmed = line.trim_end();
if trimmed.starts_with(':') {
continue;
}
if let Some(value) = trimmed.strip_prefix("id:") {
id = value.trim().to_string();
} else if let Some(value) = trimmed.strip_prefix("event:") {
event = value.trim().to_string();
} else if let Some(value) = trimmed.strip_prefix("data:") {
data_lines.push(value.trim_start().to_string());
}
}
if data_lines.is_empty() {
return Ok(None);
}
let data: StreamEventEnvelope = serde_json::from_str(&data_lines.join("\n"))
.context("failed to decode SSE event payload as JSON")?;
Ok(Some(AgentStreamEvent { id, event, data }))
}
#[cfg(unix)]
async fn read_unix_response_head(
stream: &mut tokio::net::UnixStream,
) -> Result<ParsedHttpResponseHead> {
use tokio::io::AsyncReadExt;
let mut buffer = Vec::new();
let (header_end, delimiter_len) = loop {
if let Some(index) = find_bytes(&buffer, b"\r\n\r\n") {
break (index, 4);
}
if let Some(index) = find_bytes(&buffer, b"\n\n") {
break (index, 2);
}
let mut chunk = [0u8; 8192];
let read = stream.read(&mut chunk).await?;
if read == 0 {
return Err(anyhow!(
"malformed HTTP response: missing header terminator"
));
}
buffer.extend_from_slice(&chunk[..read]);
};
let header_bytes = &buffer[..header_end];
let body = buffer[header_end + delimiter_len..].to_vec();
let header_text = std::str::from_utf8(header_bytes)
.context("malformed HTTP response: headers are not valid UTF-8")?;
let mut lines = header_text.lines();
let status_line = lines
.next()
.ok_or_else(|| anyhow!("malformed HTTP response: missing status line"))?;
let status_code = status_line
.split_whitespace()
.nth(1)
.ok_or_else(|| anyhow!("malformed HTTP response: missing status code"))?
.parse::<u16>()
.context("malformed HTTP response: invalid status code")?;
let chunked = lines.any(|line| {
let lower = line.to_ascii_lowercase();
lower.starts_with("transfer-encoding:") && lower.contains("chunked")
});
Ok(ParsedHttpResponseHead {
status_code,
chunked,
body,
})
}
fn decode_or_error(status_code: u16, body: Vec<u8>, path: &str) -> Result<Vec<u8>> {
if (200..300).contains(&status_code) {
return Ok(body);
}
let payload = serde_json::from_slice::<ErrorPayload>(&body).ok();
let message = payload
.as_ref()
.and_then(|value| value.error.as_deref())
.map(ToString::to_string)
.unwrap_or_else(|| String::from_utf8_lossy(&body).trim().to_string());
let code = payload.as_ref().and_then(|value| value.code.clone());
let hint = payload.as_ref().and_then(|value| value.hint.clone());
Err(LocalHttpError {
path: path.to_string(),
status_code,
message,
code,
hint,
}
.into())
}
fn parse_http_response(buffer: &[u8]) -> Result<ParsedHttpResponse> {
let header_end = buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
.ok_or_else(|| anyhow!("malformed HTTP response: missing header terminator"))?;
let header_bytes = &buffer[..header_end];
let body = &buffer[header_end + 4..];
let header_text = std::str::from_utf8(header_bytes)
.context("malformed HTTP response: headers are not valid UTF-8")?;
let mut lines = header_text.lines();
let status_line = lines
.next()
.ok_or_else(|| anyhow!("malformed HTTP response: missing status line"))?;
let status_code = status_line
.split_whitespace()
.nth(1)
.ok_or_else(|| anyhow!("malformed HTTP response: missing status code"))?
.parse::<u16>()
.context("malformed HTTP response: invalid status code")?;
let chunked = lines.any(|line| {
let lower = line.to_ascii_lowercase();
lower.starts_with("transfer-encoding:") && lower.contains("chunked")
});
Ok(ParsedHttpResponse {
status_code,
body: if chunked {
decode_chunked_body(body)?
} else {
body.to_vec()
},
})
}
fn decode_chunked_body(body: &[u8]) -> Result<Vec<u8>> {
let mut cursor = 0usize;
let mut decoded = Vec::new();
loop {
let size_line_end = body[cursor..]
.windows(2)
.position(|window| window == b"\r\n")
.map(|offset| cursor + offset)
.ok_or_else(|| anyhow!("malformed chunked response: missing size line terminator"))?;
let size_line = std::str::from_utf8(&body[cursor..size_line_end])
.context("malformed chunked response: non-utf8 size line")?;
let size_hex = size_line.split(';').next().unwrap_or_default().trim();
let size = usize::from_str_radix(size_hex, 16)
.with_context(|| format!("malformed chunked response: invalid size {}", size_hex))?;
cursor = size_line_end + 2;
if size == 0 {
break;
}
let chunk_end = cursor
.checked_add(size)
.ok_or_else(|| anyhow!("malformed chunked response: chunk size overflow"))?;
if chunk_end + 2 > body.len() {
return Err(anyhow!("malformed chunked response: truncated chunk"));
}
decoded.extend_from_slice(&body[cursor..chunk_end]);
if &body[chunk_end..chunk_end + 2] != b"\r\n" {
return Err(anyhow!(
"malformed chunked response: missing chunk terminator"
));
}
cursor = chunk_end + 2;
}
Ok(decoded)
}
#[cfg(test)]
mod tests {
use super::{
authorization_header_value, decode_chunked_body, decode_or_error, event_stream_path,
normalize_remote_base_url, parse_http_response, parse_sse_frame, read_http_response_body,
take_next_sse_frame, validate_header_value, validate_unix_request_target,
EventStreamRequest, EventStreamTransport, LocalEventStream, LocalHttpError,
StreamEventEnvelope, TuiNetworkPolicy, TUI_LOCAL_NETWORK_POLICY, TUI_REMOTE_NETWORK_POLICY,
};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::task::JoinHandle;
#[test]
fn remote_connect_url_rejects_unspecified_hosts() {
assert!(normalize_remote_base_url("http://0.0.0.0:7878".into()).is_err());
assert!(normalize_remote_base_url("http://[::]:7878".into()).is_err());
}
#[test]
fn remote_connect_url_normalizes_trailing_slash() {
assert_eq!(
normalize_remote_base_url("http://example.test:7878/".into()).unwrap(),
"http://example.test:7878"
);
}
#[test]
fn remote_policy_has_longer_connect_timeout_than_local_policy() {
assert!(
TUI_REMOTE_NETWORK_POLICY.connect_timeout > TUI_LOCAL_NETWORK_POLICY.connect_timeout
);
assert_eq!(
TUI_REMOTE_NETWORK_POLICY.read_idle_timeout,
TUI_LOCAL_NETWORK_POLICY.read_idle_timeout
);
}
#[test]
fn parse_http_response_decodes_chunked_body() {
let raw = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\ntest\r\n0\r\n\r\n";
let parsed = parse_http_response(raw).unwrap();
assert_eq!(parsed.status_code, 200);
assert_eq!(parsed.body, b"test");
}
#[test]
fn decode_chunked_body_rejects_truncated_payload() {
let err = decode_chunked_body(b"4\r\nte").unwrap_err().to_string();
assert!(err.contains("truncated chunk"));
}
#[test]
fn decode_or_error_includes_code_and_hint_for_stopped_agent() {
let err = decode_or_error(
409,
br#"{"ok":false,"error":"agent default is stopped; start first","code":"agent_stopped","hint":"start with `holon agent start default`"}"#.to_vec(),
"/control/agents/default/prompt",
)
.unwrap_err();
let rendered = err.to_string();
let typed = err.downcast_ref::<LocalHttpError>().unwrap();
assert_eq!(typed.status_code, 409);
assert!(typed.has_code("agent_stopped"));
assert_eq!(
typed.hint.as_deref(),
Some("start with `holon agent start default`")
);
assert!(rendered.contains("agent default is stopped; start first"));
assert!(rendered.contains("[agent_stopped]"));
assert!(rendered.contains("Hint: start with `holon agent start default`"));
}
#[test]
fn decode_or_error_falls_back_to_plain_error_message() {
let err = decode_or_error(
500,
br#"{"ok":false,"error":"internal exploded"}"#.to_vec(),
"/control/runtime/status",
)
.unwrap_err()
.to_string();
assert!(err.contains("internal exploded"));
assert!(!err.contains("Hint:"));
}
#[test]
fn event_stream_path_includes_query_parameters() {
let path = event_stream_path(
"default",
&EventStreamRequest {
after_seq: Some(123),
limit: Some(20),
projection: Some("local_debug".into()),
},
)
.unwrap();
assert_eq!(
path,
"/agents/default/events/stream?limit=20&after_seq=123&projection=local_debug"
);
}
#[test]
fn event_stream_path_includes_seq_cursor() {
let path = event_stream_path(
"default",
&EventStreamRequest {
after_seq: Some(42),
limit: None,
projection: None,
},
)
.unwrap();
assert_eq!(path, "/agents/default/events/stream?after_seq=42");
}
#[test]
fn validate_unix_request_target_rejects_crlf_injection() {
let err = validate_unix_request_target("/agents/default/events\r\nInjected: yes")
.unwrap_err()
.to_string();
assert!(err.contains("invalid unix event stream request target"));
}
#[test]
fn validate_header_value_rejects_crlf_injection() {
let err = validate_header_value("Authorization", "Bearer token\r\nInjected: yes")
.unwrap_err()
.to_string();
assert!(err.contains("invalid Authorization header value"));
}
#[test]
fn authorization_header_value_rejects_crlf_injection() {
let err = authorization_header_value("secret\r\nInjected: yes")
.unwrap_err()
.to_string();
assert!(err.contains("invalid Authorization header value"));
}
#[test]
fn take_next_sse_frame_supports_lf_and_crlf_delimiters() {
let mut lf = b"id: evt_1\nevent: ping\ndata: {}\n\nid: evt_2".to_vec();
let first = take_next_sse_frame(&mut lf).unwrap().unwrap();
assert_eq!(
std::str::from_utf8(&first).unwrap(),
"id: evt_1\nevent: ping\ndata: {}"
);
assert_eq!(std::str::from_utf8(&lf).unwrap(), "id: evt_2");
let mut crlf = b"id: evt_1\r\nevent: ping\r\ndata: {}\r\n\r\n".to_vec();
let first = take_next_sse_frame(&mut crlf).unwrap().unwrap();
assert_eq!(
std::str::from_utf8(&first).unwrap(),
"id: evt_1\r\nevent: ping\r\ndata: {}"
);
assert!(crlf.is_empty());
}
#[tokio::test]
async fn event_stream_idle_timeout_after_opened_stream_without_chunks() {
let (response, server) = open_test_event_stream(Vec::new(), true).await;
let mut stream = LocalEventStream {
transport: EventStreamTransport::Http(response),
frame_buffer: Vec::new(),
idle_timeout: Duration::from_millis(25),
};
let err = stream.next_event().await.unwrap_err().to_string();
server.abort();
assert!(err.contains("event stream idle timeout after 25ms"));
}
#[tokio::test]
async fn http_body_reader_allows_slow_continuous_chunks() {
let (response, server) = open_test_event_stream(
vec![
("hello ".into(), Duration::ZERO),
("world".into(), Duration::from_millis(20)),
],
false,
)
.await;
let network = TuiNetworkPolicy {
connect_timeout: Duration::from_millis(50),
read_idle_timeout: Duration::from_millis(50),
stream_idle_timeout: Duration::from_millis(50),
};
let body = read_http_response_body(response, "/slow-json", network)
.await
.unwrap();
server.await.unwrap();
assert_eq!(body, b"hello world");
}
#[tokio::test]
async fn http_body_reader_times_out_on_idle_gap() {
let (response, server) = open_test_event_stream(Vec::new(), true).await;
let network = TuiNetworkPolicy {
connect_timeout: Duration::from_millis(50),
read_idle_timeout: Duration::from_millis(25),
stream_idle_timeout: Duration::from_millis(50),
};
let err = read_http_response_body(response, "/idle-json", network)
.await
.unwrap_err()
.to_string();
server.abort();
assert!(err.contains("read idle timeout after 25ms for /idle-json"));
}
#[tokio::test]
async fn sse_comment_heartbeat_does_not_emit_event_and_preserves_liveness() {
let event = StreamEventEnvelope {
id: "evt-1".into(),
event_seq: 1,
ts: chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
.unwrap()
.with_timezone(&chrono::Utc),
agent_id: "agent-1".into(),
event_type: "agent_message".into(),
projection: None,
provenance: None,
payload: serde_json::json!({"body": "hello"}),
};
let event_frame = format!(
"id: evt-1\nevent: message\ndata: {}\n\n",
serde_json::to_string(&event).unwrap()
);
let (response, server) = open_test_event_stream(
vec![
(": heartbeat\n\n".into(), Duration::ZERO),
(event_frame, Duration::from_millis(25)),
],
false,
)
.await;
let mut stream = LocalEventStream {
transport: EventStreamTransport::Http(response),
frame_buffer: Vec::new(),
idle_timeout: Duration::from_millis(50),
};
let received = stream.next_event().await.unwrap();
server.await.unwrap();
assert_eq!(received.id, "evt-1");
assert_eq!(received.event, "message");
}
async fn open_test_event_stream(
chunks: Vec<(String, Duration)>,
hold_open: bool,
) -> (reqwest::Response, JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut request = [0u8; 1024];
let _ = tokio::time::timeout(Duration::from_secs(1), socket.read(&mut request)).await;
socket
.write_all(
b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\n\r\n",
)
.await
.unwrap();
for (chunk, delay) in chunks {
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
write_http_chunk(&mut socket, chunk.as_bytes()).await;
}
if hold_open {
tokio::time::sleep(Duration::from_secs(60)).await;
} else {
socket.write_all(b"0\r\n\r\n").await.unwrap();
}
});
let response = reqwest::Client::new()
.get(format!("http://{addr}/events"))
.send()
.await
.unwrap();
(response, server)
}
async fn write_http_chunk(socket: &mut TcpStream, body: &[u8]) {
socket
.write_all(format!("{:X}\r\n", body.len()).as_bytes())
.await
.unwrap();
socket.write_all(body).await.unwrap();
socket.write_all(b"\r\n").await.unwrap();
}
#[test]
fn parse_sse_frame_ignores_comments_and_multiline_data() {
let frame = b": heartbeat\nid: 1\nevent: message_admitted\ndata: {\"id\":\"evt_123\",\ndata: \"event_seq\":1,\ndata: \"ts\":\"2026-04-19T08:00:00Z\",\ndata: \"agent_id\":\"default\",\ndata: \"type\":\"message_admitted\",\ndata: \"payload\":{}}\n";
let parsed = parse_sse_frame(frame).unwrap().unwrap();
assert_eq!(parsed.id, "1");
assert_eq!(parsed.event, "message_admitted");
assert_eq!(parsed.data.event_type, "message_admitted");
assert_eq!(parsed.data.agent_id, "default");
}
}