1use std::{
8 collections::{BTreeMap, VecDeque},
9 num::NonZeroUsize,
10 sync::Arc,
11};
12
13use serde::{Deserialize, Serialize};
14
15use crate::{
16 domain::{
17 AgentError, AgentErrorKind, DestinationKind, DestinationRef, PolicyRef, RetentionClass,
18 RetryClassification,
19 },
20 event::{AgentEvent, EventCursor, EventFamily, EventKind, EventStreamScope},
21 policy::{ContentCaptureMode as PolicyContentCaptureMode, ContentCapturePolicy},
22 telemetry_ports::{TelemetrySink, TelemetrySinkError, TelemetrySinkSpec},
23 telemetry_records::{
24 TELEMETRY_SCHEMA_VERSION, TelemetryContentCaptureMode, TelemetryExportCursor,
25 TelemetryProjection, TelemetryProjectionId, TelemetryProjectionKind, TelemetryRecord,
26 TelemetryRecordId, TelemetrySinkFailureKind, TelemetrySinkFailureRecord,
27 TelemetrySinkHealth, TelemetrySinkHealthState, TelemetrySinkId, TelemetrySinkKind,
28 TelemetrySinkRecoveryRecord, TelemetrySourceCursor, TelemetrySourceRecord,
29 TelemetryTerminalStatus, TelemetryUsageRecordId, UsageUnits,
30 },
31};
32
/// Fan-out wide queue limits and overflow behaviour applied to every registered sink.
///
/// Each sink's effective limits combine these values with the limits in the sink's
/// own `TelemetrySinkSpec` (see `TelemetrySinkState::capacity` and
/// `TelemetrySinkState::normal_capacity`).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryFanoutConfig {
    /// Upper bound on queued projections per sink; the effective bound is the smaller
    /// of this value and the sink spec's `queue_capacity`.
    pub queue_capacity: NonZeroUsize,
    /// Queue slots held back for terminal-preserved projections; the effective reserve
    /// is the larger of this value and the sink spec's `terminal_reserve`, clamped to
    /// the effective capacity.
    pub terminal_reserve: NonZeroUsize,
    /// How a non-terminal projection that does not fit into a sink queue is handled.
    pub overflow: TelemetryOverflowPolicy,
    /// Sink isolation policy. It is not consulted by the queueing code in this file;
    /// each sink already owns a separate queue and state there.
    pub sink_isolation: TelemetrySinkIsolationPolicy,
}
48
49impl TelemetryFanoutConfig {
50 pub fn safe_defaults() -> Self {
54 Self {
55 queue_capacity: NonZeroUsize::new(64).expect("nonzero queue capacity"),
56 terminal_reserve: NonZeroUsize::new(4).expect("nonzero terminal reserve"),
57 overflow: TelemetryOverflowPolicy::DropNonTerminalProgress,
58 sink_isolation: TelemetrySinkIsolationPolicy::IsolateEachSink,
59 }
60 }
61
62 pub fn tiny_for_tests() -> Self {
66 Self {
67 queue_capacity: NonZeroUsize::new(2).expect("nonzero queue capacity"),
68 terminal_reserve: NonZeroUsize::new(1).expect("nonzero terminal reserve"),
69 overflow: TelemetryOverflowPolicy::DropNonTerminalProgress,
70 sink_isolation: TelemetrySinkIsolationPolicy::IsolateEachSink,
71 }
72 }
73}
74
75impl Default for TelemetryFanoutConfig {
76 fn default() -> Self {
77 Self::safe_defaults()
78 }
79}
80
/// What a sink queue does with a non-terminal projection that does not fit.
///
/// Terminal-preserved projections ignore this policy: they always try to evict
/// non-terminal entries first (see `TelemetrySinkState::enqueue`).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetryOverflowPolicy {
    /// Drop the incoming projection and emit an `Overflow` sink-failure record.
    DropNonTerminalProgress,
    /// Evict the oldest queued non-terminal projection with the same `run_id` and
    /// enqueue the incoming one. If nothing was evicted and there is still no room,
    /// the incoming projection is dropped. Neither case emits a record.
    CoalesceProgressByRun,
    /// Drop the incoming projection, mark the sink as failed, and emit an `Overflow`
    /// record. The run itself is unaffected; only the sink is marked.
    FailSinkNotRun,
}
93
/// How failures in one telemetry sink are kept away from the other sinks.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetrySinkIsolationPolicy {
    /// The only policy: each sink has its own queue, cursor, and failure state.
    /// Both `TelemetryFanoutConfig` presets select it.
    IsolateEachSink,
}
102
/// Fans each telemetry projection out to every registered sink.
///
/// Every sink gets its own bounded queue, export cursor, and failure state, so a
/// slow or failing sink cannot block the others.
#[derive(Default)]
pub struct TelemetryFanout {
    /// Fan-out wide queue limits and overflow policy shared by all sinks.
    config: TelemetryFanoutConfig,
    /// Per-sink state keyed by sink id. Fan-out visits sinks in the `BTreeMap`'s
    /// sorted key order.
    sinks: BTreeMap<TelemetrySinkId, TelemetrySinkState>,
}
110
111impl TelemetryFanout {
112 pub fn new(config: TelemetryFanoutConfig) -> Self {
116 Self {
117 config,
118 sinks: BTreeMap::new(),
119 }
120 }
121
122 pub fn safe_defaults() -> Self {
126 Self::new(TelemetryFanoutConfig::safe_defaults())
127 }
128
129 pub fn register_sink(&mut self, sink: Arc<dyn TelemetrySink>) -> Result<(), AgentError> {
133 let spec = sink.spec().clone();
134 if spec.sink_id.as_str().is_empty() {
135 return Err(AgentError::missing_required_field("telemetry.sink_id"));
136 }
137 self.sinks
138 .insert(spec.sink_id.clone(), TelemetrySinkState::new(sink));
139 Ok(())
140 }
141
142 pub fn sink_queue_len(&self, sink_id: &TelemetrySinkId) -> Option<usize> {
146 self.sinks.get(sink_id).map(|state| state.queue.len())
147 }
148
149 pub fn queued_for_sink(&self, sink_id: &TelemetrySinkId) -> Vec<TelemetryProjection> {
153 self.sinks
154 .get(sink_id)
155 .map(|state| state.queue.iter().cloned().collect())
156 .unwrap_or_default()
157 }
158
159 pub fn try_record(&mut self, projection: TelemetryProjection) -> TelemetryFanoutReport {
163 let mut report = TelemetryFanoutReport::default();
164 for state in self.sinks.values_mut() {
165 let projection = apply_sink_content_boundary(&projection, state.sink.spec());
166 state.enqueue(&self.config, projection, &mut report);
167 }
168 report
169 }
170
171 pub fn drain_sink(
174 &mut self,
175 sink_id: &TelemetrySinkId,
176 ) -> Result<TelemetryDrainReport, AgentError> {
177 let Some(state) = self.sinks.get_mut(sink_id) else {
178 return Err(AgentError::host_configuration_needed(
179 "telemetry sink is not registered",
180 ));
181 };
182 Ok(state.drain())
183 }
184}
185
/// Queue, export cursor, and failure bookkeeping for a single registered sink.
struct TelemetrySinkState {
    /// The exporter that projections are delivered to.
    sink: Arc<dyn TelemetrySink>,
    /// Projections waiting for export; the front entry is exported first.
    queue: VecDeque<TelemetryProjection>,
    /// Last export cursor; replaced by the acknowledged cursor after each
    /// successful export.
    cursor: TelemetryExportCursor,
    /// Set by a `FailSinkNotRun` overflow or an export error. Cleared, with a
    /// recovery record, by the next successful export.
    failed: bool,
    /// Cumulative projections dropped or evicted for this sink. Copied into every
    /// sink-failure record.
    dropped_count: u64,
    /// Last sequence number used when minting record ids for this sink.
    next_record_seq: u64,
}
194
// Per-sink queue management: admission and overflow handling, export draining,
// and construction of the failure/recovery records that describe those events.
impl TelemetrySinkState {
    /// Creates the state for a newly registered sink.
    ///
    /// The queue starts empty, the export cursor is initialised from the sink's id,
    /// the sink is not failed, and all counters are zero.
    fn new(sink: Arc<dyn TelemetrySink>) -> Self {
        let sink_id = sink.spec().sink_id.clone();
        Self {
            sink,
            queue: VecDeque::new(),
            cursor: TelemetryExportCursor::new(sink_id),
            failed: false,
            dropped_count: 0,
            next_record_seq: 0,
        }
    }

    /// Offers one (already content-bounded) projection to this sink's queue and
    /// adds the outcome to `report`.
    ///
    /// - If `has_room_for` admits the projection, it is appended and nothing else
    ///   happens.
    /// - A terminal-preserved projection that does not fit evicts the oldest
    ///   non-terminal entries until the queue is below capacity.
    ///   - If a slot opens, the projection is appended along with an `Overflow`
    ///     record (`terminal_preserved = true`).
    ///   - If only terminal entries remain, the projection is dropped with an
    ///     `Overflow` record (`terminal_preserved = false`).
    /// - Any other projection that does not fit is handled per `config.overflow`.
    ///
    /// `dropped_count` is incremented before each `failure_record` call. Each record's
    /// `dropped_count` therefore already includes the drop(s) it describes.
    fn enqueue(
        &mut self,
        config: &TelemetryFanoutConfig,
        projection: TelemetryProjection,
        report: &mut TelemetryFanoutReport,
    ) {
        if self.has_room_for(config, &projection) {
            self.queue.push_back(projection);
            report.enqueued += 1;
            return;
        }

        if projection.is_terminal_preserved() {
            // Make room by evicting non-terminal progress, oldest first.
            // `drop_oldest_nonterminal` updates `dropped_count`; the report is updated here.
            while self.queue.len() >= self.capacity(config) && self.drop_oldest_nonterminal() {
                report.dropped += 1;
            }
            if self.queue.len() < self.capacity(config) {
                self.queue.push_back(projection.clone());
                report.enqueued += 1;
                report.records.push(self.failure_record(
                    &projection,
                    TelemetrySinkFailureKind::Overflow,
                    true,
                    projection.source_record.source_cursor.clone(),
                    "telemetry terminal projection preserved by dropping non-terminal progress",
                ));
                return;
            }
            // The queue is full of terminal-preserved entries; nothing can be evicted.
            self.dropped_count += 1;
            report.dropped += 1;
            report.records.push(self.failure_record(
                &projection,
                TelemetrySinkFailureKind::Overflow,
                false,
                projection.source_record.source_cursor.clone(),
                "telemetry terminal projection could not enter the bounded sink queue",
            ));
            return;
        }

        match config.overflow {
            TelemetryOverflowPolicy::DropNonTerminalProgress => {
                self.dropped_count += 1;
                report.dropped += 1;
                report.records.push(self.failure_record(
                    &projection,
                    TelemetrySinkFailureKind::Overflow,
                    true,
                    projection.source_record.source_cursor.clone(),
                    "telemetry non-terminal progress dropped under sink backpressure",
                ));
            }
            TelemetryOverflowPolicy::CoalesceProgressByRun => {
                // Replace the oldest non-terminal progress queued for the same run.
                if let Some(index) = self.queue.iter().position(|queued| {
                    !queued.is_terminal_preserved() && queued.run_id == projection.run_id
                }) {
                    self.queue.remove(index);
                    self.dropped_count += 1;
                    report.dropped += 1;
                }
                if self.has_room_for(config, &projection) {
                    self.queue.push_back(projection);
                    report.enqueued += 1;
                } else {
                    // NOTE(review): the incoming projection is dropped here without a
                    // sink-failure record, unlike the other two policies. Confirm
                    // this silent drop is intended.
                    self.dropped_count += 1;
                    report.dropped += 1;
                }
            }
            TelemetryOverflowPolicy::FailSinkNotRun => {
                // Only the sink is marked failed; `drain` clears the flag after
                // the next successful export.
                self.failed = true;
                self.dropped_count += 1;
                report.dropped += 1;
                report.records.push(self.failure_record(
                    &projection,
                    TelemetrySinkFailureKind::Overflow,
                    true,
                    projection.source_record.source_cursor.clone(),
                    "telemetry sink marked failed by overflow; run state is unaffected",
                ));
            }
        }
    }

    /// Exports queued projections front to back until the queue is empty or an
    /// export fails.
    ///
    /// On success, the cursor advances to the acknowledged cursor and the entry is
    /// removed. If the sink was marked failed, the flag is cleared and a recovery
    /// record is emitted.
    ///
    /// On the first error, the sink is marked failed and a failure record is emitted.
    /// Draining then stops, and the failed projection stays at the front of the
    /// queue for a later attempt.
    fn drain(&mut self) -> TelemetryDrainReport {
        let mut report = TelemetryDrainReport::default();
        while let Some(projection) = self.queue.front().cloned() {
            // Cursor handed to the sink for this attempt; `self.cursor` only
            // changes once the sink acknowledges.
            let attempted = self
                .cursor
                .clone()
                .attempted(projection.source_record.source_cursor.clone());
            match self.sink.export(&projection, &attempted) {
                Ok(ack) => {
                    self.cursor = ack.cursor;
                    self.queue.pop_front();
                    report.exported += 1;
                    if self.failed {
                        self.failed = false;
                        report.records.push(self.recovery_record(&projection));
                    }
                }
                Err(error) => {
                    self.failed = true;
                    report
                        .records
                        .push(self.export_failure_record(&projection, error));
                    break;
                }
            }
        }
        report
    }

    /// Whether `projection` can be queued without evicting anything.
    ///
    /// Terminal-preserved projections only need total capacity. Other projections
    /// must also stay below `normal_capacity`, which keeps the terminal reserve free.
    fn has_room_for(
        &self,
        config: &TelemetryFanoutConfig,
        projection: &TelemetryProjection,
    ) -> bool {
        if projection.is_terminal_preserved() {
            return self.queue.len() < self.capacity(config);
        }
        self.queue.len() < self.capacity(config)
            && self.nonterminal_count() < self.normal_capacity(config)
    }

    /// Effective queue capacity: the smaller of the sink spec's and the config's
    /// `queue_capacity`.
    fn capacity(&self, config: &TelemetryFanoutConfig) -> usize {
        self.sink
            .spec()
            .queue_capacity
            .get()
            .min(config.queue_capacity.get())
    }

    /// Slots usable by non-terminal projections.
    ///
    /// This is the effective capacity minus the terminal reserve. The reserve is the
    /// larger of the sink spec's and the config's `terminal_reserve`, clamped to
    /// capacity. It can be zero, in which case non-terminal projections are never
    /// admitted directly.
    fn normal_capacity(&self, config: &TelemetryFanoutConfig) -> usize {
        let terminal_reserve = self
            .sink
            .spec()
            .terminal_reserve
            .get()
            .max(config.terminal_reserve.get())
            .min(self.capacity(config));
        self.capacity(config).saturating_sub(terminal_reserve)
    }

    /// Number of queued projections that are not terminal-preserved (linear scan).
    fn nonterminal_count(&self) -> usize {
        self.queue
            .iter()
            .filter(|projection| !projection.is_terminal_preserved())
            .count()
    }

    /// Removes the oldest non-terminal projection, if any, and counts it as dropped.
    ///
    /// Returns `true` if an entry was removed. Callers update their own report.
    fn drop_oldest_nonterminal(&mut self) -> bool {
        let Some(index) = self
            .queue
            .iter()
            .position(|projection| !projection.is_terminal_preserved())
        else {
            return false;
        };
        self.queue.remove(index);
        self.dropped_count += 1;
        true
    }

    /// Builds a sink-failure record from an export error.
    ///
    /// `terminal_preserved` mirrors the projection's own flag, and the repair cursor
    /// is the projection's source cursor.
    fn export_failure_record(
        &mut self,
        projection: &TelemetryProjection,
        error: TelemetrySinkError,
    ) -> TelemetryRecord {
        self.failure_record(
            projection,
            error.failure_kind,
            projection.is_terminal_preserved(),
            projection.source_record.source_cursor.clone(),
            error.redacted_summary,
        )
    }

    /// Builds a `sink_failed` record that snapshots this sink's current state.
    ///
    /// The snapshot includes the current `dropped_count` and the last export cursor.
    /// Building the record consumes one record sequence number.
    fn failure_record(
        &mut self,
        projection: &TelemetryProjection,
        failure_kind: TelemetrySinkFailureKind,
        terminal_preserved: bool,
        repair_cursor: Option<TelemetrySourceCursor>,
        summary: impl Into<String>,
    ) -> TelemetryRecord {
        let sink_spec = self.sink.spec();
        let failure = TelemetrySinkFailureRecord {
            sink_id: sink_spec.sink_id.clone(),
            sink_kind: sink_spec.sink_kind.clone(),
            failure_kind,
            terminal_preserved,
            dropped_count: self.dropped_count,
            last_acknowledged_cursor: Some(self.cursor.clone()),
            repair_cursor,
            // Set only when the spec's `requires_idempotent_replay` flag is false.
            // NOTE(review): presumably that flag means the sink guarantees
            // idempotent replay; confirm against `TelemetrySinkSpec`.
            unsafe_pending_reason: (!sink_spec.requires_idempotent_replay)
                .then(|| "sink cannot prove idempotent repair replay".to_string()),
            redacted_summary: summary.into(),
        };
        TelemetryRecord::sink_failed(self.next_record_id("sink_failed"), projection, failure)
    }

    /// Builds a `sink_recovered` record carrying the export cursor that was just
    /// acknowledged.
    fn recovery_record(&mut self, projection: &TelemetryProjection) -> TelemetryRecord {
        let sink_spec = self.sink.spec();
        let recovery = TelemetrySinkRecoveryRecord {
            sink_id: sink_spec.sink_id.clone(),
            sink_kind: sink_spec.sink_kind.clone(),
            export_cursor: self.cursor.clone(),
            redacted_summary: "telemetry sink recovered after successful export".to_string(),
        };
        TelemetryRecord::sink_recovered(self.next_record_id("sink_recovered"), projection, recovery)
    }

    /// Mints the next record id for this sink, formatted as
    /// `telemetry.<sink_id>.<label>.<seq>`. The sequence starts at 1 and is shared
    /// by all record labels.
    fn next_record_id(&mut self, label: &str) -> TelemetryRecordId {
        self.next_record_seq += 1;
        TelemetryRecordId::new(format!(
            "telemetry.{}.{}.{}",
            self.sink.spec().sink_id.as_str(),
            label,
            self.next_record_seq
        ))
    }
}
430
/// Outcome of one `TelemetryFanout::try_record` call, summed over all sinks.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryFanoutReport {
    /// Per-sink queue insertions; a projection accepted by three sinks counts three.
    pub enqueued: u64,
    /// Per-sink drops, including queued entries evicted or coalesced away.
    pub dropped: u64,
    /// Sink-failure records emitted while resolving overflow. Omitted from
    /// serialized output when empty, and empty when absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub records: Vec<TelemetryRecord>,
}
444
/// Outcome of draining one sink's queue.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryDrainReport {
    /// Number of projections the sink acknowledged.
    pub exported: u64,
    /// At most one recovery record, followed by at most one failure record for the
    /// export error that stopped the drain. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub records: Vec<TelemetryRecord>,
}
456
/// Inputs for `TelemetryUsageExtractor::extract_from_event`.
pub struct TelemetryUsageExtractionInput {
    /// The event the usage is attributed to. Its envelope supplies all identity,
    /// trace, and policy fields.
    pub event: AgentEvent,
    /// Optional event-stream cursor. It is copied onto the source record and used as
    /// the source cursor when the envelope has no journal cursor.
    pub event_cursor: Option<EventCursor>,
    /// Provider identifier copied onto the projection.
    pub provider_id: Option<String>,
    /// Model identifier copied onto the projection.
    pub model_id: Option<String>,
    /// Usage units carried by the projection.
    pub usage: UsageUnits,
}
472
/// Stateless namespace for building usage telemetry projections and records.
pub struct TelemetryUsageExtractor;
477impl TelemetryUsageExtractor {
478 pub fn extract_from_event(
481 input: TelemetryUsageExtractionInput,
482 ) -> Result<TelemetryProjection, AgentError> {
483 let envelope = input.event.envelope;
484 if !matches!(
485 envelope.event_family,
486 EventFamily::Model | EventFamily::Run | EventFamily::Subagent
487 ) {
488 return Err(AgentError::new(
489 AgentErrorKind::TelemetryFailure,
490 RetryClassification::RepairNeeded,
491 "usage telemetry must derive from model, run, or subagent facts",
492 ));
493 }
494
495 Ok(TelemetryProjection {
496 schema_version: TELEMETRY_SCHEMA_VERSION,
497 projection_id: TelemetryProjectionId::new(format!(
498 "telemetry.usage.{}",
499 envelope.event_id.as_str()
500 )),
501 projection_kind: TelemetryProjectionKind::Usage,
502 source_record: TelemetrySourceRecord {
503 event_family: envelope.event_family.clone(),
504 event_kind: envelope.event_kind.clone(),
505 event_cursor: input.event_cursor.clone(),
506 source_cursor: envelope
507 .journal_cursor
508 .clone()
509 .map(TelemetrySourceCursor::Journal)
510 .or_else(|| input.event_cursor.clone().map(TelemetrySourceCursor::Event)),
511 },
512 run_id: envelope.run_id,
513 agent_id: envelope.agent_id,
514 turn_id: envelope.turn_id,
515 attempt_id: envelope.attempt_id,
516 event_id: Some(envelope.event_id),
517 journal_cursor: envelope.journal_cursor,
518 trace_id: Some(envelope.trace_id),
519 span_id: Some(envelope.span_id),
520 runtime_package_fingerprint: envelope.runtime_package_fingerprint,
521 source: envelope.source,
522 destination: Some(DestinationRef::with_kind(
523 DestinationKind::Telemetry,
524 "destination.telemetry.usage",
525 )),
526 subject_ref: envelope.subject_ref,
527 policy_refs: envelope.policy_refs,
528 privacy: envelope.privacy,
529 retention: RetentionClass::RunScoped,
530 content_capture: TelemetryContentCaptureMode::Off,
531 redaction_policy_id: envelope.redaction_policy_id,
532 provider_id: input.provider_id,
533 model_id: input.model_id,
534 tool_name: None,
535 usage: Some(input.usage),
536 cost: None,
537 terminal_status: Some(TelemetryTerminalStatus::Completed),
538 sink_health: None,
539 redacted_summary: "usage telemetry derived without raw prompt, tool, or model content"
540 .to_string(),
541 raw_content: None,
542 })
543 }
544
545 pub fn usage_record(
549 projection: &TelemetryProjection,
550 usage_record_id: impl Into<String>,
551 ) -> TelemetryRecord {
552 TelemetryRecord::usage(
553 TelemetryRecordId::new(format!(
554 "telemetry.record.{}",
555 projection.projection_id.as_str()
556 )),
557 projection,
558 TelemetryUsageRecordId::new(usage_record_id),
559 )
560 }
561}
562
/// Everything `evaluate_content_capture` needs to decide whether raw content may be
/// captured for a sink.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryContentCaptureRequest {
    /// Governing policy. Raw capture requires its mode to be `RawContent`,
    /// `allows_raw_content()` to be true, and `requested_bytes` to be within
    /// `byte_limit`.
    pub policy: ContentCapturePolicy,
    /// Target sink; its `content_capture` mode must also capture raw content.
    pub sink: TelemetrySinkSpec,
    /// Mode being asked for. Modes that do not capture raw content are always
    /// granted as requested.
    pub requested_mode: TelemetryContentCaptureMode,
    /// Whether the originating source permits its content to be captured.
    pub source_permits_content: bool,
    /// Whether a retention window is active for the captured content.
    pub retention_active: bool,
    /// Whether this item was selected by deterministic sampling.
    pub deterministic_sample_included: bool,
    /// Size of the content to capture; must be nonzero and at most the policy's
    /// `byte_limit`.
    pub requested_bytes: u64,
    /// Redaction policy identifier, echoed into the decision.
    pub redaction_policy_id: String,
}
589
/// Result of `evaluate_content_capture`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryContentCaptureDecision {
    /// `false` only when raw content was requested and at least one gate failed.
    pub allowed: bool,
    /// The mode the caller asked for.
    pub requested_mode: TelemetryContentCaptureMode,
    /// The mode to apply: the requested mode for non-raw requests, `RawContent`
    /// when every gate passes, otherwise `RedactedSummary`.
    pub effective_mode: TelemetryContentCaptureMode,
    /// Human-readable explanation of the decision.
    pub reason: String,
    /// Redaction policy identifier copied from the request.
    pub redaction_policy_id: String,
    /// Policies the decision was based on; currently the request policy's own ref.
    pub policy_refs: Vec<PolicyRef>,
}
609
610pub fn evaluate_content_capture(
614 request: &TelemetryContentCaptureRequest,
615) -> TelemetryContentCaptureDecision {
616 let policy_raw_mode = matches!(request.policy.mode, PolicyContentCaptureMode::RawContent);
617 let sink_raw_mode = request.sink.content_capture.captures_raw_content();
618 let byte_limit_allows =
619 request.requested_bytes > 0 && request.requested_bytes <= request.policy.byte_limit;
620 let all_raw_gates_pass = policy_raw_mode
621 && request.policy.allows_raw_content()
622 && request.source_permits_content
623 && sink_raw_mode
624 && request.retention_active
625 && request.deterministic_sample_included
626 && byte_limit_allows;
627
628 if !request.requested_mode.captures_raw_content() {
629 return TelemetryContentCaptureDecision {
630 allowed: true,
631 requested_mode: request.requested_mode.clone(),
632 effective_mode: request.requested_mode.clone(),
633 reason: "telemetry metadata or redacted capture does not request raw content"
634 .to_string(),
635 redaction_policy_id: request.redaction_policy_id.clone(),
636 policy_refs: vec![request.policy.policy_ref.clone()],
637 };
638 }
639
640 if all_raw_gates_pass {
641 TelemetryContentCaptureDecision {
642 allowed: true,
643 requested_mode: request.requested_mode.clone(),
644 effective_mode: TelemetryContentCaptureMode::RawContent,
645 reason: "raw telemetry content capture allowed by source, sink, redaction, retention, sampling, and limits".to_string(),
646 redaction_policy_id: request.redaction_policy_id.clone(),
647 policy_refs: vec![request.policy.policy_ref.clone()],
648 }
649 } else {
650 TelemetryContentCaptureDecision {
651 allowed: false,
652 requested_mode: request.requested_mode.clone(),
653 effective_mode: TelemetryContentCaptureMode::RedactedSummary,
654 reason: "raw telemetry content capture denied by source, sink, redaction, retention, sampling, or byte-limit policy".to_string(),
655 redaction_policy_id: request.redaction_policy_id.clone(),
656 policy_refs: vec![request.policy.policy_ref.clone()],
657 }
658 }
659}
660
/// Declares which decisions telemetry is permitted to make.
///
/// `telemetry_authority_boundary` returns a value with every flag set to `false`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct TelemetryAuthorityBoundary {
    /// Whether telemetry may decide a run's state.
    pub can_decide_run_state: bool,
    /// Whether telemetry may decide a policy outcome.
    pub can_decide_policy_outcome: bool,
    /// Whether telemetry may decide whether output is delivered.
    pub can_decide_output_delivery: bool,
    /// Whether telemetry may decide the status of a side effect.
    pub can_decide_side_effect_status: bool,
}
679
680pub const fn telemetry_authority_boundary() -> TelemetryAuthorityBoundary {
683 TelemetryAuthorityBoundary {
684 can_decide_run_state: false,
685 can_decide_policy_outcome: false,
686 can_decide_output_delivery: false,
687 can_decide_side_effect_status: false,
688 }
689}
690
691pub fn terminal_run_projection_from_event(event: AgentEvent) -> TelemetryProjection {
694 let envelope = event.envelope;
695 let terminal_status = match envelope.event_kind {
696 EventKind::RunCompleted => TelemetryTerminalStatus::Completed,
697 EventKind::RunCancelled => TelemetryTerminalStatus::Cancelled,
698 EventKind::RunFailed => TelemetryTerminalStatus::Failed,
699 _ => TelemetryTerminalStatus::Unknown,
700 };
701 let source_cursor = envelope
702 .journal_cursor
703 .clone()
704 .map(TelemetrySourceCursor::Journal);
705 TelemetryProjection {
706 schema_version: TELEMETRY_SCHEMA_VERSION,
707 projection_id: TelemetryProjectionId::new(format!(
708 "telemetry.terminal.{}",
709 envelope.event_id.as_str()
710 )),
711 projection_kind: TelemetryProjectionKind::RunTerminal,
712 source_record: TelemetrySourceRecord {
713 event_family: envelope.event_family.clone(),
714 event_kind: envelope.event_kind.clone(),
715 event_cursor: Some(envelope.cursor(EventStreamScope::Run(envelope.run_id.clone()))),
716 source_cursor,
717 },
718 run_id: envelope.run_id,
719 agent_id: envelope.agent_id,
720 turn_id: envelope.turn_id,
721 attempt_id: envelope.attempt_id,
722 event_id: Some(envelope.event_id),
723 journal_cursor: envelope.journal_cursor,
724 trace_id: Some(envelope.trace_id),
725 span_id: Some(envelope.span_id),
726 runtime_package_fingerprint: envelope.runtime_package_fingerprint,
727 source: envelope.source,
728 destination: Some(DestinationRef::with_kind(
729 DestinationKind::Telemetry,
730 "destination.telemetry.terminal",
731 )),
732 subject_ref: envelope.subject_ref,
733 policy_refs: envelope.policy_refs,
734 privacy: envelope.privacy,
735 retention: RetentionClass::RunScoped,
736 content_capture: TelemetryContentCaptureMode::Off,
737 redaction_policy_id: envelope.redaction_policy_id,
738 provider_id: None,
739 model_id: None,
740 tool_name: None,
741 usage: None,
742 cost: None,
743 terminal_status: Some(terminal_status),
744 sink_health: None,
745 redacted_summary: "terminal run telemetry derived from journal-backed event".to_string(),
746 raw_content: None,
747 }
748}
749
750fn apply_sink_content_boundary(
751 projection: &TelemetryProjection,
752 sink: &TelemetrySinkSpec,
753) -> TelemetryProjection {
754 if projection.raw_content.is_none() {
755 return projection.clone();
756 }
757 if sink.content_capture.captures_raw_content()
758 && projection.content_capture.captures_raw_content()
759 {
760 return projection.clone();
761 }
762 projection.clone().without_raw_content()
763}
764
765pub fn sink_health_projection(
768 base: &TelemetryProjection,
769 sink_id: TelemetrySinkId,
770 sink_kind: TelemetrySinkKind,
771 state: TelemetrySinkHealthState,
772) -> TelemetryProjection {
773 let mut projection = base.clone().without_raw_content();
774 projection.projection_id =
775 TelemetryProjectionId::new(format!("telemetry.sink_health.{}", sink_id.as_str()));
776 projection.projection_kind = TelemetryProjectionKind::SinkHealth;
777 projection.sink_health = Some(TelemetrySinkHealth {
778 sink_id,
779 sink_kind,
780 state,
781 failure_kind: None,
782 terminal_preserved: true,
783 dropped_count: 0,
784 export_cursor: None,
785 unsafe_pending_reason: None,
786 });
787 projection
788}