agent_sdk_core/testing/
telemetry.rs1use std::sync::{Arc, Mutex};
8
9use crate::{
10 telemetry_ports::{TelemetrySink, TelemetrySinkAck, TelemetrySinkError, TelemetrySinkSpec},
11 telemetry_records::{TelemetryExportCursor, TelemetryProjection},
12};
13
14#[derive(Clone)]
15pub struct ScriptedTelemetrySink {
18 spec: TelemetrySinkSpec,
19 fail_next: Arc<Mutex<Option<TelemetrySinkError>>>,
20 exports: Arc<Mutex<Vec<TelemetryProjection>>>,
21}
22
23impl ScriptedTelemetrySink {
24 pub fn new(spec: TelemetrySinkSpec) -> Self {
28 Self {
29 spec,
30 fail_next: Arc::new(Mutex::new(None)),
31 exports: Arc::new(Mutex::new(Vec::new())),
32 }
33 }
34
35 pub fn sink_spec(&self) -> &TelemetrySinkSpec {
38 &self.spec
39 }
40
41 pub fn fail_next(&self, summary: impl Into<String>) {
45 *self
46 .fail_next
47 .lock()
48 .expect("telemetry scripted sink fail lock") =
49 Some(TelemetrySinkError::unavailable(summary));
50 }
51
52 pub fn exports(&self) -> Vec<TelemetryProjection> {
55 self.exports
56 .lock()
57 .expect("telemetry scripted sink exports lock")
58 .clone()
59 }
60}
61
62impl TelemetrySink for ScriptedTelemetrySink {
63 fn spec(&self) -> &TelemetrySinkSpec {
64 &self.spec
65 }
66
67 fn export(
68 &self,
69 projection: &TelemetryProjection,
70 cursor: &TelemetryExportCursor,
71 ) -> Result<TelemetrySinkAck, TelemetrySinkError> {
72 if let Some(error) = self
73 .fail_next
74 .lock()
75 .expect("telemetry scripted sink fail lock")
76 .take()
77 {
78 return Err(error);
79 }
80 self.exports
81 .lock()
82 .expect("telemetry scripted sink exports lock")
83 .push(projection.clone());
84 Ok(TelemetrySinkAck::accepted(cursor.clone().acknowledged(
85 projection.source_record.source_cursor.clone(),
86 )))
87 }
88}