agent_sdk_core/testing/
realtime.rs1use std::{
8 collections::VecDeque,
9 sync::{Arc, Mutex},
10};
11
12use crate::{
13 domain::{AgentError, AgentErrorKind, RetryClassification},
14 ports::realtime::{
15 RealtimeAdapterAck, RealtimeAdapterCall, RealtimeConnectRequest, RealtimeConnectResponse,
16 RealtimeProviderAdapter,
17 },
18 realtime_records::{
19 RealtimeCloseReason, RealtimeConnectionId, RealtimeInputFrame, RealtimeOutputFrame,
20 RealtimeResponseId, RealtimeSessionId,
21 },
22};
23
24#[derive(Clone, Debug)]
25pub struct ScriptedRealtimeAdapter {
28 adapter_ref: String,
29 calls: Arc<Mutex<Vec<RealtimeAdapterCall>>>,
30 output_frames: Arc<Mutex<VecDeque<RealtimeOutputFrame>>>,
31 next_connection_seq: Arc<Mutex<u64>>,
32 fail_restart: Arc<Mutex<Option<String>>>,
33}
34
35impl ScriptedRealtimeAdapter {
36 pub fn new(adapter_ref: impl Into<String>) -> Self {
46 Self {
47 adapter_ref: adapter_ref.into(),
48 calls: Arc::new(Mutex::new(Vec::new())),
49 output_frames: Arc::new(Mutex::new(VecDeque::new())),
50 next_connection_seq: Arc::new(Mutex::new(1)),
51 fail_restart: Arc::new(Mutex::new(None)),
52 }
53 }
54
55 pub fn push_output(&self, frame: RealtimeOutputFrame) {
59 self.output_frames
60 .lock()
61 .expect("realtime output lock")
62 .push_back(frame);
63 }
64
65 pub fn fail_next_restart(&self, message: impl Into<String>) {
69 *self
70 .fail_restart
71 .lock()
72 .expect("realtime fail restart lock") = Some(message.into());
73 }
74
75 pub fn calls(&self) -> Vec<RealtimeAdapterCall> {
79 self.calls.lock().expect("realtime calls lock").clone()
80 }
81
82 pub fn call_names(&self) -> Vec<&'static str> {
85 self.calls().iter().map(RealtimeAdapterCall::name).collect()
86 }
87
88 fn next_connection_id(&self) -> RealtimeConnectionId {
89 let mut seq = self
90 .next_connection_seq
91 .lock()
92 .expect("realtime connection seq lock");
93 let connection_id = RealtimeConnectionId::new(format!("realtime.connection.{seq}"));
94 *seq += 1;
95 connection_id
96 }
97}
98
99impl RealtimeProviderAdapter for ScriptedRealtimeAdapter {
100 fn adapter_ref(&self) -> &str {
101 &self.adapter_ref
102 }
103
104 fn connect(
105 &self,
106 request: RealtimeConnectRequest,
107 ) -> Result<RealtimeConnectResponse, AgentError> {
108 self.calls
109 .lock()
110 .expect("realtime calls lock")
111 .push(RealtimeAdapterCall::Connect {
112 session_id: request.session_id.clone(),
113 });
114 Ok(RealtimeConnectResponse {
115 session_id: request.session_id,
116 connection_id: self.next_connection_id(),
117 redacted_summary: "realtime connected".to_string(),
118 })
119 }
120
121 fn send(
122 &self,
123 session_id: &RealtimeSessionId,
124 frame: RealtimeInputFrame,
125 ) -> Result<(), AgentError> {
126 self.calls
127 .lock()
128 .expect("realtime calls lock")
129 .push(RealtimeAdapterCall::Send {
130 session_id: session_id.clone(),
131 redacted_summary: frame.redacted_summary,
132 });
133 Ok(())
134 }
135
136 fn receive(
137 &self,
138 session_id: &RealtimeSessionId,
139 ) -> Result<Option<RealtimeOutputFrame>, AgentError> {
140 self.calls
141 .lock()
142 .expect("realtime calls lock")
143 .push(RealtimeAdapterCall::Receive {
144 session_id: session_id.clone(),
145 });
146 Ok(self
147 .output_frames
148 .lock()
149 .expect("realtime output lock")
150 .pop_front())
151 }
152
153 fn interrupt(
154 &self,
155 session_id: &RealtimeSessionId,
156 response_id: &RealtimeResponseId,
157 ) -> Result<RealtimeAdapterAck, AgentError> {
158 self.calls
159 .lock()
160 .expect("realtime calls lock")
161 .push(RealtimeAdapterCall::Interrupt {
162 session_id: session_id.clone(),
163 response_id: response_id.clone(),
164 });
165 Ok(RealtimeAdapterAck {
166 redacted_summary: "realtime response interrupted".to_string(),
167 })
168 }
169
170 fn restart(
171 &self,
172 session_id: &RealtimeSessionId,
173 current_connection_id: &RealtimeConnectionId,
174 ) -> Result<RealtimeConnectResponse, AgentError> {
175 if let Some(message) = self
176 .fail_restart
177 .lock()
178 .expect("realtime fail restart lock")
179 .take()
180 {
181 return Err(AgentError::new(
182 AgentErrorKind::ProviderFailure,
183 RetryClassification::Retryable,
184 message,
185 ));
186 }
187 self.calls
188 .lock()
189 .expect("realtime calls lock")
190 .push(RealtimeAdapterCall::Restart {
191 session_id: session_id.clone(),
192 current_connection_id: current_connection_id.clone(),
193 });
194 Ok(RealtimeConnectResponse {
195 session_id: session_id.clone(),
196 connection_id: self.next_connection_id(),
197 redacted_summary: "realtime restarted".to_string(),
198 })
199 }
200
201 fn close(
202 &self,
203 session_id: &RealtimeSessionId,
204 reason: RealtimeCloseReason,
205 ) -> Result<RealtimeAdapterAck, AgentError> {
206 self.calls
207 .lock()
208 .expect("realtime calls lock")
209 .push(RealtimeAdapterCall::Close {
210 session_id: session_id.clone(),
211 reason,
212 });
213 Ok(RealtimeAdapterAck {
214 redacted_summary: "realtime closed".to_string(),
215 })
216 }
217}