use std::collections::HashMap;
use compact_str::CompactString;
use crate::event::{AgentEvent, ToolContent};
use super::message::{ContentBlock, DeltaPhase, LoopEvent, LoopMessage, StopReason};
pub struct EventBridge {
turn_index: u32,
last_text_emitted: String,
last_reasoning_emitted: String,
tool_name_by_id: HashMap<String, String>,
}
impl Default for EventBridge {
fn default() -> Self {
Self::new()
}
}
impl EventBridge {
pub fn new() -> Self {
Self {
turn_index: 0,
last_text_emitted: String::new(),
last_reasoning_emitted: String::new(),
tool_name_by_id: HashMap::new(),
}
}
pub fn translate(&mut self, event: LoopEvent) -> Vec<AgentEvent> {
match event {
LoopEvent::AgentStart => Vec::new(),
LoopEvent::CompactionStarted { tokens_before } => {
vec![AgentEvent::CompactionStarted { tokens_before }]
}
LoopEvent::ContextCompacted {
ref new_session_id,
tokens_before,
tokens_after,
ref summary,
first_kept_index,
compaction_kind,
ref summary_model,
} => {
tracing::info!(
target: "dirge::agent_loop",
session_id = %new_session_id,
tokens_before,
tokens_after,
has_summary = !summary.is_empty(),
first_kept_index,
kind = ?compaction_kind,
"context compacted — session rotated"
);
vec![AgentEvent::ContextCompacted {
new_session_id: CompactString::new(new_session_id),
tokens_before,
tokens_after,
summary: CompactString::new(summary),
first_kept_index,
compaction_kind,
summary_model: summary_model.as_deref().map(CompactString::new),
}]
}
LoopEvent::CheckpointRefresh { summary } => {
vec![AgentEvent::CheckpointRefresh {
summary: CompactString::new(summary),
}]
}
LoopEvent::RetryNotice {
attempt,
delay_ms,
error,
} => {
vec![AgentEvent::RetryNotice {
attempt,
delay_ms,
error: CompactString::from(error),
}]
}
LoopEvent::RepairStats { snapshot } => {
vec![AgentEvent::RepairStats { snapshot }]
}
LoopEvent::SystemNotice { content } => {
vec![AgentEvent::SystemNotice {
content: CompactString::from(content),
}]
}
LoopEvent::EscalationActivated { provider, reason } => {
tracing::info!(
target: "dirge::agent_loop",
provider = %provider,
reason = ?reason,
"dual-client escalation activated for next LLM call",
);
vec![AgentEvent::EscalationActivated {
provider: CompactString::from(provider),
reason,
}]
}
LoopEvent::AgentEnd { messages } => {
let last_assistant = messages.iter().rev().find_map(|m| match m {
LoopMessage::Assistant(a) => Some(a),
_ => None,
});
if let Some(a) = last_assistant
&& matches!(a.stop_reason, StopReason::Error)
{
let error_text = a
.error_message
.as_deref()
.unwrap_or("agent loop produced an error with no message");
if error_text.contains("stream aborted by cancellation signal") {
let partial_response = a
.content
.iter()
.filter_map(|b| match b {
ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
return vec![AgentEvent::Interjected {
partial_response: CompactString::from(partial_response),
tokens: 0,
}];
}
let kind = crate::agent::recovery::classify_error(error_text);
return if matches!(kind, crate::agent::recovery::ErrorKind::ContextLength) {
let prompt_text = messages
.iter()
.find_map(|m| match m {
LoopMessage::User(u) => Some(u.content.as_str()),
_ => None,
})
.unwrap_or("");
vec![AgentEvent::ContextOverflow {
prompt: CompactString::from(prompt_text),
error: CompactString::from(error_text),
}]
} else {
vec![AgentEvent::Error(CompactString::from(error_text))]
};
}
let response = messages
.iter()
.rev()
.find_map(|m| match m {
LoopMessage::Assistant(a) => Some(
a.content
.iter()
.filter_map(|b| match b {
ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join(""),
),
_ => None,
})
.unwrap_or_default();
vec![AgentEvent::Done {
response: CompactString::from(response),
tokens: 0,
cost: 0.0,
}]
}
LoopEvent::TurnStart => {
let evt = AgentEvent::TurnStart {
index: self.turn_index,
};
self.turn_index += 1;
self.last_text_emitted.clear();
self.last_reasoning_emitted.clear();
vec![evt]
}
LoopEvent::TurnEnd { .. } => {
let idx = self.turn_index.saturating_sub(1);
self.tool_name_by_id.clear();
vec![AgentEvent::TurnEnd { index: idx }]
}
LoopEvent::Usage { usage } => {
vec![AgentEvent::Usage {
input_tokens: usage.input_tokens,
cached_input_tokens: usage.cached_input_tokens,
cache_creation_input_tokens: usage.cache_creation_input_tokens,
}]
}
LoopEvent::MessageStart { message } => {
match message {
LoopMessage::User(u) => {
vec![AgentEvent::UserMessage {
content: CompactString::from(u.content),
}]
}
LoopMessage::Custom(payload) => {
vec![AgentEvent::CustomMessage {
payload: payload.clone(),
}]
}
_ => Vec::new(),
}
}
LoopEvent::MessageEnd { message } => {
let _ = message;
Vec::new()
}
LoopEvent::MessageUpdate { message, phase } => {
match phase {
DeltaPhase::TextStart | DeltaPhase::TextDelta => {
let concat: String = message
.content
.iter()
.filter_map(|b| match b {
ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
if concat.len() > self.last_text_emitted.len()
&& concat.starts_with(&self.last_text_emitted)
{
let new_chunk = &concat[self.last_text_emitted.len()..];
let chunk = CompactString::from(new_chunk);
self.last_text_emitted = concat;
vec![AgentEvent::Token(chunk)]
} else if concat != self.last_text_emitted {
let chunk = CompactString::from(concat.as_str());
self.last_text_emitted = concat;
vec![AgentEvent::Token(chunk)]
} else {
Vec::new()
}
}
DeltaPhase::ThinkingStart | DeltaPhase::ThinkingDelta => {
let concat: String = message
.content
.iter()
.filter_map(|b| match b {
ContentBlock::Thinking { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
if concat.len() > self.last_reasoning_emitted.len()
&& concat.starts_with(&self.last_reasoning_emitted)
{
let new_chunk = &concat[self.last_reasoning_emitted.len()..];
let chunk = CompactString::from(new_chunk);
self.last_reasoning_emitted = concat;
vec![AgentEvent::Reasoning(chunk)]
} else if concat != self.last_reasoning_emitted {
let chunk = CompactString::from(concat.as_str());
self.last_reasoning_emitted = concat;
vec![AgentEvent::Reasoning(chunk)]
} else {
Vec::new()
}
}
DeltaPhase::TextEnd
| DeltaPhase::ThinkingEnd
| DeltaPhase::ToolCallStart
| DeltaPhase::ToolCallDelta
| DeltaPhase::ToolCallEnd => Vec::new(),
}
}
LoopEvent::ToolExecutionStart {
tool_call_id,
tool_name,
args,
} => {
self.tool_name_by_id
.insert(tool_call_id.clone(), tool_name.clone());
vec![
AgentEvent::ToolCall {
id: CompactString::from(tool_call_id.clone()),
name: CompactString::from(tool_name),
args,
},
AgentEvent::ToolStarted {
id: CompactString::from(tool_call_id),
},
]
}
LoopEvent::ToolExecutionUpdate { .. } => {
Vec::new()
}
LoopEvent::ToolExecutionEnd {
tool_call_id,
tool_name: _,
result,
is_error: _,
} => {
let output = flatten_content(&result.content);
let name = self.tool_name_by_id.remove(&tool_call_id);
let kind = classify_tool(name.as_deref());
vec![AgentEvent::ToolResult {
id: CompactString::from(tool_call_id),
output: CompactString::from(output),
kind,
}]
}
}
}
}
fn flatten_content(content: &[serde_json::Value]) -> String {
let mut out = String::new();
for block in content {
if let Some(obj) = block.as_object()
&& obj.get("type").and_then(|t| t.as_str()) == Some("text")
&& let Some(text) = obj.get("text").and_then(|t| t.as_str())
{
if !out.is_empty() {
out.push('\n');
}
out.push_str(text);
continue;
}
if !out.is_empty() {
out.push('\n');
}
out.push_str(&block.to_string());
}
out
}
fn classify_tool(name: Option<&str>) -> ToolContent {
match name {
Some("read") | Some("find_files") | Some("list_dir") => ToolContent::File,
_ => ToolContent::Text,
}
}
#[cfg(test)]
#[path = "bridge_tests.rs"]
mod tests;