use std::collections::HashMap;
use agent_client_protocol_schema::{
ContentBlock, StopReason, ToolCallStatus, ToolCallUpdateFields,
};
use defect_agent::event::{AgentEvent, LlmRequestSnapshot};
use defect_agent::llm::{Message, MessageContent, Role, Usage};
use super::model::{EventKind, IngestionEvent, ObservationBody, ObservationLevel, TraceBody};
const DEFAULT_ENVIRONMENT: &str = "production";
const TRACE_NAME: &str = "turn";
const STEP_NAME: &str = "step";
const GENERATION_NAME: &str = "llm_call";
const SPAWN_AGENT_TOOL_NAME: &str = "spawn_agent";
const SUBAGENT_SPAN_NAME: &str = "subagent";
/// Converts the [`AgentEvent`] stream of one session into ingestion events.
///
/// Each turn becomes a trace. Step spans, generations, tool spans and
/// subagent spans are nested beneath it.
pub struct TraceProjector {
    /// Session id copied onto every trace body.
    session_id: String,
    /// Prompt text captured from `UserPromptCommitted`.
    /// The next `TurnStarted` consumes it as the trace input.
    pending_input: Option<String>,
    /// The turn currently being traced, if any.
    turn: Option<TurnMeta>,
    /// Open scopes keyed by id prefix.
    /// The top-level agent's key is the trace id; each subagent uses a derived prefix.
    scopes: HashMap<String, ScopeState>,
    /// Maps a top-level `spawn_agent` tool-call id to the trace it was started in.
    /// Used to route subagent events.
    anchors: HashMap<String, String>,
}
/// Bookkeeping for the turn that is currently open.
struct TurnMeta {
    /// Text accumulated from top-level assistant output; becomes the trace output.
    final_output: String,
    /// Prompt text recorded as the trace input, if one was captured.
    input: Option<String>,
    /// Id of the trace created for this turn (also its top-level scope key).
    trace_id: String,
}
/// Per-agent tracing state.
///
/// There is one scope for the top-level agent of a turn and one for each
/// active subagent.
struct ScopeState {
    /// Prefix from which step, generation and tool-span ids are derived.
    prefix: String,
    /// Observation under which new step spans are parented.
    /// `None` for the top-level scope.
    step_parent: Option<String>,
    /// Number of steps opened so far; used to number step ids.
    step_seq: u32,
    /// Id of the step span that is currently open, if any.
    current_step_id: Option<String>,
    /// Generation whose create event was emitted but which is not flushed yet.
    current_gen: Option<PendingGeneration>,
    /// Tool-call id -> span id for tool spans still open in this scope.
    tool_spans: HashMap<String, String>,
}
/// A generation whose create event has already been emitted.
///
/// Its output, reasoning, usage and error are collected until it is flushed.
struct PendingGeneration {
    /// Observation id of the generation.
    id: String,
    /// Id of the step span the generation belongs to.
    parent_step_id: String,
    /// Model name reported when the call started.
    model: String,
    /// Streamed assistant text.
    output: String,
    /// Streamed reasoning text.
    thinking: String,
    /// Usage reported when the call finished.
    usage: Usage,
    /// Error reported for the call, if any.
    error: Option<String>,
}
impl ScopeState {
    /// Creates an empty scope.
    ///
    /// The scope starts with no open step, no pending generation and no open
    /// tool spans. Ids are derived from `prefix`. New step spans are parented
    /// under `step_parent`; `None` gives them no parent observation.
    fn new(prefix: String, step_parent: Option<String>) -> Self {
        Self {
            step_seq: 0,
            current_step_id: None,
            current_gen: None,
            tool_spans: HashMap::default(),
            prefix,
            step_parent,
        }
    }
}
impl TraceProjector {
    /// Creates a projector for one session.
    ///
    /// `session_id` is copied onto every trace body the projector emits.
    pub fn new(session_id: impl Into<String>) -> Self {
        Self {
            session_id: session_id.into(),
            turn: None,
            pending_input: None,
            anchors: HashMap::new(),
            scopes: HashMap::new(),
        }
    }

    /// Translates one agent event into the ingestion events it implies.
    ///
    /// `now` is the timestamp used for everything emitted by this call.
    /// `new_id` supplies every fresh identifier: trace ids as well as the ids
    /// of the ingestion events themselves. Top-level events that arrive while
    /// no turn is open, and event kinds that are not traced, yield an empty
    /// vector.
    pub fn project(
        &mut self,
        event: AgentEvent,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        match event {
            AgentEvent::TurnStarted => self.on_turn_started(now, new_id),
            AgentEvent::UserPromptCommitted { content } => {
                self.on_user_prompt(&content);
                Vec::new()
            }
            AgentEvent::LlmCallStarted {
                model,
                attempt,
                request,
            } => self.on_top_llm_started(model, attempt, request.as_ref(), now, new_id),
            AgentEvent::AssistantText { content } => {
                self.accumulate_top_text(&content);
                Vec::new()
            }
            AgentEvent::AssistantThought { content } => {
                self.accumulate_top_thinking(&content);
                Vec::new()
            }
            AgentEvent::LlmCallFinished { usage, error, .. } => {
                self.on_top_llm_finished(usage, error, now, new_id)
            }
            AgentEvent::ToolCallStarted { id, name, fields } => {
                self.on_top_tool_started(id.to_string(), name, fields.raw_input, now, new_id)
            }
            AgentEvent::ToolCallFinished { id, fields } => {
                self.on_top_tool_finished(&id.to_string(), &fields, now, new_id)
            }
            AgentEvent::ContextCompressed {
                tokens_before,
                tokens_after,
            } => self.on_context_compressed(tokens_before, tokens_after, None, now, new_id),
            AgentEvent::ContextMicrocompacted {
                tokens_before,
                tokens_after,
                cleared,
            } => {
                self.on_context_compressed(tokens_before, tokens_after, Some(cleared), now, new_id)
            }
            AgentEvent::TurnEnded { reason, usage } => {
                self.on_turn_ended(reason, usage, now, new_id)
            }
            AgentEvent::Subagent {
                ancestor_path,
                agent_type,
                inner,
            } => {
                let path: Vec<String> = ancestor_path.iter().map(ToString::to_string).collect();
                self.on_subagent(&path, agent_type, *inner, now, new_id)
            }
            // Progress, policy and permission events are deliberately not traced.
            AgentEvent::ToolCallProgress { .. }
            | AgentEvent::PolicyDecision { .. }
            | AgentEvent::PermissionResolved { .. } => Vec::new(),
            _ => Vec::new(),
        }
    }

    /// Opens a new trace for the turn and its top-level scope.
    ///
    /// Any prompt text captured before the turn started becomes the trace input.
    fn on_turn_started(
        &mut self,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        // A previous turn that never received `TurnEnded` would otherwise leave
        // its top-level scope in `scopes` forever. Once `self.turn` is
        // replaced, nothing can reach or remove that scope any more.
        if let Some(stale) = self.turn.take() {
            self.scopes.remove(&stale.trace_id);
        }
        let trace_id = new_id();
        let input = self.pending_input.take();
        let body = TraceBody {
            id: trace_id.clone(),
            name: Some(TRACE_NAME.into()),
            session_id: Some(self.session_id.clone()),
            input: input.clone().map(serde_json::Value::String),
            environment: Some(DEFAULT_ENVIRONMENT.into()),
            timestamp: Some(now.to_string()),
            ..Default::default()
        };
        self.scopes
            .insert(trace_id.clone(), ScopeState::new(trace_id.clone(), None));
        self.turn = Some(TurnMeta {
            trace_id: trace_id.clone(),
            input,
            final_output: String::new(),
        });
        vec![IngestionEvent::trace(
            new_id(),
            now.to_string(),
            EventKind::TraceCreate,
            &body,
        )]
    }

    /// Remembers non-empty prompt text as the input of the next trace.
    fn on_user_prompt(&mut self, content: &[ContentBlock]) {
        let text = content_text(content);
        if !text.is_empty() {
            self.pending_input = Some(text);
        }
    }

    /// Starts a step and a generation in the top-level scope of the open turn.
    fn on_top_llm_started(
        &mut self,
        model: String,
        attempt: u32,
        request: &LlmRequestSnapshot,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
            return Vec::new();
        };
        let Some(scope) = self.scopes.get_mut(&trace_id) else {
            return Vec::new();
        };
        scope_llm_started(scope, &trace_id, model, attempt, request, now, new_id)
    }

    /// Appends streamed top-level assistant text.
    ///
    /// The text goes to the turn's final output and, when a generation is
    /// pending, to that generation's output.
    fn accumulate_top_text(&mut self, content: &ContentBlock) {
        if let ContentBlock::Text(text) = content
            && let Some(turn) = self.turn.as_mut()
        {
            turn.final_output.push_str(&text.text);
            let trace_id = turn.trace_id.clone();
            if let Some(scope) = self.scopes.get_mut(&trace_id)
                && let Some(pg) = scope.current_gen.as_mut()
            {
                pg.output.push_str(&text.text);
            }
        }
    }

    /// Appends streamed top-level reasoning text to the pending generation.
    fn accumulate_top_thinking(&mut self, content: &ContentBlock) {
        if let ContentBlock::Text(text) = content
            && let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone())
            && let Some(scope) = self.scopes.get_mut(&trace_id)
            && let Some(pg) = scope.current_gen.as_mut()
        {
            pg.thinking.push_str(&text.text);
        }
    }

    /// Records usage/error for the top-level generation and flushes it.
    ///
    /// The step stays open so that tool spans started afterwards are still
    /// parented under it.
    fn on_top_llm_finished(
        &mut self,
        usage: Usage,
        error: Option<String>,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
            return Vec::new();
        };
        let Some(scope) = self.scopes.get_mut(&trace_id) else {
            return Vec::new();
        };
        note_llm_finished(scope, usage, error);
        flush_generation(scope, &trace_id, now, new_id)
    }

    /// Opens a tool span in the top-level scope.
    ///
    /// A `spawn_agent` call is also recorded as an anchor, so the subagent's
    /// events can later be routed into this trace.
    fn on_top_tool_started(
        &mut self,
        tool_call_id: String,
        name: String,
        raw_input: Option<serde_json::Value>,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
            return Vec::new();
        };
        if name == SPAWN_AGENT_TOOL_NAME {
            self.anchors.insert(tool_call_id.clone(), trace_id.clone());
        }
        let Some(scope) = self.scopes.get_mut(&trace_id) else {
            return Vec::new();
        };
        scope_tool_started(
            scope,
            &trace_id,
            &tool_call_id,
            name,
            raw_input,
            now,
            new_id,
        )
    }

    /// Closes a tool span in the top-level scope.
    fn on_top_tool_finished(
        &mut self,
        tool_call_id: &str,
        fields: &ToolCallUpdateFields,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
            return Vec::new();
        };
        let Some(scope) = self.scopes.get_mut(&trace_id) else {
            return Vec::new();
        };
        scope_tool_finished(scope, &trace_id, tool_call_id, fields, now, new_id)
    }

    /// Emits a point-in-time event recording a context compaction.
    ///
    /// `cleared` is `Some` for a microcompaction; it is recorded as
    /// `cleared_tool_results` and changes the event name.
    fn on_context_compressed(
        &mut self,
        tokens_before: u64,
        tokens_after: u64,
        cleared: Option<usize>,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
            return Vec::new();
        };
        let mut meta = serde_json::Map::new();
        meta.insert("tokens_before".into(), tokens_before.into());
        meta.insert("tokens_after".into(), tokens_after.into());
        let name = match cleared {
            Some(cleared) => {
                meta.insert("cleared_tool_results".into(), cleared.into());
                "context_microcompaction"
            }
            None => "context_compaction",
        };
        let body = ObservationBody {
            id: new_id(),
            trace_id,
            name: Some(name.into()),
            start_time: Some(now.to_string()),
            metadata: Some(serde_json::Value::Object(meta)),
            environment: Some(DEFAULT_ENVIRONMENT.into()),
            ..Default::default()
        };
        vec![IngestionEvent::observation(
            new_id(),
            now.to_string(),
            EventKind::EventCreate,
            &body,
        )]
    }

    /// Closes the open turn.
    ///
    /// Flushes the top-level scope's pending generation and open step, then
    /// re-sends the trace with its final output, the stop reason and the turn
    /// usage.
    ///
    /// NOTE(review): the closing body is sent as a second `TraceCreate` for
    /// the same id, which assumes the ingestion backend upserts traces.
    fn on_turn_ended(
        &mut self,
        reason: StopReason,
        usage: Usage,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        let Some(turn) = self.turn.take() else {
            return Vec::new();
        };
        let trace_id = turn.trace_id.clone();
        let mut events = Vec::new();
        if let Some(mut scope) = self.scopes.remove(&trace_id) {
            events.extend(flush_generation(&mut scope, &trace_id, now, new_id));
            events.extend(close_current_step(&mut scope, &trace_id, now, new_id));
        }
        let mut meta = serde_json::Map::new();
        meta.insert(
            "stop_reason".into(),
            serde_json::to_value(reason).unwrap_or(serde_json::Value::Null),
        );
        if let Some(details) = usage_to_details(&usage) {
            meta.insert("usage".into(), serde_json::Value::Object(details));
        }
        let body = TraceBody {
            id: trace_id,
            name: Some(TRACE_NAME.into()),
            session_id: Some(self.session_id.clone()),
            input: turn.input.map(serde_json::Value::String),
            output: (!turn.final_output.is_empty())
                .then_some(serde_json::Value::String(turn.final_output)),
            metadata: Some(serde_json::Value::Object(meta)),
            // Repeat the environment so the closing upsert carries the same
            // attributes as the body that created the trace.
            environment: Some(DEFAULT_ENVIRONMENT.into()),
            timestamp: Some(now.to_string()),
            ..Default::default()
        };
        events.push(IngestionEvent::trace(
            new_id(),
            now.to_string(),
            EventKind::TraceCreate,
            &body,
        ));
        events
    }

    /// Routes an event from a possibly nested subagent into the trace that spawned it.
    ///
    /// `path` is the chain of spawn tool-call ids leading to the subagent. Its
    /// first element must have been anchored by a top-level `spawn_agent`
    /// call; otherwise the event is dropped.
    ///
    /// The first event seen from a subagent opens its span, parented under the
    /// spawning tool span, and creates its scope. Its `TurnEnded` closes that
    /// span and forgets the scope. For a direct child of the top-level agent,
    /// it also forgets the anchor.
    fn on_subagent(
        &mut self,
        path: &[String],
        agent_type: String,
        inner: AgentEvent,
        now: &str,
        new_id: &mut dyn FnMut() -> String,
    ) -> Vec<IngestionEvent> {
        use std::collections::hash_map::Entry;

        let Some(first) = path.first() else {
            return Vec::new();
        };
        let Some(trace_id) = self.anchors.get(first).cloned() else {
            return Vec::new();
        };
        let prefix = scope_prefix(&trace_id, path);
        let mut events = Vec::new();
        let scope = match self.scopes.entry(prefix.clone()) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let parent_tool = parent_tool_span_id(&trace_id, path);
                let name = format!("{SUBAGENT_SPAN_NAME}:{agent_type}");
                let mut meta = serde_json::Map::new();
                meta.insert("agent_type".into(), agent_type.into());
                let body = ObservationBody {
                    id: prefix.clone(),
                    trace_id: trace_id.clone(),
                    parent_observation_id: Some(parent_tool),
                    name: Some(name),
                    start_time: Some(now.to_string()),
                    metadata: Some(serde_json::Value::Object(meta)),
                    environment: Some(DEFAULT_ENVIRONMENT.into()),
                    ..Default::default()
                };
                events.push(IngestionEvent::observation(
                    new_id(),
                    now.to_string(),
                    EventKind::SpanCreate,
                    &body,
                ));
                // The subagent span itself is the parent of the scope's steps.
                entry.insert(ScopeState::new(prefix.clone(), Some(prefix.clone())))
            }
        };
        match inner {
            AgentEvent::LlmCallStarted {
                model,
                attempt,
                request,
            } => {
                events.extend(scope_llm_started(
                    scope,
                    &trace_id,
                    model,
                    attempt,
                    request.as_ref(),
                    now,
                    new_id,
                ));
            }
            AgentEvent::AssistantText { content } => {
                if let (ContentBlock::Text(text), Some(pg)) = (&content, scope.current_gen.as_mut())
                {
                    pg.output.push_str(&text.text);
                }
            }
            AgentEvent::AssistantThought { content } => {
                if let (ContentBlock::Text(text), Some(pg)) = (&content, scope.current_gen.as_mut())
                {
                    pg.thinking.push_str(&text.text);
                }
            }
            AgentEvent::LlmCallFinished { usage, error, .. } => {
                note_llm_finished(scope, usage, error);
                events.extend(flush_generation(scope, &trace_id, now, new_id));
            }
            AgentEvent::ToolCallStarted { id, name, fields } => {
                events.extend(scope_tool_started(
                    scope,
                    &trace_id,
                    &id.to_string(),
                    name,
                    fields.raw_input,
                    now,
                    new_id,
                ));
            }
            AgentEvent::ToolCallFinished { id, fields } => {
                events.extend(scope_tool_finished(
                    scope,
                    &trace_id,
                    &id.to_string(),
                    &fields,
                    now,
                    new_id,
                ));
            }
            AgentEvent::TurnEnded { .. } => {
                events.extend(flush_generation(scope, &trace_id, now, new_id));
                events.extend(close_current_step(scope, &trace_id, now, new_id));
                let body = ObservationBody {
                    id: scope.prefix.clone(),
                    trace_id: trace_id.clone(),
                    end_time: Some(now.to_string()),
                    // Keep the closing update consistent with the span-create body.
                    environment: Some(DEFAULT_ENVIRONMENT.into()),
                    ..Default::default()
                };
                events.push(IngestionEvent::observation(
                    new_id(),
                    now.to_string(),
                    EventKind::SpanUpdate,
                    &body,
                ));
                self.scopes.remove(&prefix);
                if path.len() == 1 {
                    self.anchors.remove(first);
                }
            }
            _ => {}
        }
        events
    }
}
/// Opens a new step in `scope` and a generation beneath it for one LLM call.
///
/// First, any generation still pending in the scope is flushed and the open
/// step is closed, so every call gets its own `step` span. The new
/// generation is stored in `scope.current_gen`, so streamed output and the
/// final usage can be attached when it is flushed.
fn scope_llm_started(
    scope: &mut ScopeState,
    trace_id: &str,
    model: String,
    attempt: u32,
    request: &LlmRequestSnapshot,
    now: &str,
    new_id: &mut dyn FnMut() -> String,
) -> Vec<IngestionEvent> {
    let mut out = flush_generation(scope, trace_id, now, new_id);
    out.extend(close_current_step(scope, trace_id, now, new_id));

    scope.step_seq += 1;
    let step = format!("{}-step-{}", scope.prefix, scope.step_seq);
    scope.current_step_id = Some(step.clone());
    out.push(IngestionEvent::observation(
        new_id(),
        now.to_string(),
        EventKind::SpanCreate,
        &ObservationBody {
            id: step.clone(),
            trace_id: trace_id.to_string(),
            parent_observation_id: scope.step_parent.clone(),
            name: Some(STEP_NAME.into()),
            start_time: Some(now.to_string()),
            environment: Some(DEFAULT_ENVIRONMENT.into()),
            ..Default::default()
        },
    ));

    let generation_id = format!("{step}-gen");
    let mut metadata = serde_json::Map::new();
    metadata.insert("attempt".into(), attempt.into());
    let create_body = ObservationBody {
        id: generation_id.clone(),
        trace_id: trace_id.to_string(),
        parent_observation_id: Some(step.clone()),
        name: Some(GENERATION_NAME.into()),
        model: Some(model.clone()),
        start_time: Some(now.to_string()),
        input: Some(request_to_input(request)),
        metadata: Some(serde_json::Value::Object(metadata)),
        environment: Some(DEFAULT_ENVIRONMENT.into()),
        ..Default::default()
    };
    scope.current_gen = Some(PendingGeneration {
        id: generation_id,
        parent_step_id: step,
        model,
        output: String::new(),
        thinking: String::new(),
        usage: Usage::default(),
        error: None,
    });
    out.push(IngestionEvent::observation(
        new_id(),
        now.to_string(),
        EventKind::GenerationCreate,
        &create_body,
    ));
    out
}
/// Records the final usage, and any error, for the scope's pending generation.
///
/// Does nothing when no generation is pending. A `None` error leaves any
/// previously recorded error untouched.
fn note_llm_finished(scope: &mut ScopeState, usage: Usage, error: Option<String>) {
    let Some(pending) = scope.current_gen.as_mut() else {
        return;
    };
    pending.usage = usage;
    if let Some(message) = error {
        pending.error = Some(message);
    }
}
fn flush_generation(
scope: &mut ScopeState,
trace_id: &str,
now: &str,
new_id: &mut dyn FnMut() -> String,
) -> Vec<IngestionEvent> {
let Some(pg) = scope.current_gen.take() else {
return Vec::new();
};
let mut meta = serde_json::Map::new();
if !pg.thinking.is_empty() {
meta.insert("reasoning".into(), serde_json::Value::String(pg.thinking));
}
let body = ObservationBody {
id: pg.id,
trace_id: trace_id.to_string(),
parent_observation_id: Some(pg.parent_step_id),
name: Some(GENERATION_NAME.into()),
model: Some(pg.model),
end_time: Some(now.to_string()),
output: (!pg.output.is_empty()).then_some(serde_json::Value::String(pg.output)),
usage_details: usage_to_details(&pg.usage),
metadata: (!meta.is_empty()).then_some(serde_json::Value::Object(meta)),
level: pg.error.as_ref().map(|_| ObservationLevel::Error),
status_message: pg.error,
..Default::default()
};
vec![IngestionEvent::observation(
new_id(),
now.to_string(),
EventKind::GenerationUpdate,
&body,
)]
}
/// Ends the scope's open step span, if any.
///
/// Emits a span update that sets the step's `end_time` and clears
/// `current_step_id`. Returns no events when no step is open.
fn close_current_step(
    scope: &mut ScopeState,
    trace_id: &str,
    now: &str,
    new_id: &mut dyn FnMut() -> String,
) -> Vec<IngestionEvent> {
    match scope.current_step_id.take() {
        None => Vec::new(),
        Some(open_step) => {
            let closing = ObservationBody {
                id: open_step,
                trace_id: trace_id.to_string(),
                end_time: Some(now.to_string()),
                ..Default::default()
            };
            let event = IngestionEvent::observation(
                new_id(),
                now.to_string(),
                EventKind::SpanUpdate,
                &closing,
            );
            vec![event]
        }
    }
}
/// Opens a span for a tool call.
///
/// The span is parented under the scope's current step, or has no parent
/// observation when no step is open. Its id is remembered under
/// `tool_call_id` so the matching finish event can close it.
fn scope_tool_started(
    scope: &mut ScopeState,
    trace_id: &str,
    tool_call_id: &str,
    name: String,
    raw_input: Option<serde_json::Value>,
    now: &str,
    new_id: &mut dyn FnMut() -> String,
) -> Vec<IngestionEvent> {
    let tool_span = format!("{}-tool-{}", scope.prefix, tool_call_id);
    let opening = ObservationBody {
        id: tool_span.clone(),
        trace_id: trace_id.to_string(),
        parent_observation_id: scope.current_step_id.clone(),
        name: Some(name),
        start_time: Some(now.to_string()),
        input: raw_input,
        environment: Some(DEFAULT_ENVIRONMENT.into()),
        ..Default::default()
    };
    scope.tool_spans.insert(tool_call_id.to_owned(), tool_span);
    let event = IngestionEvent::observation(
        new_id(),
        now.to_string(),
        EventKind::SpanCreate,
        &opening,
    );
    vec![event]
}
fn scope_tool_finished(
scope: &mut ScopeState,
trace_id: &str,
tool_call_id: &str,
fields: &ToolCallUpdateFields,
now: &str,
new_id: &mut dyn FnMut() -> String,
) -> Vec<IngestionEvent> {
let span_id = scope
.tool_spans
.remove(tool_call_id)
.unwrap_or_else(|| format!("{}-tool-{}", scope.prefix, tool_call_id));
let failed = matches!(fields.status, Some(ToolCallStatus::Failed));
let body = ObservationBody {
id: span_id,
trace_id: trace_id.to_string(),
end_time: Some(now.to_string()),
output: fields.raw_output.clone(),
level: failed.then_some(ObservationLevel::Error),
..Default::default()
};
vec![IngestionEvent::observation(
new_id(),
now.to_string(),
EventKind::SpanUpdate,
&body,
)]
}
/// Builds the id prefix of the scope reached by following `path` from `trace_id`.
///
/// The result is `trace_id` followed by `-sub-<id>` for every element of
/// `path`. An empty path yields `trace_id` itself.
fn scope_prefix(trace_id: &str, path: &[String]) -> String {
    std::iter::once(trace_id)
        .chain(path.iter().map(String::as_str))
        .collect::<Vec<&str>>()
        .join("-sub-")
}
/// Returns the id of the tool span that spawned the subagent at `path`.
///
/// The last element of `path` is the spawning tool-call id. The remaining
/// elements locate the scope in which that tool span was opened.
///
/// # Panics
///
/// Panics if `path` is empty.
fn parent_tool_span_id(trace_id: &str, path: &[String]) -> String {
    let Some((spawn_call_id, ancestors)) = path.split_last() else {
        panic!("path is non-empty");
    };
    let parent_scope = scope_prefix(trace_id, ancestors);
    format!("{parent_scope}-tool-{spawn_call_id}")
}
/// Converts token usage into a usage-details map.
///
/// Only the counters that are present are included. Returns `None` when no
/// counter is present.
fn usage_to_details(usage: &Usage) -> Option<serde_json::Map<String, serde_json::Value>> {
    let counters = [
        ("input", usage.input_tokens.map(serde_json::Value::from)),
        ("output", usage.output_tokens.map(serde_json::Value::from)),
        (
            "cache_read_input_tokens",
            usage.cache_read_input_tokens.map(serde_json::Value::from),
        ),
        (
            "cache_creation_input_tokens",
            usage.cache_creation_input_tokens.map(serde_json::Value::from),
        ),
    ];
    let details: serde_json::Map<String, serde_json::Value> = counters
        .into_iter()
        .filter_map(|(key, value)| value.map(|v| (key.to_string(), v)))
        .collect();
    if details.is_empty() {
        None
    } else {
        Some(details)
    }
}
/// Concatenates the text of every text block in `content`, with no separator.
///
/// Non-text blocks are skipped.
fn content_text(content: &[ContentBlock]) -> String {
    content.iter().fold(String::new(), |mut joined, block| {
        if let ContentBlock::Text(text) = block {
            joined.push_str(&text.text);
        }
        joined
    })
}
fn request_to_input(request: &LlmRequestSnapshot) -> serde_json::Value {
let mut messages: Vec<serde_json::Value> = Vec::new();
if let Some(system) = &request.system {
messages.push(serde_json::json!({ "role": "system", "content": system }));
}
for msg in &request.messages {
messages.push(message_to_value(msg));
}
serde_json::Value::Array(messages)
}
/// Renders one conversation message as `{ "role": ..., "content": ... }`.
///
/// When the message has exactly one part and that part rendered as a plain
/// JSON string, the string is emitted directly. Otherwise the rendered parts
/// are emitted as an array.
fn message_to_value(msg: &Message) -> serde_json::Value {
    let role = match msg.role {
        Role::Assistant => "assistant",
        Role::User => "user",
    };
    let mut parts: Vec<serde_json::Value> = msg.content.iter().map(content_to_value).collect();
    let content = if matches!(parts.as_slice(), [serde_json::Value::String(_)]) {
        // Exactly one element is present, so `pop` always yields it.
        parts.pop().unwrap_or_default()
    } else {
        serde_json::Value::Array(parts)
    };
    serde_json::json!({ "role": role, "content": content })
}
/// Renders one message part as JSON.
///
/// Text becomes a bare string. Every other kind becomes an object tagged
/// with `"type"`. Tool results omit their content, and images omit their
/// data.
fn content_to_value(content: &MessageContent) -> serde_json::Value {
    match content {
        MessageContent::Text { text } => serde_json::Value::String(text.to_owned()),
        MessageContent::Thinking { text, .. } => serde_json::json!({
            "type": "thinking",
            "text": text,
        }),
        MessageContent::ToolUse { id, name, args } => serde_json::json!({
            "type": "tool_use",
            "id": id,
            "name": name,
            "input": args,
        }),
        MessageContent::ToolResult {
            tool_use_id,
            is_error,
            ..
        } => serde_json::json!({
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "is_error": is_error,
        }),
        MessageContent::Image { mime, .. } => serde_json::json!({
            "type": "image",
            "mime": mime,
        }),
        MessageContent::ProviderActivity {
            provider_id, kind, ..
        } => {
            let kind_label = format!("{kind:?}");
            serde_json::json!({
                "type": "provider_activity",
                "provider_id": provider_id,
                "kind": kind_label,
            })
        }
    }
}