use std::fmt::Write as _;
use std::sync::{Arc, Mutex};
use agent_sdk_foundation::privacy::{RedactionPolicy, redact_string};
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry::global::BoxedSpan;
use opentelemetry::trace::Span;
use crate::events::AgentEvent;
use crate::llm::ContentBlock;
use crate::types::AgentInput;
use super::langfuse::{LANGFUSE_TRACE_OUTPUT, truncate_trace_text};
#[must_use]
pub fn langfuse_trace_input(input: &AgentInput, max_chars: usize) -> Option<String> {
let text = match input {
AgentInput::Text(text) => {
let text = text.trim();
if text.is_empty() {
return None;
}
text.to_owned()
}
AgentInput::Message(blocks) => summarize_message_input(blocks)?,
AgentInput::Resume {
confirmed,
rejection_reason,
..
} => {
if *confirmed {
"User approved tool confirmation".to_owned()
} else {
let reason = rejection_reason
.as_deref()
.map(str::trim)
.filter(|reason| !reason.is_empty());
reason.map_or_else(
|| "User rejected tool confirmation".to_owned(),
|reason| format!("User rejected tool confirmation: {reason}"),
)
}
}
AgentInput::Continue => "[Turn continuation]".to_owned(),
AgentInput::SubmitToolResults { .. } => "[External tool results]".to_owned(),
};
Some(truncate_trace_text(&text, max_chars))
}
fn summarize_message_input(blocks: &[ContentBlock]) -> Option<String> {
let first_block_is_text = blocks
.first()
.is_some_and(|block| matches!(block, ContentBlock::Text { .. }));
let summary = match blocks.first() {
Some(ContentBlock::Text { text }) => {
let text = text.trim();
if text.is_empty() {
None
} else {
Some(text.to_owned())
}
}
_ => None,
};
let mut text_attachment_count = 0usize;
let mut image_count = 0usize;
let mut document_count = 0usize;
for block in blocks.iter().skip(usize::from(first_block_is_text)) {
match block {
ContentBlock::Text { text } if !text.trim().is_empty() => {
text_attachment_count += 1;
}
ContentBlock::Image { .. } => image_count += 1,
ContentBlock::Document { .. } => document_count += 1,
_ => {}
}
}
let mut parts: Vec<String> = Vec::new();
if let Some(summary) = summary {
parts.push(summary);
}
let mut attachments: Vec<String> = Vec::new();
if text_attachment_count > 0 {
attachments.push(format!("{text_attachment_count} text attachment(s)"));
}
if image_count > 0 {
attachments.push(format!("{image_count} image attachment(s)"));
}
if document_count > 0 {
attachments.push(format!("{document_count} document attachment(s)"));
}
if !attachments.is_empty() {
parts.push(format!("[{}]", attachments.join(", ")));
}
if parts.is_empty() {
return None;
}
Some(parts.join("\n\n"))
}
#[must_use]
pub fn langfuse_trace_output(event: &AgentEvent) -> Option<String> {
match event {
AgentEvent::Text { text, .. } => non_empty(text),
AgentEvent::ToolCallStart {
name,
display_name,
input,
..
} => Some(summarize_tool_call_start(name, display_name, input)),
AgentEvent::ToolCallEnd { result, .. } => non_empty(&result.output),
AgentEvent::ToolRequiresConfirmation { description, .. } => non_empty(description),
AgentEvent::Error { message, .. } => non_empty(message),
AgentEvent::Refusal { text, .. } => text.as_deref().and_then(non_empty),
_ => None,
}
}
#[must_use]
pub const fn langfuse_trace_event_label(event: &AgentEvent) -> &'static str {
match event {
AgentEvent::Text { .. } => "Assistant",
AgentEvent::ToolCallStart { .. } => "Tool Call",
AgentEvent::ToolCallEnd { .. } => "Tool Result",
AgentEvent::ToolRequiresConfirmation { .. } => "Tool Confirmation",
AgentEvent::Error { .. } => "Error",
AgentEvent::Refusal { .. } => "Refusal",
AgentEvent::Cancelled { .. } => "Cancelled",
AgentEvent::UserInput { .. } => "User",
_ => "Event",
}
}
fn summarize_tool_call_start(name: &str, display_name: &str, input: &serde_json::Value) -> String {
let tool_name = name.trim();
let tool_name = if tool_name.is_empty() {
"Tool requested"
} else {
tool_name
};
let display_name = display_name.trim();
let display_name = display_name
.strip_prefix("Use ")
.unwrap_or(display_name)
.trim();
let mut output = tool_name.to_owned();
if !display_name.is_empty() && display_name != tool_name {
let _ = write!(output, "\nDisplay name: {display_name}");
}
if let serde_json::Value::Object(args) = input
&& !args.is_empty()
{
let mut keys: Vec<&str> = args.keys().map(String::as_str).collect();
keys.sort_unstable();
let _ = write!(output, "\nArguments: {}", keys.join(", "));
}
output
}
fn non_empty(text: &str) -> Option<String> {
let trimmed = text.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_owned())
}
}
pub struct RootTraceState {
redactor: RedactionPolicy,
max_chars: usize,
buffer: Mutex<String>,
}
impl RootTraceState {
pub fn new(max_chars: usize) -> Self {
let buffer = String::with_capacity(max_chars.saturating_add(64));
Self {
redactor: RedactionPolicy::baseline(),
max_chars,
buffer: Mutex::new(buffer),
}
}
pub fn attach_to(self: Arc<Self>, cx: &Context) -> Context {
cx.with_value(TraceStateHandle(self))
}
pub fn from_current_context() -> Option<Arc<Self>> {
let cx = Context::current();
cx.get::<TraceStateHandle>().map(|handle| handle.0.clone())
}
pub fn observe(&self, event: &AgentEvent) {
let Some(text) = langfuse_trace_output(event) else {
return;
};
let label = langfuse_trace_event_label(event);
self.append(label, &text);
}
pub fn observe_error(&self, message: &str) {
let trimmed = message.trim();
if trimmed.is_empty() {
return;
}
self.append("Error", trimmed);
}
pub fn flush(&self, span: &mut BoxedSpan) {
let Ok(buf) = self.buffer.lock() else {
log::warn!("langfuse trace-output buffer mutex poisoned; dropping flush");
return;
};
if buf.is_empty() {
return;
}
let truncated = truncate_trace_text(&buf, self.max_chars);
drop(buf);
span.set_attribute(KeyValue::new(LANGFUSE_TRACE_OUTPUT, truncated));
}
fn append(&self, label: &str, text: &str) {
let masked = redact_string(text, &self.redactor);
let Ok(mut buf) = self.buffer.lock() else {
log::warn!("langfuse trace-output buffer mutex poisoned; dropping update");
return;
};
if !buf.is_empty() {
buf.push_str("\n\n---\n\n");
}
let _ = write!(buf, "[{label}]\n{masked}");
}
}
#[derive(Clone)]
struct TraceStateHandle(Arc<RootTraceState>);
pub fn observe_current(event: &AgentEvent) {
if let Some(state) = RootTraceState::from_current_context() {
state.observe(event);
}
}
pub fn observe_current_error(message: &str) {
if let Some(state) = RootTraceState::from_current_context() {
state.observe_error(message);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::ContentSource;
use crate::types::{ContinuationEnvelope, ThreadId};
use anyhow::Context as _;
use serde_json::json;
#[test]
fn trace_input_returns_none_for_empty_text() {
let input = AgentInput::Text(" ".to_string());
assert!(langfuse_trace_input(&input, 100).is_none());
}
#[test]
fn trace_input_truncates_long_text() -> anyhow::Result<()> {
let long: String = "x".repeat(20);
let input = AgentInput::Text(long);
let result = langfuse_trace_input(&input, 5).context("expected Some")?;
assert_eq!(result.chars().count(), 5);
assert!(result.ends_with('…'));
Ok(())
}
#[test]
fn trace_input_message_summarises_attachments() -> anyhow::Result<()> {
let blocks = vec![
ContentBlock::Text {
text: "hello".to_string(),
},
ContentBlock::Image {
source: ContentSource::new("image/png", "aGk="),
},
ContentBlock::Document {
source: ContentSource::new("application/pdf", "cGRm"),
},
];
let input = AgentInput::Message(blocks);
let result = langfuse_trace_input(&input, 1000).context("expected Some")?;
assert!(result.contains("hello"));
assert!(result.contains("1 image attachment"));
assert!(result.contains("1 document attachment"));
Ok(())
}
#[test]
fn trace_input_message_returns_none_when_only_thinking() {
let blocks = vec![ContentBlock::Thinking {
thinking: "internal".to_string(),
signature: None,
}];
let input = AgentInput::Message(blocks);
assert!(langfuse_trace_input(&input, 100).is_none());
}
#[test]
fn trace_input_resume_confirmed() -> anyhow::Result<()> {
use crate::types::TokenUsage;
let env = ContinuationEnvelope::wrap(crate::types::AgentContinuation {
thread_id: ThreadId::from_string("t"),
turn: 1,
total_usage: TokenUsage::default(),
turn_usage: TokenUsage::default(),
pending_tool_calls: Vec::new(),
awaiting_index: 0,
completed_results: Vec::new(),
state: crate::types::AgentState::new(ThreadId::from_string("t")),
response_id: None,
stop_reason: None,
response_content: Vec::new(),
});
let input = AgentInput::Resume {
continuation: Box::new(env),
tool_call_id: "call_1".to_string(),
confirmed: true,
rejection_reason: None,
};
let out = langfuse_trace_input(&input, 100).context("expected Some")?;
assert_eq!(out, "User approved tool confirmation");
Ok(())
}
#[test]
fn trace_input_continue_and_submit() {
use crate::types::TokenUsage;
let env = ContinuationEnvelope::wrap(crate::types::AgentContinuation {
thread_id: ThreadId::from_string("t"),
turn: 1,
total_usage: TokenUsage::default(),
turn_usage: TokenUsage::default(),
pending_tool_calls: Vec::new(),
awaiting_index: 0,
completed_results: Vec::new(),
state: crate::types::AgentState::new(ThreadId::from_string("t")),
response_id: None,
stop_reason: None,
response_content: Vec::new(),
});
let cont = AgentInput::Continue;
assert_eq!(
langfuse_trace_input(&cont, 100).as_deref(),
Some("[Turn continuation]")
);
let submit = AgentInput::SubmitToolResults {
continuation: Box::new(env),
results: Vec::new(),
};
assert_eq!(
langfuse_trace_input(&submit, 100).as_deref(),
Some("[External tool results]")
);
}
#[test]
fn trace_output_text_returns_text() {
let event = AgentEvent::text("m", "hello");
assert_eq!(langfuse_trace_output(&event).as_deref(), Some("hello"));
}
#[test]
fn trace_output_skipped_for_internal_events() {
let start = AgentEvent::Start {
thread_id: ThreadId::from_string("t"),
turn: 1,
};
assert!(langfuse_trace_output(&start).is_none());
let delta = AgentEvent::text_delta("m", "x");
assert!(langfuse_trace_output(&delta).is_none());
}
#[test]
fn trace_output_tool_call_summarises_arguments() -> anyhow::Result<()> {
let event = AgentEvent::tool_call_start(
"call_1",
"ls",
"List Files",
json!({"path": "/", "depth": 1}),
crate::types::ToolTier::Observe,
);
let out = langfuse_trace_output(&event).context("expected Some")?;
assert!(out.contains("ls"));
assert!(out.contains("Arguments: depth, path"));
Ok(())
}
#[test]
fn trace_output_label_for_text() {
assert_eq!(
langfuse_trace_event_label(&AgentEvent::text("m", "hi")),
"Assistant"
);
assert_eq!(
langfuse_trace_event_label(&AgentEvent::tool_call_end(
"id",
"ls",
"List",
crate::types::ToolResult::success("done"),
)),
"Tool Result"
);
assert_eq!(
langfuse_trace_event_label(&AgentEvent::error("boom", false)),
"Error"
);
}
}