Skip to main content

agent_sdk_core/application/
telemetry.rs

1//! Application-layer coordination over core primitives. Use these services to lower
2//! helpers, drive runs, validate output, coordinate tools, approvals, delivery,
3//! isolation, telemetry, and feature layers. Methods in this layer may call
4//! configured ports, mutate in-memory stores, append journals, or publish events as
5//! documented. This file contains the telemetry portion of that contract.
6//!
7use std::{
8    collections::{BTreeMap, VecDeque},
9    num::NonZeroUsize,
10    sync::Arc,
11};
12
13use serde::{Deserialize, Serialize};
14
15use crate::{
16    domain::{
17        AgentError, AgentErrorKind, DestinationKind, DestinationRef, PolicyRef, RetentionClass,
18        RetryClassification,
19    },
20    event::{AgentEvent, EventCursor, EventFamily, EventKind, EventStreamScope},
21    policy::{ContentCaptureMode as PolicyContentCaptureMode, ContentCapturePolicy},
22    telemetry_ports::{TelemetrySink, TelemetrySinkError, TelemetrySinkSpec},
23    telemetry_records::{
24        TELEMETRY_SCHEMA_VERSION, TelemetryContentCaptureMode, TelemetryExportCursor,
25        TelemetryProjection, TelemetryProjectionId, TelemetryProjectionKind, TelemetryRecord,
26        TelemetryRecordId, TelemetrySinkFailureKind, TelemetrySinkFailureRecord,
27        TelemetrySinkHealth, TelemetrySinkHealthState, TelemetrySinkId, TelemetrySinkKind,
28        TelemetrySinkRecoveryRecord, TelemetrySourceCursor, TelemetrySourceRecord,
29        TelemetryTerminalStatus, TelemetryUsageRecordId, UsageUnits,
30    },
31};
32
33#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
34/// Holds telemetry fanout config application-layer state or configuration.
35/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
36pub struct TelemetryFanoutConfig {
37    /// Queue capacity used by this record or request.
38    pub queue_capacity: NonZeroUsize,
39    /// Queue slots reserved for terminal frames.
40    /// This keeps important terminal events available even when non-terminal frames overflow.
41    pub terminal_reserve: NonZeroUsize,
42    /// Overflow policy applied when a subscriber queue reaches capacity.
43    /// It decides whether to drop, summarize, backpressure, or fail the subscriber.
44    pub overflow: TelemetryOverflowPolicy,
45    /// Sink isolation used by this record or request.
46    pub sink_isolation: TelemetrySinkIsolationPolicy,
47}
48
49impl TelemetryFanoutConfig {
50    /// Returns an updated value with safe defaults configured.
51    /// This is data-only and does not perform I/O, call host ports, append journals, publish
52    /// events, or start processes.
53    pub fn safe_defaults() -> Self {
54        Self {
55            queue_capacity: NonZeroUsize::new(64).expect("nonzero queue capacity"),
56            terminal_reserve: NonZeroUsize::new(4).expect("nonzero terminal reserve"),
57            overflow: TelemetryOverflowPolicy::DropNonTerminalProgress,
58            sink_isolation: TelemetrySinkIsolationPolicy::IsolateEachSink,
59        }
60    }
61
62    /// Builds the tiny for tests value.
63    /// This is data construction and performs no I/O, journal append, event publication, or
64    /// process work.
65    pub fn tiny_for_tests() -> Self {
66        Self {
67            queue_capacity: NonZeroUsize::new(2).expect("nonzero queue capacity"),
68            terminal_reserve: NonZeroUsize::new(1).expect("nonzero terminal reserve"),
69            overflow: TelemetryOverflowPolicy::DropNonTerminalProgress,
70            sink_isolation: TelemetrySinkIsolationPolicy::IsolateEachSink,
71        }
72    }
73}
74
75impl Default for TelemetryFanoutConfig {
76    fn default() -> Self {
77        Self::safe_defaults()
78    }
79}
80
81#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
82#[serde(rename_all = "snake_case")]
83/// Enumerates the finite telemetry overflow policy cases.
84/// Serialized names are part of the SDK contract; update fixtures when variants change.
85pub enum TelemetryOverflowPolicy {
86    /// Use this variant when the contract needs to represent drop non terminal progress; selecting it has no side effect by itself.
87    DropNonTerminalProgress,
88    /// Use this variant when the contract needs to represent coalesce progress by run; selecting it has no side effect by itself.
89    CoalesceProgressByRun,
90    /// Use this variant when the contract needs to represent fail sink not run; selecting it has no side effect by itself.
91    FailSinkNotRun,
92}
93
94#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
95#[serde(rename_all = "snake_case")]
96/// Enumerates the finite telemetry sink isolation policy cases.
97/// Serialized names are part of the SDK contract; update fixtures when variants change.
98pub enum TelemetrySinkIsolationPolicy {
99    /// Use this variant when the contract needs to represent isolate each sink; selecting it has no side effect by itself.
100    IsolateEachSink,
101}
102
103#[derive(Default)]
104/// Holds telemetry fanout application-layer state or configuration.
105/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
106pub struct TelemetryFanout {
107    config: TelemetryFanoutConfig,
108    sinks: BTreeMap<TelemetrySinkId, TelemetrySinkState>,
109}
110
111impl TelemetryFanout {
112    /// Creates a new application::telemetry value with explicit
113    /// caller-provided inputs. This constructor is data-only and
114    /// performs no I/O or external side effects.
115    pub fn new(config: TelemetryFanoutConfig) -> Self {
116        Self {
117            config,
118            sinks: BTreeMap::new(),
119        }
120    }
121
122    /// Returns an updated value with safe defaults configured.
123    /// This is data-only and does not perform I/O, call host ports, append journals, publish
124    /// events, or start processes.
125    pub fn safe_defaults() -> Self {
126        Self::new(TelemetryFanoutConfig::safe_defaults())
127    }
128
129    /// Register sink.
130    /// This adds the sink to telemetry fanout state and initializes its bounded in-memory
131    /// queue.
132    pub fn register_sink(&mut self, sink: Arc<dyn TelemetrySink>) -> Result<(), AgentError> {
133        let spec = sink.spec().clone();
134        if spec.sink_id.as_str().is_empty() {
135            return Err(AgentError::missing_required_field("telemetry.sink_id"));
136        }
137        self.sinks
138            .insert(spec.sink_id.clone(), TelemetrySinkState::new(sink));
139        Ok(())
140    }
141
142    /// Returns sink queue len for callers that need to inspect the contract state.
143    /// This reads the in-memory queue length for one sink and does not drain or export
144    /// telemetry.
145    pub fn sink_queue_len(&self, sink_id: &TelemetrySinkId) -> Option<usize> {
146        self.sinks.get(sink_id).map(|state| state.queue.len())
147    }
148
149    /// Returns queued for sink for callers that need to inspect the contract state.
150    /// This clones queued in-memory telemetry projections for inspection and does not drain or
151    /// export them.
152    pub fn queued_for_sink(&self, sink_id: &TelemetrySinkId) -> Vec<TelemetryProjection> {
153        self.sinks
154            .get(sink_id)
155            .map(|state| state.queue.iter().cloned().collect())
156            .unwrap_or_default()
157    }
158
159    /// Sets try record on the value and returns it.
160    /// This enqueues a telemetry projection for eligible sinks and reports overflow or drop
161    /// outcomes.
162    pub fn try_record(&mut self, projection: TelemetryProjection) -> TelemetryFanoutReport {
163        let mut report = TelemetryFanoutReport::default();
164        for state in self.sinks.values_mut() {
165            let projection = apply_sink_content_boundary(&projection, state.sink.spec());
166            state.enqueue(&self.config, projection, &mut report);
167        }
168        report
169    }
170
171    /// Drain sink.
172    /// This drains queued projections for one sink so tests or adapters can export them.
173    pub fn drain_sink(
174        &mut self,
175        sink_id: &TelemetrySinkId,
176    ) -> Result<TelemetryDrainReport, AgentError> {
177        let Some(state) = self.sinks.get_mut(sink_id) else {
178            return Err(AgentError::host_configuration_needed(
179                "telemetry sink is not registered",
180            ));
181        };
182        Ok(state.drain())
183    }
184}
185
186struct TelemetrySinkState {
187    sink: Arc<dyn TelemetrySink>,
188    queue: VecDeque<TelemetryProjection>,
189    cursor: TelemetryExportCursor,
190    failed: bool,
191    dropped_count: u64,
192    next_record_seq: u64,
193}
194
195impl TelemetrySinkState {
196    fn new(sink: Arc<dyn TelemetrySink>) -> Self {
197        let sink_id = sink.spec().sink_id.clone();
198        Self {
199            sink,
200            queue: VecDeque::new(),
201            cursor: TelemetryExportCursor::new(sink_id),
202            failed: false,
203            dropped_count: 0,
204            next_record_seq: 0,
205        }
206    }
207
208    fn enqueue(
209        &mut self,
210        config: &TelemetryFanoutConfig,
211        projection: TelemetryProjection,
212        report: &mut TelemetryFanoutReport,
213    ) {
214        if self.has_room_for(config, &projection) {
215            self.queue.push_back(projection);
216            report.enqueued += 1;
217            return;
218        }
219
220        if projection.is_terminal_preserved() {
221            while self.queue.len() >= self.capacity(config) && self.drop_oldest_nonterminal() {
222                report.dropped += 1;
223            }
224            if self.queue.len() < self.capacity(config) {
225                self.queue.push_back(projection.clone());
226                report.enqueued += 1;
227                report.records.push(self.failure_record(
228                    &projection,
229                    TelemetrySinkFailureKind::Overflow,
230                    true,
231                    projection.source_record.source_cursor.clone(),
232                    "telemetry terminal projection preserved by dropping non-terminal progress",
233                ));
234                return;
235            }
236            self.dropped_count += 1;
237            report.dropped += 1;
238            report.records.push(self.failure_record(
239                &projection,
240                TelemetrySinkFailureKind::Overflow,
241                false,
242                projection.source_record.source_cursor.clone(),
243                "telemetry terminal projection could not enter the bounded sink queue",
244            ));
245            return;
246        }
247
248        match config.overflow {
249            TelemetryOverflowPolicy::DropNonTerminalProgress => {
250                self.dropped_count += 1;
251                report.dropped += 1;
252                report.records.push(self.failure_record(
253                    &projection,
254                    TelemetrySinkFailureKind::Overflow,
255                    true,
256                    projection.source_record.source_cursor.clone(),
257                    "telemetry non-terminal progress dropped under sink backpressure",
258                ));
259            }
260            TelemetryOverflowPolicy::CoalesceProgressByRun => {
261                if let Some(index) = self.queue.iter().position(|queued| {
262                    !queued.is_terminal_preserved() && queued.run_id == projection.run_id
263                }) {
264                    self.queue.remove(index);
265                    self.dropped_count += 1;
266                    report.dropped += 1;
267                }
268                if self.has_room_for(config, &projection) {
269                    self.queue.push_back(projection);
270                    report.enqueued += 1;
271                } else {
272                    self.dropped_count += 1;
273                    report.dropped += 1;
274                }
275            }
276            TelemetryOverflowPolicy::FailSinkNotRun => {
277                self.failed = true;
278                self.dropped_count += 1;
279                report.dropped += 1;
280                report.records.push(self.failure_record(
281                    &projection,
282                    TelemetrySinkFailureKind::Overflow,
283                    true,
284                    projection.source_record.source_cursor.clone(),
285                    "telemetry sink marked failed by overflow; run state is unaffected",
286                ));
287            }
288        }
289    }
290
291    fn drain(&mut self) -> TelemetryDrainReport {
292        let mut report = TelemetryDrainReport::default();
293        while let Some(projection) = self.queue.front().cloned() {
294            let attempted = self
295                .cursor
296                .clone()
297                .attempted(projection.source_record.source_cursor.clone());
298            match self.sink.export(&projection, &attempted) {
299                Ok(ack) => {
300                    self.cursor = ack.cursor;
301                    self.queue.pop_front();
302                    report.exported += 1;
303                    if self.failed {
304                        self.failed = false;
305                        report.records.push(self.recovery_record(&projection));
306                    }
307                }
308                Err(error) => {
309                    self.failed = true;
310                    report
311                        .records
312                        .push(self.export_failure_record(&projection, error));
313                    break;
314                }
315            }
316        }
317        report
318    }
319
320    fn has_room_for(
321        &self,
322        config: &TelemetryFanoutConfig,
323        projection: &TelemetryProjection,
324    ) -> bool {
325        if projection.is_terminal_preserved() {
326            return self.queue.len() < self.capacity(config);
327        }
328        self.queue.len() < self.capacity(config)
329            && self.nonterminal_count() < self.normal_capacity(config)
330    }
331
332    fn capacity(&self, config: &TelemetryFanoutConfig) -> usize {
333        self.sink
334            .spec()
335            .queue_capacity
336            .get()
337            .min(config.queue_capacity.get())
338    }
339
340    fn normal_capacity(&self, config: &TelemetryFanoutConfig) -> usize {
341        let terminal_reserve = self
342            .sink
343            .spec()
344            .terminal_reserve
345            .get()
346            .max(config.terminal_reserve.get())
347            .min(self.capacity(config));
348        self.capacity(config).saturating_sub(terminal_reserve)
349    }
350
351    fn nonterminal_count(&self) -> usize {
352        self.queue
353            .iter()
354            .filter(|projection| !projection.is_terminal_preserved())
355            .count()
356    }
357
358    fn drop_oldest_nonterminal(&mut self) -> bool {
359        let Some(index) = self
360            .queue
361            .iter()
362            .position(|projection| !projection.is_terminal_preserved())
363        else {
364            return false;
365        };
366        self.queue.remove(index);
367        self.dropped_count += 1;
368        true
369    }
370
371    fn export_failure_record(
372        &mut self,
373        projection: &TelemetryProjection,
374        error: TelemetrySinkError,
375    ) -> TelemetryRecord {
376        self.failure_record(
377            projection,
378            error.failure_kind,
379            projection.is_terminal_preserved(),
380            projection.source_record.source_cursor.clone(),
381            error.redacted_summary,
382        )
383    }
384
385    fn failure_record(
386        &mut self,
387        projection: &TelemetryProjection,
388        failure_kind: TelemetrySinkFailureKind,
389        terminal_preserved: bool,
390        repair_cursor: Option<TelemetrySourceCursor>,
391        summary: impl Into<String>,
392    ) -> TelemetryRecord {
393        let sink_spec = self.sink.spec();
394        let failure = TelemetrySinkFailureRecord {
395            sink_id: sink_spec.sink_id.clone(),
396            sink_kind: sink_spec.sink_kind.clone(),
397            failure_kind,
398            terminal_preserved,
399            dropped_count: self.dropped_count,
400            last_acknowledged_cursor: Some(self.cursor.clone()),
401            repair_cursor,
402            unsafe_pending_reason: (!sink_spec.requires_idempotent_replay)
403                .then(|| "sink cannot prove idempotent repair replay".to_string()),
404            redacted_summary: summary.into(),
405        };
406        TelemetryRecord::sink_failed(self.next_record_id("sink_failed"), projection, failure)
407    }
408
409    fn recovery_record(&mut self, projection: &TelemetryProjection) -> TelemetryRecord {
410        let sink_spec = self.sink.spec();
411        let recovery = TelemetrySinkRecoveryRecord {
412            sink_id: sink_spec.sink_id.clone(),
413            sink_kind: sink_spec.sink_kind.clone(),
414            export_cursor: self.cursor.clone(),
415            redacted_summary: "telemetry sink recovered after successful export".to_string(),
416        };
417        TelemetryRecord::sink_recovered(self.next_record_id("sink_recovered"), projection, recovery)
418    }
419
420    fn next_record_id(&mut self, label: &str) -> TelemetryRecordId {
421        self.next_record_seq += 1;
422        TelemetryRecordId::new(format!(
423            "telemetry.{}.{}.{}",
424            self.sink.spec().sink_id.as_str(),
425            label,
426            self.next_record_seq
427        ))
428    }
429}
430
431#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
432/// Holds telemetry fanout report application-layer state or configuration.
433/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
434pub struct TelemetryFanoutReport {
435    /// Enqueued used by this record or request.
436    pub enqueued: u64,
437    /// Dropped used by this record or request.
438    pub dropped: u64,
439    #[serde(default, skip_serializing_if = "Vec::is_empty")]
440    /// Bounded records included in this record. Limits and truncation are
441    /// represented by companion metadata when applicable.
442    pub records: Vec<TelemetryRecord>,
443}
444
445#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
446/// Holds telemetry drain report application-layer state or configuration.
447/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
448pub struct TelemetryDrainReport {
449    /// Exported used by this record or request.
450    pub exported: u64,
451    #[serde(default, skip_serializing_if = "Vec::is_empty")]
452    /// Bounded records included in this record. Limits and truncation are
453    /// represented by companion metadata when applicable.
454    pub records: Vec<TelemetryRecord>,
455}
456
457/// Holds telemetry usage extraction input application-layer state or configuration.
458/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
459pub struct TelemetryUsageExtractionInput {
460    /// Event used by this record or request.
461    pub event: AgentEvent,
462    /// Cursor identifying a replay, export, or subscription position.
463    /// Use it to resume without widening the original scope.
464    pub event_cursor: Option<EventCursor>,
465    /// Stable provider id used for typed lineage, lookup, or dedupe.
466    pub provider_id: Option<String>,
467    /// Stable model id used for typed lineage, lookup, or dedupe.
468    pub model_id: Option<String>,
469    /// Usage used by this record or request.
470    pub usage: UsageUnits,
471}
472
/// Stateless helper for turning usage-bearing events into telemetry
/// projections and usage records. All of its functions are associated
/// functions, and none of them exports anything.
pub struct TelemetryUsageExtractor;
476
477impl TelemetryUsageExtractor {
478    /// Returns extract from event derived from the supplied state.
479    /// This uses only local coordinator state and performs no hidden host work.
480    pub fn extract_from_event(
481        input: TelemetryUsageExtractionInput,
482    ) -> Result<TelemetryProjection, AgentError> {
483        let envelope = input.event.envelope;
484        if !matches!(
485            envelope.event_family,
486            EventFamily::Model | EventFamily::Run | EventFamily::Subagent
487        ) {
488            return Err(AgentError::new(
489                AgentErrorKind::TelemetryFailure,
490                RetryClassification::RepairNeeded,
491                "usage telemetry must derive from model, run, or subagent facts",
492            ));
493        }
494
495        Ok(TelemetryProjection {
496            schema_version: TELEMETRY_SCHEMA_VERSION,
497            projection_id: TelemetryProjectionId::new(format!(
498                "telemetry.usage.{}",
499                envelope.event_id.as_str()
500            )),
501            projection_kind: TelemetryProjectionKind::Usage,
502            source_record: TelemetrySourceRecord {
503                event_family: envelope.event_family.clone(),
504                event_kind: envelope.event_kind.clone(),
505                event_cursor: input.event_cursor.clone(),
506                source_cursor: envelope
507                    .journal_cursor
508                    .clone()
509                    .map(TelemetrySourceCursor::Journal)
510                    .or_else(|| input.event_cursor.clone().map(TelemetrySourceCursor::Event)),
511            },
512            run_id: envelope.run_id,
513            agent_id: envelope.agent_id,
514            turn_id: envelope.turn_id,
515            attempt_id: envelope.attempt_id,
516            event_id: Some(envelope.event_id),
517            journal_cursor: envelope.journal_cursor,
518            trace_id: Some(envelope.trace_id),
519            span_id: Some(envelope.span_id),
520            runtime_package_fingerprint: envelope.runtime_package_fingerprint,
521            source: envelope.source,
522            destination: Some(DestinationRef::with_kind(
523                DestinationKind::Telemetry,
524                "destination.telemetry.usage",
525            )),
526            subject_ref: envelope.subject_ref,
527            policy_refs: envelope.policy_refs,
528            privacy: envelope.privacy,
529            retention: RetentionClass::RunScoped,
530            content_capture: TelemetryContentCaptureMode::Off,
531            redaction_policy_id: envelope.redaction_policy_id,
532            provider_id: input.provider_id,
533            model_id: input.model_id,
534            tool_name: None,
535            usage: Some(input.usage),
536            cost: None,
537            terminal_status: Some(TelemetryTerminalStatus::Completed),
538            sink_health: None,
539            redacted_summary: "usage telemetry derived without raw prompt, tool, or model content"
540                .to_string(),
541            raw_content: None,
542        })
543    }
544
545    /// Builds the usage record record for this contract.
546    /// This builds a telemetry usage record from already redacted accounting data without
547    /// exporting it.
548    pub fn usage_record(
549        projection: &TelemetryProjection,
550        usage_record_id: impl Into<String>,
551    ) -> TelemetryRecord {
552        TelemetryRecord::usage(
553            TelemetryRecordId::new(format!(
554                "telemetry.record.{}",
555                projection.projection_id.as_str()
556            )),
557            projection,
558            TelemetryUsageRecordId::new(usage_record_id),
559        )
560    }
561}
562
563#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
564/// Holds telemetry content capture request application-layer state or configuration.
565/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
566pub struct TelemetryContentCaptureRequest {
567    /// Policy used by this record or request.
568    pub policy: ContentCapturePolicy,
569    /// Sink used by this record or request.
570    pub sink: TelemetrySinkSpec,
571    /// Requested mode used by this record or request.
572    pub requested_mode: TelemetryContentCaptureMode,
573    /// Whether source permits content is enabled.
574    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
575    pub source_permits_content: bool,
576    /// Retention class for referenced content or records.
577    /// Stores and telemetry sinks use it to decide how long evidence may be kept.
578    pub retention_active: bool,
579    /// Whether deterministic telemetry sampling included this projection after policy gates.
580    /// False means the projection should be dropped for sampled sinks even if retention is
581    /// active.
582    pub deterministic_sample_included: bool,
583    /// requested bytes used for bounds checks, summaries, or truncation
584    /// evidence.
585    pub requested_bytes: u64,
586    /// Stable redaction policy id used for typed lineage, lookup, or dedupe.
587    pub redaction_policy_id: String,
588}
589
590#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
591/// Holds telemetry content capture decision application-layer state or configuration.
592/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
593pub struct TelemetryContentCaptureDecision {
594    /// Allowlist for this policy or contract.
595    /// Validation uses it to reject undeclared or policy-denied values.
596    pub allowed: bool,
597    /// Requested mode used by this record or request.
598    pub requested_mode: TelemetryContentCaptureMode,
599    /// Effective mode used by this record or request.
600    pub effective_mode: TelemetryContentCaptureMode,
601    /// Redacted explanation for a denial, failure, status, or package delta.
602    pub reason: String,
603    /// Stable redaction policy id used for typed lineage, lookup, or dedupe.
604    pub redaction_policy_id: String,
605    /// Policy references that govern admission, projection, execution, or
606    /// delivery.
607    pub policy_refs: Vec<PolicyRef>,
608}
609
610/// Evaluate content capture.
611/// This evaluates policy for telemetry content capture and performs no capture or sink export
612/// by itself.
613pub fn evaluate_content_capture(
614    request: &TelemetryContentCaptureRequest,
615) -> TelemetryContentCaptureDecision {
616    let policy_raw_mode = matches!(request.policy.mode, PolicyContentCaptureMode::RawContent);
617    let sink_raw_mode = request.sink.content_capture.captures_raw_content();
618    let byte_limit_allows =
619        request.requested_bytes > 0 && request.requested_bytes <= request.policy.byte_limit;
620    let all_raw_gates_pass = policy_raw_mode
621        && request.policy.allows_raw_content()
622        && request.source_permits_content
623        && sink_raw_mode
624        && request.retention_active
625        && request.deterministic_sample_included
626        && byte_limit_allows;
627
628    if !request.requested_mode.captures_raw_content() {
629        return TelemetryContentCaptureDecision {
630            allowed: true,
631            requested_mode: request.requested_mode.clone(),
632            effective_mode: request.requested_mode.clone(),
633            reason: "telemetry metadata or redacted capture does not request raw content"
634                .to_string(),
635            redaction_policy_id: request.redaction_policy_id.clone(),
636            policy_refs: vec![request.policy.policy_ref.clone()],
637        };
638    }
639
640    if all_raw_gates_pass {
641        TelemetryContentCaptureDecision {
642            allowed: true,
643            requested_mode: request.requested_mode.clone(),
644            effective_mode: TelemetryContentCaptureMode::RawContent,
645            reason: "raw telemetry content capture allowed by source, sink, redaction, retention, sampling, and limits".to_string(),
646            redaction_policy_id: request.redaction_policy_id.clone(),
647            policy_refs: vec![request.policy.policy_ref.clone()],
648        }
649    } else {
650        TelemetryContentCaptureDecision {
651            allowed: false,
652            requested_mode: request.requested_mode.clone(),
653            effective_mode: TelemetryContentCaptureMode::RedactedSummary,
654            reason: "raw telemetry content capture denied by source, sink, redaction, retention, sampling, or byte-limit policy".to_string(),
655            redaction_policy_id: request.redaction_policy_id.clone(),
656            policy_refs: vec![request.policy.policy_ref.clone()],
657        }
658    }
659}
660
661#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
662/// Holds telemetry authority boundary application-layer state or configuration.
663/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
664pub struct TelemetryAuthorityBoundary {
665    /// Boolean policy/capability flag for whether can decide run state is
666    /// enabled.
667    pub can_decide_run_state: bool,
668    /// Boolean policy/capability flag for whether can decide policy outcome
669    /// is enabled.
670    pub can_decide_policy_outcome: bool,
671    /// Boolean policy/capability flag for whether can decide output delivery
672    /// is enabled.
673    pub can_decide_output_delivery: bool,
674    /// Whether this telemetry surface is allowed to affect side-effect status.
675    /// Observability-only telemetry should leave this false so telemetry cannot drive run
676    /// control.
677    pub can_decide_side_effect_status: bool,
678}
679
680/// Constant value for the application::telemetry contract. Use it to
681/// keep SDK records and tests aligned on the same stable value.
682pub const fn telemetry_authority_boundary() -> TelemetryAuthorityBoundary {
683    TelemetryAuthorityBoundary {
684        can_decide_run_state: false,
685        can_decide_policy_outcome: false,
686        can_decide_output_delivery: false,
687        can_decide_side_effect_status: false,
688    }
689}
690
691/// Returns terminal run projection from event derived from the supplied state.
692/// This derives SDK state locally and does not call host adapters.
693pub fn terminal_run_projection_from_event(event: AgentEvent) -> TelemetryProjection {
694    let envelope = event.envelope;
695    let terminal_status = match envelope.event_kind {
696        EventKind::RunCompleted => TelemetryTerminalStatus::Completed,
697        EventKind::RunCancelled => TelemetryTerminalStatus::Cancelled,
698        EventKind::RunFailed => TelemetryTerminalStatus::Failed,
699        _ => TelemetryTerminalStatus::Unknown,
700    };
701    let source_cursor = envelope
702        .journal_cursor
703        .clone()
704        .map(TelemetrySourceCursor::Journal);
705    TelemetryProjection {
706        schema_version: TELEMETRY_SCHEMA_VERSION,
707        projection_id: TelemetryProjectionId::new(format!(
708            "telemetry.terminal.{}",
709            envelope.event_id.as_str()
710        )),
711        projection_kind: TelemetryProjectionKind::RunTerminal,
712        source_record: TelemetrySourceRecord {
713            event_family: envelope.event_family.clone(),
714            event_kind: envelope.event_kind.clone(),
715            event_cursor: Some(envelope.cursor(EventStreamScope::Run(envelope.run_id.clone()))),
716            source_cursor,
717        },
718        run_id: envelope.run_id,
719        agent_id: envelope.agent_id,
720        turn_id: envelope.turn_id,
721        attempt_id: envelope.attempt_id,
722        event_id: Some(envelope.event_id),
723        journal_cursor: envelope.journal_cursor,
724        trace_id: Some(envelope.trace_id),
725        span_id: Some(envelope.span_id),
726        runtime_package_fingerprint: envelope.runtime_package_fingerprint,
727        source: envelope.source,
728        destination: Some(DestinationRef::with_kind(
729            DestinationKind::Telemetry,
730            "destination.telemetry.terminal",
731        )),
732        subject_ref: envelope.subject_ref,
733        policy_refs: envelope.policy_refs,
734        privacy: envelope.privacy,
735        retention: RetentionClass::RunScoped,
736        content_capture: TelemetryContentCaptureMode::Off,
737        redaction_policy_id: envelope.redaction_policy_id,
738        provider_id: None,
739        model_id: None,
740        tool_name: None,
741        usage: None,
742        cost: None,
743        terminal_status: Some(terminal_status),
744        sink_health: None,
745        redacted_summary: "terminal run telemetry derived from journal-backed event".to_string(),
746        raw_content: None,
747    }
748}
749
750fn apply_sink_content_boundary(
751    projection: &TelemetryProjection,
752    sink: &TelemetrySinkSpec,
753) -> TelemetryProjection {
754    if projection.raw_content.is_none() {
755        return projection.clone();
756    }
757    if sink.content_capture.captures_raw_content()
758        && projection.content_capture.captures_raw_content()
759    {
760        return projection.clone();
761    }
762    projection.clone().without_raw_content()
763}
764
765/// Returns sink health projection derived from the supplied state.
766/// This derives SDK state locally and does not call host adapters.
767pub fn sink_health_projection(
768    base: &TelemetryProjection,
769    sink_id: TelemetrySinkId,
770    sink_kind: TelemetrySinkKind,
771    state: TelemetrySinkHealthState,
772) -> TelemetryProjection {
773    let mut projection = base.clone().without_raw_content();
774    projection.projection_id =
775        TelemetryProjectionId::new(format!("telemetry.sink_health.{}", sink_id.as_str()));
776    projection.projection_kind = TelemetryProjectionKind::SinkHealth;
777    projection.sink_health = Some(TelemetrySinkHealth {
778        sink_id,
779        sink_kind,
780        state,
781        failure_kind: None,
782        terminal_preserved: true,
783        dropped_count: 0,
784        export_cursor: None,
785        unsafe_pending_reason: None,
786    });
787    projection
788}