use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::agents::DeliberationPhase;
/// Default leading segments of agent telemetry subjects. `TelemetrySource::subject`
/// builds `{prefix}.{agent_id}.{kind}` and uses this unless a custom prefix is given.
pub const TELEMETRY_AGENT_PREFIX: &str = "telemetry.agent";
/// Length in characters of every trace id built in this module: the first
/// `TRACE_ID_LEN / 2` bytes of a SHA-256 digest, two lowercase hex digits per byte
/// (32 hex chars = 128 bits).
pub const TRACE_ID_LEN: usize = 32;
/// Telemetry section of the agent configuration.
///
/// When deserializing, a missing `enabled` key means `true` and a missing
/// `endpoints` key means "no endpoints". An empty endpoint list is left out
/// when serializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TelemetryConfig {
/// Master switch. When `false`, `connect_endpoints` returns `Ok(None)` without connecting.
#[serde(default = "default_enabled")]
pub enabled: bool,
/// Named publish targets. `connect_endpoints` opens one NATS connection per entry
/// and returns `Ok(None)` when this list is empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub endpoints: Vec<TelemetryEndpointConfig>,
}
/// One telemetry publish target. Unset optional fields are omitted when serializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TelemetryEndpointConfig {
/// Endpoint label. Must be unique across `TelemetryConfig::endpoints`;
/// duplicates are rejected with `TelemetryMuxError::DuplicateNames`.
pub name: String,
/// NATS server URL. Optional in the schema, but `connect_endpoints` fails with
/// `TelemetryConnectError::MissingNatsUrl` when it is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nats_url: Option<String>,
/// Passed through to the NATS connection as `NatsAuth::creds_file` when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub creds: Option<String>,
/// Replacement for `TELEMETRY_AGENT_PREFIX` on this endpoint. It is not checked
/// at load time. Each dot-separated segment is validated when a subject is built,
/// and a failure there causes the event to be dropped.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subject_prefix: Option<String>,
}
impl Default for TelemetryConfig {
    /// Telemetry switched on, with no endpoints configured.
    ///
    /// Goes through `default_enabled` so that a config built in code and one
    /// deserialized without an `enabled` key agree on the switch.
    fn default() -> Self {
        let endpoints = Vec::new();
        Self {
            enabled: default_enabled(),
            endpoints,
        }
    }
}

/// Serde default for `TelemetryConfig::enabled`: telemetry stays on unless disabled explicitly.
fn default_enabled() -> bool {
    true
}
/// Producer identity that decides where telemetry is published.
///
/// Prefer `TelemetrySource::agent`, which validates `agent_id` up front. A variant
/// built directly is still validated by `subject` before any subject is produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TelemetrySource {
/// An agent; its events go to `{prefix}.{agent_id}.{kind}`.
Agent {
agent_id: String,
},
}
impl TelemetrySource {
    /// Builds an agent source, rejecting ids that `crate::nats_utils::validate_nats_name`
    /// does not accept as a single NATS subject token.
    ///
    /// # Errors
    /// Returns the validator's message when `agent_id` is invalid.
    pub fn agent(agent_id: impl Into<String>) -> Result<Self, String> {
        let id: String = agent_id.into();
        crate::nats_utils::validate_nats_name(&id, "agent_id")?;
        Ok(Self::Agent { agent_id: id })
    }

    /// Returns the NATS subject for an event of type `kind`: `{prefix}.{agent_id}.{kind}`.
    ///
    /// `prefix` is `custom_prefix` when given, otherwise `TELEMETRY_AGENT_PREFIX`.
    /// `kind` is inserted as-is; callers pass `TelemetryEvent::kind()`.
    ///
    /// # Errors
    /// The validator's message for the first invalid prefix segment. If the prefix is
    /// fine, the message for an invalid `agent_id`.
    pub fn subject(&self, kind: &str, custom_prefix: Option<&str>) -> Result<String, String> {
        let prefix = match custom_prefix {
            Some(custom) => {
                // Every dot-separated piece must be a valid token on its own, so
                // wildcards or blank segments cannot be smuggled in via the prefix.
                for token in custom.split('.') {
                    crate::nats_utils::validate_nats_name(token, "telemetry custom_prefix segment")?;
                }
                custom
            }
            None => TELEMETRY_AGENT_PREFIX,
        };
        // Re-checked here because the variant can be constructed without `agent()`.
        let Self::Agent { agent_id } = self;
        crate::nats_utils::validate_nats_name(agent_id, "agent_id")?;
        Ok(format!("{prefix}.{agent_id}.{kind}"))
    }
}
/// Deterministic trace id for one agent's work on one `(job, round, phase)`.
///
/// The hash input is a length-prefixed encoding,
/// `"{len}:{job_id}|{round}|{phase}|{len}:{agent_id}"`. The length prefixes make
/// field boundaries unambiguous, so for example `("ab", .., "c")` and `("a", .., "bc")`
/// never produce the same hash input. The result is `TRACE_ID_LEN` lowercase hex
/// characters (see `truncated_sha256_hex`).
pub fn derive_trace_id(
    job_id: &str,
    round: u32,
    phase: DeliberationPhase,
    agent_id: &str,
) -> String {
    let input = format!(
        "{}:{job_id}|{round}|{}|{}:{agent_id}",
        job_id.len(),
        phase.as_str(),
        agent_id.len(),
    );
    truncated_sha256_hex(&input)
}

/// Same as [`derive_trace_id`]; kept for callers that use this name.
pub fn trace_id_for(job_id: &str, round: u32, phase: DeliberationPhase, agent_id: &str) -> String {
    derive_trace_id(job_id, round, phase, agent_id)
}

/// Trace id for events lacking a complete `(job, round, phase)` triple.
///
/// Mixes a fresh random v4 UUID into the hash input, so each call returns a new id.
/// The `nosess|` tag keeps these inputs distinct from `derive_trace_id`'s encoding.
fn session_less_trace_id(agent_id: &str) -> String {
    let nonce = uuid::Uuid::new_v4();
    let input = format!("nosess|{}:{agent_id}|{}", agent_id.len(), nonce.as_simple());
    truncated_sha256_hex(&input)
}

/// Hashes `input` with SHA-256, keeps the first `TRACE_ID_LEN / 2` digest bytes and
/// renders them as `TRACE_ID_LEN` lowercase hex characters.
///
/// Shared by both trace-id flavours so their width and alphabet cannot drift apart.
fn truncated_sha256_hex(input: &str) -> String {
    use std::fmt::Write;
    let digest = Sha256::digest(input.as_bytes());
    let mut out = String::with_capacity(TRACE_ID_LEN);
    for byte in digest.iter().take(TRACE_ID_LEN / 2) {
        write!(out, "{byte:02x}").expect("writing to String never fails");
    }
    debug_assert_eq!(out.len(), TRACE_ID_LEN);
    out
}
/// Header fields shared by every telemetry event.
///
/// Each event struct embeds this as `common` with `#[serde(flatten)]`, so these keys
/// sit at the top level of the event's JSON. `job_id`, `round` and `phase` are
/// omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentEventCommon {
/// Emitting agent. `TelemetryEmitter::emit` drops events whose `agent_id` differs
/// from the emitter's source.
pub agent_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub job_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub round: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase: Option<DeliberationPhase>,
/// Event time. `TelemetryContext::common` fills it with
/// `chrono::Utc::now().timestamp_millis()` (milliseconds since the Unix epoch).
pub ts: i64,
/// Correlation id. When built by `TelemetryContext`, it is `TRACE_ID_LEN` lowercase hex chars.
pub trace_id: String,
}
/// Identity of one unit of agent work, used to stamp events via `common()`.
///
/// The trace id is computed once, in `new`, and reused for every event built
/// from this context.
#[derive(Debug, Clone)]
pub struct TelemetryContext {
agent_id: String,
job_id: Option<String>,
round: Option<u32>,
phase: Option<DeliberationPhase>,
trace_id: String,
}
impl TelemetryContext {
    /// Captures the identity of a unit of agent work and fixes its trace id.
    ///
    /// With a full `(job_id, round, phase)` triple, the trace id comes from
    /// `derive_trace_id`, so every agent event for that task shares it.
    ///
    /// If any of the three is missing, a random session-less id is generated
    /// instead. Two contexts built from the same partial inputs therefore get
    /// different ids.
    pub fn new(
        agent_id: &str,
        job_id: Option<&str>,
        round: Option<u32>,
        phase: Option<DeliberationPhase>,
    ) -> Self {
        let trace_id = if let (Some(job), Some(r), Some(p)) = (job_id, round, phase) {
            derive_trace_id(job, r, p, agent_id)
        } else {
            session_less_trace_id(agent_id)
        };
        Self {
            agent_id: agent_id.to_owned(),
            job_id: job_id.map(str::to_owned),
            round,
            phase,
            trace_id,
        }
    }

    /// Produces the shared event header.
    ///
    /// The header is stamped with the current UTC wall-clock time in
    /// milliseconds. Each call yields a new `ts`; the other fields are copied
    /// from the context.
    pub fn common(&self) -> AgentEventCommon {
        let Self {
            agent_id,
            job_id,
            round,
            phase,
            trace_id,
        } = self;
        AgentEventCommon {
            ts: chrono::Utc::now().timestamp_millis(),
            agent_id: agent_id.clone(),
            job_id: job_id.clone(),
            round: *round,
            phase: *phase,
            trace_id: trace_id.clone(),
        }
    }
}
/// Builds and emits a telemetry event, but only if an emitter is present.
///
/// `$emitter` must evaluate to an `Option` of something with an
/// `emit(&TelemetryEvent)` method. It is presumably `Option<&TelemetryEmitter>` or
/// `Option<&TelemetryEmitterMux>`; confirm at the call sites.
///
/// `$ctx.common()` supplies the shared header. The braces list the remaining
/// fields of the payload struct `$crate::telemetry::$variant`, using struct-literal
/// syntax (`field` shorthand or `field: expr`). Nothing inside the braces, nor
/// `$ctx.common()`, is evaluated when `$emitter` is `None`.
///
/// NOTE(review): the expansion names `$crate::telemetry::…`, so this assumes this
/// file is the crate's `telemetry` module.
#[macro_export]
macro_rules! emit_event {
($emitter:expr, $ctx:expr, $variant:ident { $($field:ident $(: $value:expr)?),* $(,)? }) => {
if let Some(emitter) = $emitter {
let event = $crate::telemetry::TelemetryEvent::$variant($crate::telemetry::$variant {
common: $ctx.common(),
$($field $(: $value)?),*
});
emitter.emit(&event);
}
};
}
/// Builds and emits a telemetry event using the emitter and envelope carried by `$context`.
///
/// Expects `$context.telemetry` to be an `Option`; it is matched with `ref`, so it is
/// not moved. Expects `$context.telemetry_for()` to return a value with a
/// `common()` method. Both are defined outside this file.
/// NOTE(review): confirm the concrete context type.
///
/// The field list works as in `emit_event!`. Nothing, including `telemetry_for()`,
/// is evaluated when `$context.telemetry` is `None`. Unrelated to the
/// `TelemetryEmitterMux::emit_for` method despite the shared name.
#[macro_export]
macro_rules! emit_for {
($context:expr, $variant:ident { $($field:ident $(: $value:expr)?),* $(,)? }) => {
if let Some(ref emitter) = $context.telemetry {
let envelope = $context.telemetry_for();
let event = $crate::telemetry::TelemetryEvent::$variant($crate::telemetry::$variant {
common: envelope.common(),
$($field $(: $value)?),*
});
emitter.emit(&event);
}
};
}
/// Coarse category of a failed LLM request, carried in `LlmRequestFailed::error_class`.
/// Serialized in snake_case, e.g. `RateLimit` becomes `"rate_limit"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LlmErrorClass {
Transport,
RateLimit,
PaymentRequired,
ServerError,
ContextOverflow,
Parse,
Other,
}
pub use crate::llms::LlmError;
/// Why an LLM completion ended, reported in `LlmRequestComplete::finish_reason`.
/// Serialized in snake_case (e.g. `"tool_calls"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FinishReason {
Stop,
Length,
ToolCalls,
Error,
}
/// Trigger for a retry-loop iteration, reported in `RetryLoopAttempt::reason`.
/// Serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RetryReason {
EmptyContent,
SchemaError,
Truncated,
HallucinatedTool,
}
/// Terminal failure category reported in `TaskFailed::failure_class`.
/// Serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskFailureClass {
LlmExhausted,
ToolError,
Timeout,
ContextOverflow,
ParseRetryExhausted,
EmptyContentAfterRetries,
}
/// Connection state reported by `NatsConnectionStateChanged`. Serialized in snake_case.
///
/// The `From<&async_nats::connection::State>` conversion in this file never yields
/// `Closed`. Presumably callers set it explicitly when shutting the client down; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NatsConnectionState {
Connected,
Disconnected,
Reconnecting,
Closed,
}
impl From<&async_nats::connection::State> for NatsConnectionState {
    /// Maps the client's connection state onto the telemetry enum.
    ///
    /// The client's `Pending` state is reported as `Reconnecting`; no client state
    /// maps to `Closed`.
    fn from(state: &async_nats::connection::State) -> Self {
        use async_nats::connection::State;
        match *state {
            State::Pending => Self::Reconnecting,
            State::Disconnected => Self::Disconnected,
            State::Connected => Self::Connected,
        }
    }
}
/// Payload of the `llm_request_start` event for one LLM request attempt.
///
/// `common` is flattened into the top-level JSON object. Derives only `PartialEq`
/// (not `Eq`) because `context_utilization_pct` is an `f64`. That field and
/// `recent_tool_output_bytes` read as zero when missing during deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LlmRequestStart {
#[serde(flatten)]
pub common: AgentEventCommon,
/// Request correlation id. The complete/failed/stalled events carry a field of the same name.
pub request_id: String,
pub model: String,
pub provider_id: String,
pub attempt: u32,
pub estimated_input_tokens: u32,
#[serde(default)]
pub context_utilization_pct: f64,
#[serde(default)]
pub recent_tool_output_bytes: u64,
}
/// Payload of the `llm_request_complete` event: timing, token usage and cost of a
/// finished LLM request.
///
/// `common` is flattened into the top-level JSON object. `Option` fields are
/// omitted when `None`. Fields marked `#[serde(default)]` read as zero/`false` when
/// missing, presumably so payloads from older producers still deserialize.
/// Derives only `PartialEq` because `cost_usd` is an `f64`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LlmRequestComplete {
#[serde(flatten)]
pub common: AgentEventCommon,
pub request_id: String,
pub latency_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ttft_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub generation_ms: Option<u64>,
pub input_tokens: u32,
pub output_tokens: u32,
#[serde(default)]
pub reasoning_tokens: u32,
#[serde(default)]
pub cached_tokens: u32,
#[serde(default)]
pub cost_usd: f64,
pub finish_reason: FinishReason,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub provider_backend: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub claim_assessments_emitted: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub disagreements_emitted: Option<u32>,
#[serde(default)]
pub messages_chars: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_tokens_requested: Option<u32>,
#[serde(default)]
pub response_chars: u32,
#[serde(default)]
pub tool_calls_emitted: u32,
/// NOTE(review): presumably set when the max-token budget was clamped to its floor
/// (compare `ContextEmergencyShrink`); confirm at the emit site.
#[serde(default)]
pub max_tokens_shrunk_to_floor: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub available_space_at_dispatch: Option<u32>,
}
/// Payload of the `llm_request_failed` event for an LLM request that ended in error.
///
/// `common` is flattened. `http_status`, `retry_after_ms` and `provider_backend`
/// are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LlmRequestFailed {
#[serde(flatten)]
pub common: AgentEventCommon,
pub request_id: String,
pub error_class: LlmErrorClass,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub http_status: Option<u16>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retry_after_ms: Option<u64>,
pub latency_ms: u64,
pub provider_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub provider_backend: Option<String>,
}
/// Payload of the `llm_request_stalled` event.
///
/// `ttft_received` records whether the first token had arrived. `last_token_ms` is
/// omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LlmRequestStalled {
#[serde(flatten)]
pub common: AgentEventCommon,
pub request_id: String,
pub elapsed_ms: u64,
pub ttft_received: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_token_ms: Option<u64>,
}
/// Payload of the `tool_call_executed` event for a single tool invocation.
///
/// `output_bytes`, `truncated` and `paginated` read as `0`/`false` when missing.
/// `output_tokens_estimated` is omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ToolCallExecuted {
#[serde(flatten)]
pub common: AgentEventCommon,
pub tool_name: String,
pub latency_ms: u64,
pub success: bool,
#[serde(default)]
pub output_bytes: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_tokens_estimated: Option<u32>,
#[serde(default)]
pub truncated: bool,
#[serde(default)]
pub paginated: bool,
}
/// Payload of the `retry_loop_attempt` event.
///
/// The `cumulative_*` fields are running totals across attempts (presumably
/// including this one; confirm at the emit site). The cost and token totals read
/// as zero when missing. Derives only `PartialEq` because of the `f64` cost.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RetryLoopAttempt {
#[serde(flatten)]
pub common: AgentEventCommon,
pub attempt: u32,
pub reason: RetryReason,
pub cumulative_latency_ms: u64,
#[serde(default)]
pub cumulative_cost_usd: f64,
#[serde(default)]
pub cumulative_input_tokens: u32,
#[serde(default)]
pub cumulative_output_tokens: u32,
}
/// Payload of the `task_accepted` event.
///
/// `task_publish_ts` and `job_age_at_accept_ms` are omitted when `None`.
/// NOTE(review): `task_publish_ts` is presumably epoch milliseconds like `common.ts`; confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskAccepted {
#[serde(flatten)]
pub common: AgentEventCommon,
pub dispatch_delay_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_publish_ts: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub job_age_at_accept_ms: Option<i64>,
}
/// Payload of the `task_completed` event. The failure counterpart is `TaskFailed`,
/// which carries the same timing fields plus a `failure_class`.
///
/// `common` is flattened into the top-level JSON object, and the `Option` fields
/// are omitted when `None`. `phase_budget_remaining_ms` is signed, presumably so an
/// overrun can be reported as a negative value; confirm at the emit site.
///
/// Derives `Eq` like its siblings `TaskAccepted` and `TaskFailed`. It has no
/// floating-point fields, so equality is total.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskCompleted {
    #[serde(flatten)]
    pub common: AgentEventCommon,
    pub duration_ms: u64,
    pub dispatch_delay_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub queue_wait_ms: Option<u64>,
    pub phase_budget_remaining_ms: i64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub llm_attempts: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_call_count: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pending_publish_depth: Option<u32>,
}
/// Payload of the `task_failed` event. It has the same timing fields as
/// `TaskCompleted`, plus `failure_class`.
///
/// `phase_budget_remaining_ms` is signed, presumably so an overrun can be reported
/// as negative. `Option` fields are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskFailed {
#[serde(flatten)]
pub common: AgentEventCommon,
pub duration_ms: u64,
pub dispatch_delay_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub queue_wait_ms: Option<u64>,
pub phase_budget_remaining_ms: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub llm_attempts: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_call_count: Option<u32>,
pub failure_class: TaskFailureClass,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pending_publish_depth: Option<u32>,
}
/// Payload of the NATS connection-state event.
///
/// Its serde tag and `kind()` are `nats_connection_state`, not
/// `nats_connection_state_changed`, because of an explicit rename on the
/// `TelemetryEvent` variant. Optional fields are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NatsConnectionStateChanged {
#[serde(flatten)]
pub common: AgentEventCommon,
pub state: NatsConnectionState,
pub reconnects_so_far: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pending_publish_depth: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub buffer_bytes: Option<u64>,
}
/// Payload of the `prompt_exposure_detected` event: per-category hit counts for
/// (presumably) prompt-internal material found in a response.
///
/// `validate()` checks two invariants, and `TelemetryEmitter::emit` drops events
/// that fail them:
/// * `hit_count` equals the sum of the four `*_hits` counters;
/// * each `sample_hits` entry is a short label starting with a known category
///   prefix, so no raw response text is published.
///
/// `sample_hits` is omitted when empty. Derives only `PartialEq` because
/// `suspicion_score` is an `f64`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PromptExposureDetected {
#[serde(flatten)]
pub common: AgentEventCommon,
pub terminal_tool: String,
pub blocked: bool,
pub hit_count: u32,
pub response_length_chars: u32,
pub suspicion_score: f64,
pub xml_tag_hits: u32,
pub tool_name_hits: u32,
pub instruction_hits: u32,
pub wrong_acronym_hits: u32,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub sample_hits: Vec<String>,
}
impl PromptExposureDetected {
    /// Labels a `sample_hits` entry may start with, one per hit category.
    const ALLOWED_PREFIXES: &'static [&'static str] =
        &["xml-tag ", "tool-name ", "instruction ", "wrong-acronym "];

    /// Checks the payload invariants before publication.
    ///
    /// * `hit_count` must equal the saturating sum of the four category counters.
    /// * Every `sample_hits` entry must be at most 64 long and start with one of
    ///   `ALLOWED_PREFIXES`. The length is `str::len`, i.e. bytes, even though the
    ///   error text says "chars".
    ///
    /// # Errors
    /// A human-readable description of the first violated invariant.
    pub fn validate(&self) -> Result<(), String> {
        let category_total = [
            self.xml_tag_hits,
            self.tool_name_hits,
            self.instruction_hits,
            self.wrong_acronym_hits,
        ]
        .iter()
        .copied()
        .fold(0u32, u32::saturating_add);

        if category_total != self.hit_count {
            return Err(format!(
                "hit_count {} != sum of category hits {} \
                 (xml={}: tool={}: instruction={}: acronym={})",
                self.hit_count,
                category_total,
                self.xml_tag_hits,
                self.tool_name_hits,
                self.instruction_hits,
                self.wrong_acronym_hits
            ));
        }

        let has_known_prefix = |sample: &str| {
            Self::ALLOWED_PREFIXES
                .iter()
                .any(|prefix| sample.starts_with(*prefix))
        };
        for (i, hit) in self.sample_hits.iter().enumerate() {
            // Over-long entries are treated as possible raw content leakage.
            if hit.len() > 64 {
                return Err(format!(
                    "sample_hits[{i}] exceeds 64 chars ({}); \
                     may contain raw content",
                    hit.len()
                ));
            }
            if !has_known_prefix(hit.as_str()) {
                return Err(format!(
                    "sample_hits[{i}] does not start with a known dictionary prefix: {hit:?}"
                ));
            }
        }
        Ok(())
    }
}
/// Output size of one recent tool call, listed in `ContextEmergencyShrink::recent_tool_outputs`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecentToolOutput {
pub tool: String,
pub bytes: u64,
}
/// Payload of the `context_emergency_shrink` event.
///
/// NOTE(review): the `u32` sizing fields are presumably token counts; confirm units
/// at the emit site. `recent_tool_outputs` reads as empty when missing, but it is
/// serialized even when empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ContextEmergencyShrink {
#[serde(flatten)]
pub common: AgentEventCommon,
pub available_space: u32,
pub requested_max: u32,
pub floor_used: u32,
pub estimated_input: u32,
pub context_window: u32,
#[serde(default)]
pub recent_tool_outputs: Vec<RecentToolOutput>,
}
/// Payload of the `claude_subprocess_spawn` event for `session_id`.
/// `lock_present_at_spawn` presumably records whether a session lock already
/// existed when the subprocess was started; confirm at the emit site.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClaudeSubprocessSpawn {
#[serde(flatten)]
pub common: AgentEventCommon,
pub session_id: String,
pub lock_present_at_spawn: bool,
}
/// Payload of the `claude_subprocess_exit` event for `session_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClaudeSubprocessExit {
#[serde(flatten)]
pub common: AgentEventCommon,
pub session_id: String,
pub exit_code: i32,
pub wallclock_ms: u64,
pub session_lock_released: bool,
}
/// Payload of the `claude_session_lock_collision` event for `session_id`.
/// `prior_pid` is omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClaudeSessionLockCollision {
#[serde(flatten)]
pub common: AgentEventCommon,
pub session_id: String,
pub prior_lock_age_secs: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prior_pid: Option<i32>,
}
/// Payload of the `api_error` event for an HTTP call identified by `method` and
/// `endpoint`. `error_code` is omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApiError {
#[serde(flatten)]
pub common: AgentEventCommon,
pub http_status: u16,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error_code: Option<String>,
pub endpoint: String,
pub method: String,
pub duration_ms: u64,
}
/// Every telemetry payload this module can publish.
///
/// Serialized as an internally tagged JSON object: a `"type"` key holds the
/// snake_case variant name next to the payload's own fields.
/// `NatsConnectionStateChanged` is renamed to `"nats_connection_state"`.
///
/// `kind()` returns the same strings and is also the last NATS subject token, so
/// the two must be kept in sync when variants are added. Derives only `PartialEq`
/// because some payloads hold `f64`s.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TelemetryEvent {
LlmRequestStart(LlmRequestStart),
LlmRequestComplete(LlmRequestComplete),
LlmRequestFailed(LlmRequestFailed),
LlmRequestStalled(LlmRequestStalled),
ToolCallExecuted(ToolCallExecuted),
RetryLoopAttempt(RetryLoopAttempt),
TaskAccepted(TaskAccepted),
TaskCompleted(TaskCompleted),
TaskFailed(TaskFailed),
#[serde(rename = "nats_connection_state")]
NatsConnectionStateChanged(NatsConnectionStateChanged),
PromptExposureDetected(PromptExposureDetected),
ApiError(ApiError),
ContextEmergencyShrink(ContextEmergencyShrink),
ClaudeSubprocessSpawn(ClaudeSubprocessSpawn),
ClaudeSubprocessExit(ClaudeSubprocessExit),
ClaudeSessionLockCollision(ClaudeSessionLockCollision),
}
impl TelemetryEvent {
pub fn kind(&self) -> &'static str {
match self {
TelemetryEvent::LlmRequestStart(_) => "llm_request_start",
TelemetryEvent::LlmRequestComplete(_) => "llm_request_complete",
TelemetryEvent::LlmRequestFailed(_) => "llm_request_failed",
TelemetryEvent::LlmRequestStalled(_) => "llm_request_stalled",
TelemetryEvent::ToolCallExecuted(_) => "tool_call_executed",
TelemetryEvent::RetryLoopAttempt(_) => "retry_loop_attempt",
TelemetryEvent::TaskAccepted(_) => "task_accepted",
TelemetryEvent::TaskCompleted(_) => "task_completed",
TelemetryEvent::TaskFailed(_) => "task_failed",
TelemetryEvent::NatsConnectionStateChanged(_) => "nats_connection_state",
TelemetryEvent::PromptExposureDetected(_) => "prompt_exposure_detected",
TelemetryEvent::ApiError(_) => "api_error",
TelemetryEvent::ContextEmergencyShrink(_) => "context_emergency_shrink",
TelemetryEvent::ClaudeSubprocessSpawn(_) => "claude_subprocess_spawn",
TelemetryEvent::ClaudeSubprocessExit(_) => "claude_subprocess_exit",
TelemetryEvent::ClaudeSessionLockCollision(_) => "claude_session_lock_collision",
}
}
pub fn agent_id(&self) -> &str {
match self {
TelemetryEvent::LlmRequestStart(e) => &e.common.agent_id,
TelemetryEvent::LlmRequestComplete(e) => &e.common.agent_id,
TelemetryEvent::LlmRequestFailed(e) => &e.common.agent_id,
TelemetryEvent::LlmRequestStalled(e) => &e.common.agent_id,
TelemetryEvent::ToolCallExecuted(e) => &e.common.agent_id,
TelemetryEvent::RetryLoopAttempt(e) => &e.common.agent_id,
TelemetryEvent::TaskAccepted(e) => &e.common.agent_id,
TelemetryEvent::TaskCompleted(e) => &e.common.agent_id,
TelemetryEvent::TaskFailed(e) => &e.common.agent_id,
TelemetryEvent::NatsConnectionStateChanged(e) => &e.common.agent_id,
TelemetryEvent::PromptExposureDetected(e) => &e.common.agent_id,
TelemetryEvent::ApiError(e) => &e.common.agent_id,
TelemetryEvent::ContextEmergencyShrink(e) => &e.common.agent_id,
TelemetryEvent::ClaudeSubprocessSpawn(e) => &e.common.agent_id,
TelemetryEvent::ClaudeSubprocessExit(e) => &e.common.agent_id,
TelemetryEvent::ClaudeSessionLockCollision(e) => &e.common.agent_id,
}
}
}
/// Returns `true` when `event` carries the same `agent_id` as the emitter's `source`.
///
/// `TelemetryEmitter::emit` uses this to refuse publishing one agent's event under
/// another agent's subject.
fn source_agent_matches(source: &TelemetrySource, event: &TelemetryEvent) -> bool {
    let TelemetrySource::Agent { agent_id: expected } = source;
    expected.as_str() == event.agent_id()
}
/// Publishes telemetry for one `TelemetrySource` over a single NATS client.
///
/// Clones share the same `Arc`-backed drop counter. `Debug` is implemented by hand
/// and omits the client.
#[derive(Clone)]
pub struct TelemetryEmitter {
client: async_nats::Client,
source: TelemetrySource,
/// Replaces `TELEMETRY_AGENT_PREFIX` in subjects when set (see `with_prefix`).
custom_prefix: Option<String>,
/// Number of events that were not published; shared by all clones.
dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
impl std::fmt::Debug for TelemetryEmitter {
    /// Prints source, prefix and the current drop count, but not the NATS client.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let dropped_so_far = self.dropped_count();
        f.debug_struct("TelemetryEmitter")
            .field("source", &self.source)
            .field("custom_prefix", &self.custom_prefix)
            .field("dropped", &dropped_so_far)
            .finish()
    }
}
impl TelemetryEmitter {
pub fn new(client: async_nats::Client, source: TelemetrySource) -> Self {
Self {
client,
source,
custom_prefix: None,
dropped: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.custom_prefix = Some(prefix.into());
self
}
pub fn dropped_count(&self) -> u64 {
self.dropped.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn emit(&self, event: &TelemetryEvent) {
if !source_agent_matches(&self.source, event) {
let src_id = match &self.source {
TelemetrySource::Agent { agent_id } => agent_id.as_str(),
};
tracing::warn!(
event_kind = event.kind(),
emitter_agent_id = src_id,
event_agent_id = event.agent_id(),
"dropping telemetry event: payload agent_id does not match emitter"
);
self.dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
}
if let TelemetryEvent::PromptExposureDetected(detected) = event {
if let Err(e) = detected.validate() {
tracing::warn!(
error = %e,
"dropping invalid PromptExposureDetected event"
);
self.dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
}
}
let subject = match self
.source
.subject(event.kind(), self.custom_prefix.as_deref())
{
Ok(s) => s,
Err(_) => {
self.dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
}
};
let payload = match serde_json::to_vec(event) {
Ok(bytes) => bytes,
Err(_) => {
self.dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
}
};
let handle = match tokio::runtime::Handle::try_current() {
Ok(h) => h,
Err(_) => {
self.dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
}
};
let client = self.client.clone();
let dropped = self.dropped.clone();
handle.spawn(async move {
if client.publish(subject, payload.into()).await.is_err() {
dropped.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
});
}
}
/// Fans telemetry out to several named `TelemetryEmitter`s, in insertion order.
/// Names are guaranteed unique when built through `TelemetryEmitterMux::new`.
#[derive(Clone)]
pub struct TelemetryEmitterMux {
endpoints: Vec<(String, TelemetryEmitter)>,
}
impl std::fmt::Debug for TelemetryEmitterMux {
    /// Prints the endpoint names and the summed drop count; the emitters themselves are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let endpoint_labels = self.endpoint_names();
        let total_dropped = self.dropped_count();
        f.debug_struct("TelemetryEmitterMux")
            .field("endpoints", &endpoint_labels)
            .field("dropped", &total_dropped)
            .finish()
    }
}
/// Errors raised while assembling a `TelemetryEmitterMux`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TelemetryMuxError {
    /// Endpoint names that appear more than once. As produced by
    /// `validate_endpoint_names`, each name is listed once, in the order its first
    /// repeat was seen.
    DuplicateNames(Vec<String>),
}

impl std::fmt::Display for TelemetryMuxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::DuplicateNames(names) = self;
        write!(
            f,
            "telemetry endpoint names must be unique; duplicates: {names:?}"
        )
    }
}

impl std::error::Error for TelemetryMuxError {}

/// Checks that every endpoint name in `names` is unique.
///
/// # Errors
/// `TelemetryMuxError::DuplicateNames`, listing each repeated name once.
fn validate_endpoint_names(names: &[String]) -> Result<(), TelemetryMuxError> {
    let mut seen = std::collections::HashSet::with_capacity(names.len());
    let mut reported = std::collections::HashSet::new();
    // A name is reported the first time it is seen again; `reported` suppresses
    // third and later occurrences.
    let duplicates: Vec<String> = names
        .iter()
        .filter(|name| !seen.insert(name.as_str()) && reported.insert(name.as_str()))
        .cloned()
        .collect();
    if !duplicates.is_empty() {
        return Err(TelemetryMuxError::DuplicateNames(duplicates));
    }
    Ok(())
}
impl TelemetryEmitterMux {
    /// Builds a mux over `endpoints`, keeping their order.
    ///
    /// # Errors
    /// `TelemetryMuxError::DuplicateNames` if two endpoints share a name.
    pub fn new(endpoints: Vec<(String, TelemetryEmitter)>) -> Result<Self, TelemetryMuxError> {
        let names: Vec<String> = endpoints
            .iter()
            .map(|(name, _)| name.to_owned())
            .collect();
        validate_endpoint_names(&names).map(|()| Self { endpoints })
    }

    /// Wraps a single emitter. This cannot fail, since one name cannot collide with itself.
    pub fn single(name: impl Into<String>, emitter: TelemetryEmitter) -> Self {
        let label: String = name.into();
        Self {
            endpoints: vec![(label, emitter)],
        }
    }

    /// Number of configured endpoints.
    pub fn len(&self) -> usize {
        self.endpoints.len()
    }

    /// `true` when no endpoints are configured; `emit` is then a no-op.
    pub fn is_empty(&self) -> bool {
        self.endpoints.is_empty()
    }

    /// Endpoint names, in configuration order.
    pub fn endpoint_names(&self) -> Vec<&str> {
        let mut labels = Vec::with_capacity(self.endpoints.len());
        for (label, _) in &self.endpoints {
            labels.push(label.as_str());
        }
        labels
    }

    /// Total events dropped across all endpoints.
    pub fn dropped_count(&self) -> u64 {
        self.endpoints
            .iter()
            .fold(0u64, |total, (_, emitter)| total + emitter.dropped_count())
    }

    /// Sends `event` to every endpoint. Each emitter handles and counts its own failures.
    pub fn emit(&self, event: &TelemetryEvent) {
        self.endpoints
            .iter()
            .for_each(|(_, emitter)| emitter.emit(event));
    }

    /// Sends `event` only to the endpoint called `name`.
    ///
    /// An unknown name is logged at debug level and the event is discarded. It is
    /// not counted as dropped.
    pub fn emit_for(&self, name: &str, event: &TelemetryEvent) {
        let target = self
            .endpoints
            .iter()
            .find_map(|(label, emitter)| (label.as_str() == name).then_some(emitter));
        match target {
            Some(emitter) => emitter.emit(event),
            None => {
                tracing::debug!(
                    requested = %name,
                    available = ?self.endpoint_names(),
                    "telemetry emit_for: endpoint name not configured, dropping event"
                );
            }
        }
    }
}
/// Failures returned by `connect_endpoints`.
#[derive(Debug, thiserror::Error)]
pub enum TelemetryConnectError {
/// `agent_id` was rejected by `TelemetrySource::agent` (not a valid NATS subject token).
#[error("invalid agent_id: {0}")]
InvalidAgentId(String),
/// A configured endpoint has no `nats_url`.
#[error("telemetry endpoint `{name}` missing nats_url")]
MissingNatsUrl {
name: String,
},
/// Connecting to the named endpoint failed; `source` is the underlying error.
#[error("connect to NATS for telemetry endpoint `{name}` failed: {source}")]
NatsConnect {
name: String,
#[source]
source: anyhow::Error,
},
/// Building the mux failed, e.g. because endpoint names are duplicated.
#[error("mux construction: {0}")]
Mux(#[from] TelemetryMuxError),
}
pub async fn connect_endpoints(
config: &TelemetryConfig,
agent_id: &str,
) -> Result<Option<TelemetryEmitterMux>, TelemetryConnectError> {
if !config.enabled || config.endpoints.is_empty() {
return Ok(None);
}
let source = TelemetrySource::agent(agent_id)
.map_err(|_| TelemetryConnectError::InvalidAgentId(agent_id.to_string()))?;
let mut emitters = Vec::with_capacity(config.endpoints.len());
for ep in &config.endpoints {
let url = ep
.nats_url
.as_deref()
.ok_or_else(|| TelemetryConnectError::MissingNatsUrl {
name: ep.name.clone(),
})?;
let auth = ep.creds.as_ref().map(|path| crate::nats_utils::NatsAuth {
creds_file: Some(path.clone()),
..Default::default()
});
let client = crate::nats_utils::connect_nats(url, auth.as_ref())
.await
.map_err(|e| TelemetryConnectError::NatsConnect {
name: ep.name.clone(),
source: e,
})?;
let mut emitter = TelemetryEmitter::new(client, source.clone());
if let Some(prefix) = &ep.subject_prefix {
emitter = emitter.with_prefix(prefix);
}
emitters.push((ep.name.clone(), emitter));
}
Ok(Some(TelemetryEmitterMux::new(emitters)?))
}
/// Makes free-form text safe to attach to telemetry.
///
/// * An empty input becomes `"<empty>"`.
/// * If the lowercased text contains any keyword in `SENSITIVE_WORDS` as a whole
///   word, the whole text is replaced by `"REDACTED_SENSITIVE"`. "Whole word" means
///   no ASCII letter or digit directly before or after it, so `db_password` is a
///   hit but `passwords` is not.
/// * Otherwise the text is cut to at most `max_length` characters (Unicode scalar
///   values, not bytes), with `"..."` appended when anything was removed.
pub fn redact_content(input: &str, max_length: usize) -> String {
    const SENSITIVE_WORDS: [&str; 6] = [
        "password",
        "api_key",
        "secret_key",
        "access_key",
        "credential",
        "bearer",
    ];
    if input.is_empty() {
        return "<empty>".to_string();
    }
    let lower = input.to_lowercase();
    if SENSITIVE_WORDS
        .iter()
        .any(|word| contains_whole_word(&lower, word))
    {
        return "REDACTED_SENSITIVE".to_string();
    }
    // `nth(max_length)` is `Some` exactly when there are more than `max_length` chars.
    match input.char_indices().nth(max_length) {
        Some((cut, _)) => format!("{}...", &input[..cut]),
        None => input.to_string(),
    }
}

/// Returns `true` if `word` occurs in `haystack` with no ASCII alphanumeric byte
/// immediately before or after it.
///
/// Offsets are kept absolute into `haystack`, so the "before" check always looks at
/// the real preceding byte, even for a match found after advancing past an earlier
/// rejected one. Overlapping occurrences are examined too.
fn contains_whole_word(haystack: &str, word: &str) -> bool {
    if word.is_empty() {
        return false;
    }
    let bytes = haystack.as_bytes();
    let is_word_byte = |i: usize| bytes.get(i).is_some_and(u8::is_ascii_alphanumeric);
    let mut start = 0;
    while let Some(rel) = haystack[start..].find(word) {
        let pos = start + rel;
        let boundary_before = pos == 0 || !is_word_byte(pos - 1);
        let boundary_after = !is_word_byte(pos + word.len());
        if boundary_before && boundary_after {
            return true;
        }
        // Step over one whole character so the next slice starts on a char boundary.
        start = pos + haystack[pos..].chars().next().map_or(1, char::len_utf8);
    }
    false
}
/// Redacts an error message for telemetry.
///
/// Applies the same rules as `redact_content`, with a limit of 100 characters.
pub fn redact_error_message(error_msg: &str) -> String {
    const ERROR_MESSAGE_MAX_CHARS: usize = 100;
    redact_content(error_msg, ERROR_MESSAGE_MAX_CHARS)
}
/// Removes the secret-bearing parts of a URL before it is logged or published.
///
/// Drops the query string and fragment, which often carry tokens. For
/// `scheme://…` URLs, also drops any `userinfo@` in the authority.
///
/// The authority runs from after `://` up to the first `/`. Userinfo is cut at the
/// LAST `@` inside it, so a password that itself contains `@`
/// (`https://u:p@ss@host`) is removed completely instead of leaking its tail. An `@`
/// in the path is left untouched. Inputs without `://` are returned minus
/// query/fragment.
pub fn redact_url(url: &str) -> String {
    let base = url.split(['?', '#']).next().unwrap_or(url);
    let Some(scheme_end) = base.find("://") else {
        return base.to_string();
    };
    let (scheme, rest) = base.split_at(scheme_end + 3);
    let authority_end = rest.find('/').unwrap_or(rest.len());
    match rest[..authority_end].rfind('@') {
        Some(at) => format!("{scheme}{}", &rest[at + 1..]),
        None => base.to_string(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_agent_common() -> AgentEventCommon {
AgentEventCommon {
agent_id: "CortexB".to_string(),
job_id: Some("job-123".to_string()),
round: Some(3),
phase: Some(DeliberationPhase::Proposing),
ts: 1_776_790_692_747,
trace_id: derive_trace_id("job-123", 3, DeliberationPhase::Proposing, "CortexB"),
}
}
#[test]
fn trace_id_width_is_at_least_128_bits() {
const _: () = assert!(
TRACE_ID_LEN >= 32,
"trace_id must carry at least 128 bits (32 hex chars)"
);
let out = derive_trace_id("job-x", 1, DeliberationPhase::Proposing, "a");
assert_eq!(out.len(), TRACE_ID_LEN);
}
#[test]
fn telemetry_context_trace_id_shape_is_uniform() {
let task_ctx = TelemetryContext::new(
"alice",
Some("job-1"),
Some(2),
Some(DeliberationPhase::Proposing),
);
let task_trace = task_ctx.common().trace_id;
assert_eq!(task_trace.len(), TRACE_ID_LEN);
assert!(task_trace.chars().all(|c| c.is_ascii_hexdigit()));
let sessionless = TelemetryContext::new("alice", None, None, None);
let sl_trace = sessionless.common().trace_id;
assert_eq!(sl_trace.len(), TRACE_ID_LEN);
assert!(sl_trace.chars().all(|c| c.is_ascii_hexdigit()));
let sl2 = TelemetryContext::new("alice", None, None, None);
assert_ne!(sl_trace, sl2.common().trace_id);
}
#[test]
fn empty_mux_is_empty_and_no_op_safe() {
let mux = TelemetryEmitterMux::new(vec![]).expect("empty mux is valid");
assert!(mux.is_empty());
assert_eq!(mux.len(), 0);
assert_eq!(mux.dropped_count(), 0);
assert!(mux.endpoint_names().is_empty());
let evt = TelemetryEvent::TaskAccepted(TaskAccepted {
common: sample_agent_common(),
dispatch_delay_ms: 0,
task_publish_ts: None,
job_age_at_accept_ms: None,
});
mux.emit(&evt);
mux.emit_for("does-not-exist", &evt);
}
#[test]
fn telemetry_endpoint_config_serde_roundtrip() {
let cfg = TelemetryConfig {
enabled: true,
endpoints: vec![
TelemetryEndpointConfig {
name: "service".into(),
nats_url: Some("nats://orch.example.com:4222".into()),
creds: Some("/etc/nsed/agent-service.creds".into()),
subject_prefix: None,
},
TelemetryEndpointConfig {
name: "own".into(),
nats_url: Some("nats://my-grafana.local:4222".into()),
creds: Some("/etc/nsed/agent-own.creds".into()),
subject_prefix: Some("telemetry.agent".into()),
},
],
};
let json = serde_json::to_string(&cfg).expect("serialise");
let back: TelemetryConfig = serde_json::from_str(&json).expect("deserialise");
assert_eq!(back, cfg);
}
#[test]
fn telemetry_config_endpoints_omitted_defaults_empty() {
let yaml = "enabled: true\n";
let cfg: TelemetryConfig = serde_yaml::from_str(yaml).expect("yaml parse");
assert!(cfg.enabled);
assert!(cfg.endpoints.is_empty());
}
#[test]
fn validate_endpoint_names_rejects_duplicates() {
let err = validate_endpoint_names(&[
"dup".to_string(),
"solo".to_string(),
"dup".to_string(),
"another".to_string(),
"solo".to_string(),
])
.expect_err("must reject duplicates");
match err {
TelemetryMuxError::DuplicateNames(mut names) => {
names.sort();
assert_eq!(names, vec!["dup".to_string(), "solo".to_string()]);
}
}
}
#[test]
fn validate_endpoint_names_accepts_unique() {
validate_endpoint_names(&["a".to_string(), "b".to_string(), "c".to_string()])
.expect("unique names accepted");
}
#[tokio::test]
async fn connect_endpoints_disabled_yields_none() {
let cfg = TelemetryConfig {
enabled: false,
endpoints: vec![TelemetryEndpointConfig {
name: "wont-connect".into(),
nats_url: Some("nats://does-not-resolve.invalid:4222".into()),
creds: None,
subject_prefix: None,
}],
};
let mux = connect_endpoints(&cfg, "agent-x")
.await
.expect("disabled is not an error");
assert!(mux.is_none());
}
#[tokio::test]
async fn connect_endpoints_empty_endpoints_yields_none() {
let cfg = TelemetryConfig {
enabled: true,
endpoints: vec![],
};
let mux = connect_endpoints(&cfg, "agent-x")
.await
.expect("empty endpoints is not an error");
assert!(mux.is_none());
}
#[tokio::test]
async fn connect_endpoints_missing_nats_url_errors() {
let cfg = TelemetryConfig {
enabled: true,
endpoints: vec![TelemetryEndpointConfig {
name: "no-url".into(),
nats_url: None,
creds: None,
subject_prefix: None,
}],
};
let err = connect_endpoints(&cfg, "agent-x")
.await
.expect_err("missing url must error");
match err {
TelemetryConnectError::MissingNatsUrl { name } => assert_eq!(name, "no-url"),
other => panic!("wrong variant: {other:?}"),
}
}
#[tokio::test]
async fn connect_endpoints_invalid_agent_id_errors() {
let cfg = TelemetryConfig {
enabled: true,
endpoints: vec![TelemetryEndpointConfig {
name: "real".into(),
nats_url: Some("nats://ignored.invalid:4222".into()),
creds: None,
subject_prefix: None,
}],
};
let err = connect_endpoints(&cfg, "bad agent_id with space")
.await
.expect_err("invalid agent_id must error");
assert!(matches!(err, TelemetryConnectError::InvalidAgentId(_)));
}
#[test]
fn trace_id_is_deterministic_and_correct_length() {
let a = derive_trace_id("job-x", 1, DeliberationPhase::Evaluating, "alpha");
let b = derive_trace_id("job-x", 1, DeliberationPhase::Evaluating, "alpha");
assert_eq!(a, b, "same inputs must produce the same trace_id");
assert_eq!(a.len(), TRACE_ID_LEN);
assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn trace_id_differs_across_any_input() {
let base = derive_trace_id("j", 1, DeliberationPhase::Proposing, "a");
assert_ne!(
base,
derive_trace_id("j2", 1, DeliberationPhase::Proposing, "a")
);
assert_ne!(
base,
derive_trace_id("j", 2, DeliberationPhase::Proposing, "a")
);
assert_ne!(
base,
derive_trace_id("j", 1, DeliberationPhase::Evaluating, "a")
);
assert_ne!(
base,
derive_trace_id("j", 1, DeliberationPhase::Proposing, "b")
);
}
#[test]
fn trace_id_resists_delimiter_collision_attacks() {
let a = derive_trace_id("ab", 1, DeliberationPhase::Proposing, "c");
let b = derive_trace_id("a", 1, DeliberationPhase::Proposing, "bc");
assert_ne!(a, b, "boundary must be unambiguous");
let c = derive_trace_id("job:1|x", 1, DeliberationPhase::Proposing, "agent");
let d = derive_trace_id("job", 1, DeliberationPhase::Proposing, "1|x:agent");
assert_ne!(c, d, "embedded delimiter must not produce collision");
}
#[test]
fn agent_subject_binds_agent_id_position() {
let evt = TelemetryEvent::TaskAccepted(TaskAccepted {
common: sample_agent_common(),
dispatch_delay_ms: 42,
task_publish_ts: None,
job_age_at_accept_ms: None,
});
let src = TelemetrySource::agent("CortexB").unwrap();
assert_eq!(
src.subject(evt.kind(), None).unwrap(),
"telemetry.agent.CortexB.task_accepted"
);
}
#[test]
fn agent_constructor_rejects_invalid_agent_ids() {
for bad in [
"evil.injection",
"with*wildcard",
"with>wildcard",
"with whitespace",
"with\nnewline",
"",
] {
assert!(
TelemetrySource::agent(bad).is_err(),
"agent({bad:?}) should be rejected"
);
}
}
#[test]
fn agent_subject_rejects_invalid_agent_id_at_subject_time() {
let evt = TelemetryEvent::TaskAccepted(TaskAccepted {
common: sample_agent_common(),
dispatch_delay_ms: 0,
task_publish_ts: None,
job_age_at_accept_ms: None,
});
let bad = TelemetrySource::Agent {
agent_id: "evil.injection".into(),
};
assert!(bad.subject(evt.kind(), None).is_err());
}
#[test]
fn tokio_runtime_handle_try_current_is_err_off_runtime() {
    // Probe from a plain std thread that never entered a tokio runtime.
    let probe = || tokio::runtime::Handle::try_current().is_err();
    let reported_err = std::thread::spawn(probe).join().unwrap();
    assert!(
        reported_err,
        "off-runtime threads must report Err so emit() can degrade to a drop"
    );
}
#[test]
fn custom_prefix_validates_each_dot_segment() {
    let src = TelemetrySource::agent("CortexB").unwrap();

    // Every dot-separated segment is a valid NATS token: accepted.
    assert!(src
        .subject("task_accepted", Some("tenant.op42.agent"))
        .is_ok());

    // A wildcard or whitespace inside any single segment must be rejected.
    for bad in ["tenant.op*42.agent", "tenant. .agent"] {
        assert!(
            src.subject("task_accepted", Some(bad)).is_err(),
            "custom_prefix {bad:?} should be rejected"
        );
    }

    // Empty segments (empty prefix, leading/trailing dot, doubled dot) would
    // yield malformed subjects such as `.CortexB.x` or `a..b`. `subject()`
    // splits the prefix on '.', so each produces an empty segment that the
    // per-segment validator (which rejects "") must refuse.
    for bad in ["", ".tenant", "tenant.", "tenant..agent"] {
        assert!(
            src.subject("task_accepted", Some(bad)).is_err(),
            "custom_prefix {bad:?} should be rejected"
        );
    }
}
/// Shared fixture: a blocked detection with 2 xml-tag hits and 1 tool-name hit.
/// `hit_count` (3) equals the sum of the per-category counters.
fn sample_prompt_exposure() -> PromptExposureDetected {
    let sample_hits = [
        "xml-tag <working_memory>",
        "xml-tag <key_findings>",
        "tool-name submit_proposal",
    ]
    .iter()
    .map(|&entry| String::from(entry))
    .collect();
    PromptExposureDetected {
        common: sample_agent_common(),
        terminal_tool: "submit_proposal".into(),
        blocked: true,
        response_length_chars: 1_482,
        suspicion_score: 4.76,
        hit_count: 3,
        xml_tag_hits: 2,
        tool_name_hits: 1,
        instruction_hits: 0,
        wrong_acronym_hits: 0,
        sample_hits,
    }
}
#[test]
fn roundtrip_prompt_exposure_detected() {
    // JSON encode -> decode must be lossless and carry the snake_case tag.
    let original = TelemetryEvent::PromptExposureDetected(sample_prompt_exposure());
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: TelemetryEvent = serde_json::from_str(&encoded).unwrap();
    assert_eq!(original, decoded);
    assert!(encoded.contains("\"type\":\"prompt_exposure_detected\""));
}
#[test]
fn prompt_exposure_kind_is_stable() {
    // Both the kind string and the subject derived from it are wire contracts.
    let kind = TelemetryEvent::PromptExposureDetected(sample_prompt_exposure()).kind();
    assert_eq!(kind, "prompt_exposure_detected");

    let source = TelemetrySource::agent("CortexB").unwrap();
    let subject = source.subject(kind, None).unwrap();
    assert_eq!(subject, "telemetry.agent.CortexB.prompt_exposure_detected");
}
#[test]
fn prompt_exposure_category_counts_sum_to_hit_count() {
    // The fixture must itself satisfy the hit_count == sum(categories) invariant.
    let PromptExposureDetected {
        hit_count,
        xml_tag_hits,
        tool_name_hits,
        instruction_hits,
        wrong_acronym_hits,
        ..
    } = sample_prompt_exposure();
    let per_category_total = xml_tag_hits + tool_name_hits + instruction_hits + wrong_acronym_hits;
    assert_eq!(per_category_total, hit_count);
}
#[test]
fn prompt_exposure_sample_hits_only_dictionary_prefixes() {
    // Every fixture sample must be tagged with one of the dictionary categories.
    const DICTIONARY_PREFIXES: [&str; 4] = ["xml-tag ", "tool-name ", "instruction ", "wrong-acronym "];
    let evt = sample_prompt_exposure();
    for hit in &evt.sample_hits {
        let known = DICTIONARY_PREFIXES
            .iter()
            .any(|prefix| hit.starts_with(prefix));
        assert!(
            known,
            "sample_hits entry {hit:?} does not start with a known dictionary prefix"
        );
    }
}
#[test]
fn prompt_exposure_below_threshold_still_roundtrips() {
    // A non-blocking single-hit detection. Only the fields that differ from the
    // shared fixture are overridden.
    let detection = PromptExposureDetected {
        blocked: false,
        suspicion_score: 0.12,
        hit_count: 1,
        xml_tag_hits: 1,
        tool_name_hits: 0,
        instruction_hits: 0,
        wrong_acronym_hits: 0,
        sample_hits: vec![String::from("xml-tag <strategy>")],
        ..sample_prompt_exposure()
    };
    let evt = TelemetryEvent::PromptExposureDetected(detection);
    let wire = serde_json::to_string(&evt).unwrap();
    let restored: TelemetryEvent = serde_json::from_str(&wire).unwrap();
    assert_eq!(evt, restored);
}
#[test]
fn roundtrip_task_completed_with_g1_g6_fields() {
    // All optional counters populated so every field crosses the wire.
    let completed = TaskCompleted {
        common: sample_agent_common(),
        duration_ms: 12_000,
        dispatch_delay_ms: 40,
        queue_wait_ms: Some(5),
        phase_budget_remaining_ms: 3_000,
        llm_attempts: Some(2),
        tool_call_count: Some(1),
        pending_publish_depth: Some(0),
    };
    let evt = TelemetryEvent::TaskCompleted(completed);
    let wire = serde_json::to_string(&evt).unwrap();
    let restored: TelemetryEvent = serde_json::from_str(&wire).unwrap();
    assert_eq!(evt, restored);
    assert!(wire.contains("\"type\":\"task_completed\""));
}
#[test]
fn roundtrip_llm_request_complete_with_ttft_and_evaluator_counters() {
    // Evaluating-phase request with TTFT split and evaluator counters set.
    let common = AgentEventCommon {
        phase: Some(DeliberationPhase::Evaluating),
        ..sample_agent_common()
    };
    let completed = LlmRequestComplete {
        common,
        request_id: "req-1".into(),
        latency_ms: 4_200,
        ttft_ms: Some(180),
        generation_ms: Some(4_020),
        input_tokens: 1_200,
        output_tokens: 350,
        reasoning_tokens: 120,
        cached_tokens: 0,
        cost_usd: 0.0041,
        finish_reason: FinishReason::Stop,
        provider_backend: Some("openrouter/deepinfra".into()),
        claim_assessments_emitted: Some(12),
        disagreements_emitted: Some(2),
        messages_chars: 4_800,
        max_tokens_requested: Some(2_000),
        response_chars: 1_400,
        tool_calls_emitted: 0,
        max_tokens_shrunk_to_floor: false,
        available_space_at_dispatch: None,
    };
    let evt = TelemetryEvent::LlmRequestComplete(completed);
    let wire = serde_json::to_string(&evt).unwrap();
    let restored: TelemetryEvent = serde_json::from_str(&wire).unwrap();
    assert_eq!(evt, restored);
}
#[test]
fn roundtrip_retry_loop_attempt_cost_fields() {
    // Cumulative latency/cost/token fields must survive a JSON round trip.
    let attempt = RetryLoopAttempt {
        common: sample_agent_common(),
        attempt: 3,
        reason: RetryReason::SchemaError,
        cumulative_latency_ms: 18_400,
        cumulative_cost_usd: 0.0127,
        cumulative_input_tokens: 3_200,
        cumulative_output_tokens: 900,
    };
    let evt = TelemetryEvent::RetryLoopAttempt(attempt);
    let wire = serde_json::to_string(&evt).unwrap();
    let restored: TelemetryEvent = serde_json::from_str(&wire).unwrap();
    assert_eq!(evt, restored);
}
#[test]
fn event_kind_covers_every_variant() {
    // Number of arms in `variant_index` below. Bump it together with a new arm.
    const VARIANT_COUNT: usize = 16;

    // Exhaustive (no `_` arm) on purpose: adding a `TelemetryEvent` variant
    // fails to compile here, forcing whoever adds it to also add a sample below
    // so its kind()/serde tag agreement is checked.
    fn variant_index(evt: &TelemetryEvent) -> usize {
        match evt {
            TelemetryEvent::LlmRequestStart(_) => 0,
            TelemetryEvent::LlmRequestComplete(_) => 1,
            TelemetryEvent::LlmRequestFailed(_) => 2,
            TelemetryEvent::LlmRequestStalled(_) => 3,
            TelemetryEvent::ToolCallExecuted(_) => 4,
            TelemetryEvent::RetryLoopAttempt(_) => 5,
            TelemetryEvent::TaskAccepted(_) => 6,
            TelemetryEvent::TaskCompleted(_) => 7,
            TelemetryEvent::TaskFailed(_) => 8,
            TelemetryEvent::NatsConnectionStateChanged(_) => 9,
            TelemetryEvent::PromptExposureDetected(_) => 10,
            TelemetryEvent::ApiError(_) => 11,
            TelemetryEvent::ContextEmergencyShrink(_) => 12,
            TelemetryEvent::ClaudeSubprocessSpawn(_) => 13,
            TelemetryEvent::ClaudeSubprocessExit(_) => 14,
            TelemetryEvent::ClaudeSessionLockCollision(_) => 15,
        }
    }

    let samples: Vec<TelemetryEvent> = vec![
        TelemetryEvent::LlmRequestStart(LlmRequestStart {
            common: sample_agent_common(),
            request_id: "r".into(),
            model: "m".into(),
            provider_id: "p".into(),
            attempt: 1,
            estimated_input_tokens: 0,
            context_utilization_pct: 0.0,
            recent_tool_output_bytes: 0,
        }),
        TelemetryEvent::LlmRequestComplete(LlmRequestComplete {
            common: sample_agent_common(),
            request_id: "r".into(),
            latency_ms: 0,
            ttft_ms: None,
            generation_ms: None,
            input_tokens: 0,
            output_tokens: 0,
            reasoning_tokens: 0,
            cached_tokens: 0,
            cost_usd: 0.0,
            finish_reason: FinishReason::Stop,
            provider_backend: None,
            claim_assessments_emitted: None,
            disagreements_emitted: None,
            messages_chars: 0,
            max_tokens_requested: None,
            response_chars: 0,
            tool_calls_emitted: 0,
            max_tokens_shrunk_to_floor: false,
            available_space_at_dispatch: None,
        }),
        TelemetryEvent::LlmRequestFailed(LlmRequestFailed {
            common: sample_agent_common(),
            request_id: "r".into(),
            error_class: LlmErrorClass::Transport,
            http_status: None,
            retry_after_ms: None,
            latency_ms: 0,
            provider_id: "p".into(),
            provider_backend: None,
        }),
        TelemetryEvent::LlmRequestStalled(LlmRequestStalled {
            common: sample_agent_common(),
            request_id: "r".into(),
            elapsed_ms: 0,
            ttft_received: false,
            last_token_ms: None,
        }),
        TelemetryEvent::ToolCallExecuted(ToolCallExecuted {
            common: sample_agent_common(),
            tool_name: "scratchpad".into(),
            latency_ms: 0,
            success: true,
            output_bytes: 0,
            output_tokens_estimated: None,
            truncated: false,
            paginated: false,
        }),
        TelemetryEvent::RetryLoopAttempt(RetryLoopAttempt {
            common: sample_agent_common(),
            attempt: 1,
            reason: RetryReason::EmptyContent,
            cumulative_latency_ms: 0,
            cumulative_cost_usd: 0.0,
            cumulative_input_tokens: 0,
            cumulative_output_tokens: 0,
        }),
        TelemetryEvent::TaskAccepted(TaskAccepted {
            common: sample_agent_common(),
            dispatch_delay_ms: 0,
            task_publish_ts: None,
            job_age_at_accept_ms: None,
        }),
        TelemetryEvent::TaskCompleted(TaskCompleted {
            common: sample_agent_common(),
            duration_ms: 0,
            dispatch_delay_ms: 0,
            queue_wait_ms: Some(0),
            phase_budget_remaining_ms: 0,
            llm_attempts: Some(0),
            tool_call_count: Some(0),
            pending_publish_depth: Some(0),
        }),
        TelemetryEvent::TaskFailed(TaskFailed {
            common: sample_agent_common(),
            duration_ms: 0,
            dispatch_delay_ms: 0,
            queue_wait_ms: Some(0),
            phase_budget_remaining_ms: 0,
            llm_attempts: Some(0),
            tool_call_count: Some(0),
            failure_class: TaskFailureClass::Timeout,
            pending_publish_depth: Some(0),
        }),
        TelemetryEvent::NatsConnectionStateChanged(NatsConnectionStateChanged {
            common: sample_agent_common(),
            state: NatsConnectionState::Connected,
            reconnects_so_far: 0,
            pending_publish_depth: Some(0),
            buffer_bytes: Some(0),
        }),
        TelemetryEvent::PromptExposureDetected(PromptExposureDetected {
            common: sample_agent_common(),
            terminal_tool: "submit_proposal".into(),
            blocked: true,
            hit_count: 0,
            response_length_chars: 0,
            suspicion_score: 0.0,
            xml_tag_hits: 0,
            tool_name_hits: 0,
            instruction_hits: 0,
            wrong_acronym_hits: 0,
            sample_hits: vec![],
        }),
        TelemetryEvent::ApiError(ApiError {
            common: sample_agent_common(),
            http_status: 404,
            error_code: Some("not_found".into()),
            endpoint: "/health/{name}".into(),
            method: "GET".into(),
            duration_ms: 5,
        }),
        TelemetryEvent::ContextEmergencyShrink(ContextEmergencyShrink {
            common: sample_agent_common(),
            available_space: 100,
            requested_max: 4_000,
            floor_used: 200,
            estimated_input: 130_000,
            context_window: 131_072,
            recent_tool_outputs: vec![RecentToolOutput {
                tool: "read_file".into(),
                bytes: 240_000,
            }],
        }),
        TelemetryEvent::ClaudeSubprocessSpawn(ClaudeSubprocessSpawn {
            common: sample_agent_common(),
            session_id: "8ce6aa3f-d7c2-0000-0000-000000000000".into(),
            lock_present_at_spawn: false,
        }),
        TelemetryEvent::ClaudeSubprocessExit(ClaudeSubprocessExit {
            common: sample_agent_common(),
            session_id: "8ce6aa3f-d7c2-0000-0000-000000000000".into(),
            exit_code: 0,
            wallclock_ms: 12_345,
            session_lock_released: true,
        }),
        TelemetryEvent::ClaudeSessionLockCollision(ClaudeSessionLockCollision {
            common: sample_agent_common(),
            session_id: "8ce6aa3f-d7c2-0000-0000-000000000000".into(),
            prior_lock_age_secs: 42,
            prior_pid: Some(31415),
        }),
    ];

    // Runtime half of the coverage guard: every variant has at least one sample.
    let mut covered = [false; VARIANT_COUNT];
    for evt in &samples {
        covered[variant_index(evt)] = true;
    }
    assert!(
        covered.iter().all(|&seen| seen),
        "every TelemetryEvent variant needs a sample in this test"
    );

    for evt in &samples {
        let kind = evt.kind();
        assert!(!kind.is_empty());
        // kind() is used as a NATS subject token, so keep it to [a-z_].
        assert!(kind.chars().all(|c| c.is_ascii_lowercase() || c == '_'));
        let v: serde_json::Value = serde_json::to_value(evt).unwrap();
        assert_eq!(
            v["type"].as_str(),
            Some(kind),
            "kind() must match serde tag"
        );
    }
    let mut kinds: Vec<&'static str> = samples.iter().map(|e| e.kind()).collect();
    kinds.sort_unstable();
    kinds.dedup();
    assert_eq!(kinds.len(), samples.len(), "duplicate kind() values");
}
#[test]
fn telemetry_config_defaults_enabled_true() {
    let cfg: TelemetryConfig = serde_json::from_str("{}").unwrap();
    assert!(cfg.enabled);
    assert!(cfg.endpoints.is_empty());
    // The serde field defaults (`default_enabled`, `#[serde(default)]`) and
    // `impl Default for TelemetryConfig` are maintained separately. Pin that an
    // empty document deserializes to exactly `Default::default()` so the two
    // cannot drift apart.
    assert_eq!(cfg, TelemetryConfig::default());
}
#[test]
fn telemetry_config_opt_out() {
    // An explicit `enabled: false` must override the serde default of `true`.
    let yaml = "enabled: false\n";
    let parsed: TelemetryConfig = serde_yaml::from_str(yaml).unwrap();
    assert!(!parsed.enabled);
}
#[test]
fn prompt_exposure_validate_ok() {
    // Category counters 2 + 1 + 1 + 1 match hit_count 5, and the single sample
    // carries a dictionary prefix, so validation must pass.
    let detection = PromptExposureDetected {
        common: sample_agent_common(),
        terminal_tool: "submit_proposal".into(),
        blocked: true,
        response_length_chars: 1200,
        suspicion_score: 3.45,
        hit_count: 5,
        xml_tag_hits: 2,
        tool_name_hits: 1,
        instruction_hits: 1,
        wrong_acronym_hits: 1,
        sample_hits: vec!["xml-tag <working_memory>".into()],
    };
    let verdict = detection.validate();
    assert!(verdict.is_ok());
}
#[test]
fn prompt_exposure_validate_hit_count_mismatch() {
    // hit_count claims 99 but the four category counters only sum to 4.
    let detection = PromptExposureDetected {
        common: sample_agent_common(),
        terminal_tool: "submit_proposal".into(),
        blocked: false,
        response_length_chars: 500,
        suspicion_score: 2.0,
        hit_count: 99,
        xml_tag_hits: 1,
        tool_name_hits: 1,
        instruction_hits: 1,
        wrong_acronym_hits: 1,
        sample_hits: vec![],
    };
    let message = detection.validate().unwrap_err();
    assert!(message.contains("hit_count 99 != sum"));
}
#[test]
fn prompt_exposure_validate_sample_hit_too_long() {
    // The entry carries a valid dictionary prefix, so it violates ONLY the
    // length cap. A bare "aaa…" sample would also trip the unknown-prefix rule
    // and would not isolate the length check.
    // "xml-tag <" (9) + 55 + ">" (1) = 65 chars, one past the 64-char limit.
    let too_long = format!("xml-tag <{}>", "a".repeat(55));
    assert_eq!(too_long.chars().count(), 65);
    let det = PromptExposureDetected {
        common: sample_agent_common(),
        terminal_tool: "submit_proposal".into(),
        blocked: false,
        hit_count: 1,
        response_length_chars: 100,
        suspicion_score: 1.0,
        xml_tag_hits: 1,
        tool_name_hits: 0,
        instruction_hits: 0,
        wrong_acronym_hits: 0,
        sample_hits: vec![too_long],
    };
    let err = det.validate().unwrap_err();
    assert!(err.contains("exceeds 64 chars"));
}
#[test]
fn prompt_exposure_validate_sample_hit_unknown_prefix() {
    // Counters are consistent (1 == 1), but the sample has no dictionary
    // prefix. The error must name the rule and echo the offending entry.
    let offending = "the quick brown fox";
    let detection = PromptExposureDetected {
        common: sample_agent_common(),
        terminal_tool: "submit_proposal".into(),
        blocked: false,
        response_length_chars: 100,
        suspicion_score: 1.0,
        hit_count: 1,
        xml_tag_hits: 1,
        tool_name_hits: 0,
        instruction_hits: 0,
        wrong_acronym_hits: 0,
        sample_hits: vec![offending.into()],
    };
    let message = detection.validate().unwrap_err();
    assert!(message.contains("does not start with a known dictionary prefix"));
    assert!(message.contains("the quick brown fox"));
}
#[test]
fn event_agent_id_accessor() {
    // `agent_id()` reads through to the common header built by the fixture.
    let accepted = TaskAccepted {
        common: sample_agent_common(),
        dispatch_delay_ms: 42,
        task_publish_ts: None,
        job_age_at_accept_ms: None,
    };
    let event = TelemetryEvent::TaskAccepted(accepted);
    assert_eq!(event.agent_id(), "CortexB");
}
#[test]
fn emit_drops_mismatched_agent_id() {
    // Builds a TaskAccepted event attributed to `agent` (job-x, round 1,
    // Proposing), with a trace_id derived for that same agent.
    fn accepted_by(agent: &str) -> TelemetryEvent {
        TelemetryEvent::TaskAccepted(TaskAccepted {
            common: AgentEventCommon {
                agent_id: agent.into(),
                job_id: Some("job-x".into()),
                round: Some(1),
                phase: Some(DeliberationPhase::Proposing),
                ts: 0,
                trace_id: derive_trace_id("job-x", 1, DeliberationPhase::Proposing, agent),
            },
            dispatch_delay_ms: 0,
            task_publish_ts: None,
            job_age_at_accept_ms: None,
        })
    }

    let source = TelemetrySource::Agent {
        agent_id: "CortexA".into(),
    };
    assert!(
        !source_agent_matches(&source, &accepted_by("CortexB")),
        "CortexB event should not match CortexA source"
    );
    assert!(
        source_agent_matches(&source, &accepted_by("CortexA")),
        "CortexA event should match CortexA source"
    );
}
#[test]
fn redact_content_detects_sensitive_words() {
    // Each input contains a credential-like keyword and must be fully replaced.
    const SENSITIVE: [&str; 6] = [
        "password=abc",
        "my_api_key here",
        "secret_key: xyz",
        "access_key=123",
        "credential leaked",
        "Bearer tok_abc",
    ];
    for &input in SENSITIVE.iter() {
        let out = redact_content(input, 100);
        assert_eq!(out, "REDACTED_SENSITIVE", "expected redaction for: {input}");
    }
}
#[test]
fn redact_content_no_false_positives() {
    // Ordinary prose that merely contains keyword substrings ("key",
    // "auth", "secret", "token") must pass through unchanged.
    const BENIGN: [&str; 8] = [
        "keyword research",
        "the author of this",
        "a monkey in the tree",
        "public_authority report",
        "the secret of his success",
        "authenticate the user",
        "tokenized assets",
        "keyboard warrior",
    ];
    for &input in BENIGN.iter() {
        let out = redact_content(input, 100);
        assert_eq!(out, input, "expected no redaction for: {input}");
    }
}
#[test]
fn redact_content_truncates_long_input() {
    // 200 chars with a 50-char cap: expect a "..." marker and 53 chars total
    // (50 + the three marker chars).
    let long_input = "a".repeat(200);
    let clipped = redact_content(&long_input, 50);
    assert!(clipped.ends_with("..."));
    assert_eq!(clipped.chars().count(), 53);
}
#[test]
fn redact_content_empty_input() {
    // Empty content is rendered as an explicit placeholder, not "".
    let rendered = redact_content("", 100);
    assert_eq!(rendered, "<empty>");
}
#[test]
fn redact_url_strips_credentials_in_authority() {
    // `userinfo@` in the authority is dropped; scheme, host and path survive.
    let cases = [
        (
            "https://user:pass@api.example.com/path",
            "https://api.example.com/path",
        ),
        ("http://admin:secret@host.com", "http://host.com"),
    ];
    for (raw, expected) in cases {
        assert_eq!(redact_url(raw), expected);
    }
}
#[test]
fn redact_url_preserves_at_in_path() {
    // An '@' after the first path '/' is not userinfo and must be kept.
    let url = "https://api.example.com/users/foo@bar/profile";
    assert_eq!(redact_url(url), "https://api.example.com/users/foo@bar/profile");
}
#[test]
fn redact_url_removes_query_and_fragment() {
    // Query string and fragment may carry tokens, so both are cut.
    let url = "https://example.com/path?query=1#frag";
    assert_eq!(redact_url(url), "https://example.com/path");
}
#[test]
fn redact_url_no_credentials() {
    // A URL with nothing to strip comes back unchanged.
    let url = "https://example.com/path";
    assert_eq!(redact_url(url), "https://example.com/path");
}
#[test]
fn redact_error_message_delegates_to_redact_content() {
    // Sensitive keyword: replaced wholesale.
    assert_eq!(
        redact_error_message("password leaked in log"),
        "REDACTED_SENSITIVE"
    );
    // Over-long message: truncated with a "..." marker.
    let oversized = "a".repeat(200);
    let shortened = redact_error_message(&oversized);
    assert!(shortened.ends_with("..."));
}
}