use crate::llms::ChatCompletionResult;
use crate::telemetry::{
ContextEmergencyShrink, FinishReason as TelemFinishReason, LlmError, LlmRequestComplete,
LlmRequestFailed, LlmRequestStalled, LlmRequestStart, RecentToolOutput, TelemetryContext,
TelemetryEmitterMux, TelemetryEvent,
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
/// Default period (30 seconds) of the stalled-request heartbeat.
///
/// `LlmRequestSpan::start` passes this value as the `stalled_interval` of
/// `LlmRequestSpan::start_with_stalled_interval`.
pub const STALLED_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
/// Telemetry span covering one LLM request attempt.
///
/// Construction emits an `LlmRequestStart` event and may spawn a background task that
/// periodically emits `LlmRequestStalled` heartbeats. The span is closed by `complete` or
/// `fail`; if it is dropped without either, the `Drop` impl emits an `LlmRequestFailed`
/// event on its behalf.
pub struct LlmRequestSpan {
/// Event sink; `None` means no events are emitted and no heartbeat task is spawned.
emitter: Option<TelemetryEmitterMux>,
/// Source of the `common` fields attached to every emitted event.
ctx: TelemetryContext,
/// Identifier copied into every event emitted by this span.
request_id: String,
/// Provider identifier reported in `LlmRequestFailed` events.
provider_id: String,
/// Captured right after the start event is emitted; basis for all latency/elapsed values.
start_ts: Instant,
/// Set once `complete` or `fail` has run, so `Drop` does not emit a synthetic failure.
terminated: bool,
/// Cancellation signal shared with the heartbeat task.
stalled_cancel: Arc<Notify>,
/// Handle of the heartbeat task; `None` when none was spawned or after it has been
/// awaited/aborted.
stalled_handle: Option<JoinHandle<()>>,
}
impl LlmRequestSpan {
#[allow(clippy::too_many_arguments)]
pub fn start(
emitter: Option<&TelemetryEmitterMux>,
ctx: &TelemetryContext,
request_id: &str,
attempt: u32,
model: &str,
provider_id: &str,
estimated_input_tokens: u32,
context_utilization_pct: f64,
recent_tool_output_bytes: u64,
) -> Self {
Self::start_with_stalled_interval(
emitter,
ctx,
request_id,
attempt,
model,
provider_id,
estimated_input_tokens,
context_utilization_pct,
recent_tool_output_bytes,
STALLED_HEARTBEAT_INTERVAL,
)
}
#[doc(hidden)]
#[allow(clippy::too_many_arguments)]
pub fn start_with_stalled_interval(
emitter: Option<&TelemetryEmitterMux>,
ctx: &TelemetryContext,
request_id: &str,
attempt: u32,
model: &str,
provider_id: &str,
estimated_input_tokens: u32,
context_utilization_pct: f64,
recent_tool_output_bytes: u64,
stalled_interval: Duration,
) -> Self {
if let Some(em) = emitter {
em.emit(&TelemetryEvent::LlmRequestStart(LlmRequestStart {
common: ctx.common(),
request_id: request_id.to_string(),
model: model.to_string(),
provider_id: provider_id.to_string(),
attempt,
estimated_input_tokens,
context_utilization_pct,
recent_tool_output_bytes,
}));
}
let start_ts = Instant::now();
let stalled_cancel = Arc::new(Notify::new());
let stalled_handle = if let Some(em) = emitter {
let emitter_clone = em.clone();
let ctx_clone = ctx.clone();
let request_id_clone = request_id.to_string();
let cancel = stalled_cancel.clone();
Some(tokio::spawn(async move {
let until_first_tick = stalled_interval.saturating_sub(start_ts.elapsed());
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + until_first_tick,
stalled_interval,
);
loop {
tokio::select! {
biased;
_ = cancel.notified() => break,
_ = ticker.tick() => {
emitter_clone.emit(&TelemetryEvent::LlmRequestStalled(
LlmRequestStalled {
common: ctx_clone.common(),
request_id: request_id_clone.clone(),
elapsed_ms: start_ts.elapsed().as_millis() as u64,
ttft_received: false,
last_token_ms: None,
},
));
}
}
}
}))
} else {
None
};
Self {
emitter: emitter.cloned(),
ctx: ctx.clone(),
request_id: request_id.to_string(),
provider_id: provider_id.to_string(),
start_ts,
terminated: false,
stalled_cancel,
stalled_handle,
}
}
async fn cancel_heartbeat(&mut self) {
self.stalled_cancel.notify_one();
if let Some(handle) = self.stalled_handle.take() {
let _ = handle.await;
}
}
fn cancel_heartbeat_sync(&mut self) {
self.stalled_cancel.notify_one();
if let Some(handle) = self.stalled_handle.take() {
handle.abort();
}
}
pub async fn complete(
&mut self,
result: &ChatCompletionResult,
cost_usd: f64,
messages_chars: u32,
max_tokens_requested: Option<u32>,
) {
self.cancel_heartbeat().await;
if let Some(ref emitter) = self.emitter {
let response = &result.response;
let usage = response.usage.as_ref();
let input_tokens = usage.map(|u| u.prompt_tokens).unwrap_or(0);
let output_tokens = usage.map(|u| u.completion_tokens).unwrap_or(0);
let response_chars: u32 = response
.choices
.iter()
.map(|c| c.message.content.as_deref().unwrap_or("").chars().count() as u32)
.sum();
let tool_calls_emitted: u32 = response
.choices
.iter()
.map(|c| {
c.message
.tool_calls
.as_ref()
.map(|tc| tc.len())
.unwrap_or(0) as u32
})
.sum();
let finish_reason = response
.choices
.first()
.and_then(|c| c.finish_reason)
.map(|fr| match fr {
async_openai::types::FinishReason::Stop => TelemFinishReason::Stop,
async_openai::types::FinishReason::Length => TelemFinishReason::Length,
async_openai::types::FinishReason::ToolCalls => TelemFinishReason::ToolCalls,
async_openai::types::FinishReason::ContentFilter => TelemFinishReason::Error,
async_openai::types::FinishReason::FunctionCall => TelemFinishReason::ToolCalls,
})
.unwrap_or(TelemFinishReason::Stop);
emitter.emit(&TelemetryEvent::LlmRequestComplete(LlmRequestComplete {
common: self.ctx.common(),
request_id: self.request_id.clone(),
latency_ms: self.start_ts.elapsed().as_millis() as u64,
ttft_ms: result.timing.ttft_ms,
generation_ms: result.timing.generation_ms,
input_tokens,
output_tokens,
reasoning_tokens: usage
.as_ref()
.and_then(|u| {
u.completion_tokens_details
.as_ref()
.and_then(|d| d.reasoning_tokens)
})
.unwrap_or(0),
cached_tokens: usage
.as_ref()
.and_then(|u| {
u.prompt_tokens_details
.as_ref()
.and_then(|d| d.cached_tokens)
})
.unwrap_or(0),
cost_usd,
finish_reason,
provider_backend: result.provider_backend.clone(),
claim_assessments_emitted: None,
disagreements_emitted: None,
messages_chars,
max_tokens_requested,
response_chars,
tool_calls_emitted,
max_tokens_shrunk_to_floor: result
.shrink_info
.as_ref()
.is_some_and(|s| s.floor_used),
available_space_at_dispatch: result.shrink_info.as_ref().map(|s| s.available_space),
}));
if let Some(ref shrink) = result.shrink_info
&& shrink.floor_used
{
emitter.emit(&TelemetryEvent::ContextEmergencyShrink(
ContextEmergencyShrink {
common: self.ctx.common(),
available_space: shrink.available_space,
requested_max: shrink.requested_max,
floor_used: shrink.floor,
estimated_input: shrink.estimated_input,
context_window: shrink.context_window,
recent_tool_outputs: Vec::<RecentToolOutput>::new(),
},
));
}
}
self.terminated = true;
}
pub async fn fail(&mut self, error: &LlmError) {
self.cancel_heartbeat().await;
if let Some(ref emitter) = self.emitter {
let (error_class, http_status) = error.classify();
emitter.emit(&TelemetryEvent::LlmRequestFailed(LlmRequestFailed {
common: self.ctx.common(),
request_id: self.request_id.clone(),
error_class,
http_status,
retry_after_ms: match error {
LlmError::RateLimit { retry_after_ms, .. } => *retry_after_ms,
_ => None,
},
latency_ms: self.start_ts.elapsed().as_millis() as u64,
provider_id: self.provider_id.clone(),
provider_backend: None,
}));
}
self.terminated = true;
}
}
impl Drop for LlmRequestSpan {
fn drop(&mut self) {
self.cancel_heartbeat_sync();
if self.terminated {
return;
}
let Some(ref emitter) = self.emitter else {
return;
};
emitter.emit(&TelemetryEvent::LlmRequestFailed(LlmRequestFailed {
common: self.ctx.common(),
request_id: self.request_id.clone(),
error_class: crate::telemetry::LlmErrorClass::Other,
http_status: None,
retry_after_ms: None,
latency_ms: self.start_ts.elapsed().as_millis() as u64,
provider_id: self.provider_id.clone(),
provider_backend: None,
}));
}
}