Skip to main content

agent_sdk_core/testing/
realtime.rs

1//! Deterministic test-kit helpers for SDK consumers. Use these fakes and harnesses to
2//! exercise public contracts without live providers, real stores, product UI, network
3//! telemetry, or wall-clock-dependent infrastructure. They mutate only their
4//! in-memory state unless noted. This file contains the realtime portion of that
5//! contract.
6//!
7use std::{
8    collections::VecDeque,
9    sync::{Arc, Mutex},
10};
11
12use crate::{
13    domain::{AgentError, AgentErrorKind, RetryClassification},
14    ports::realtime::{
15        RealtimeAdapterAck, RealtimeAdapterCall, RealtimeConnectRequest, RealtimeConnectResponse,
16        RealtimeProviderAdapter,
17    },
18    realtime_records::{
19        RealtimeCloseReason, RealtimeConnectionId, RealtimeInputFrame, RealtimeOutputFrame,
20        RealtimeResponseId, RealtimeSessionId,
21    },
22};
23
24#[derive(Clone, Debug)]
25/// In-memory scripted realtime adapter fixture for SDK conformance tests.
26/// Use it to script deterministic behavior in memory; any transcript or endpoint mutation is documented on the method that performs it.
27pub struct ScriptedRealtimeAdapter {
28    adapter_ref: String,
29    calls: Arc<Mutex<Vec<RealtimeAdapterCall>>>,
30    output_frames: Arc<Mutex<VecDeque<RealtimeOutputFrame>>>,
31    next_connection_seq: Arc<Mutex<u64>>,
32    fail_restart: Arc<Mutex<Option<String>>>,
33}
34
35impl ScriptedRealtimeAdapter {
36    /// Creates a new testing::realtime value with explicit
37    /// caller-provided inputs. This constructor is data-only and
38    /// performs no I/O or external side effects.
39    ///
40    /// # Panics
41    ///
42    /// Panics if constructor invariants fail, such as invalid identifier
43    /// text or constructor-specific bounds. Use a fallible constructor such as
44    /// `try_new` when one is available for untrusted input.
45    pub fn new(adapter_ref: impl Into<String>) -> Self {
46        Self {
47            adapter_ref: adapter_ref.into(),
48            calls: Arc::new(Mutex::new(Vec::new())),
49            output_frames: Arc::new(Mutex::new(VecDeque::new())),
50            next_connection_seq: Arc::new(Mutex::new(1)),
51            fail_restart: Arc::new(Mutex::new(None)),
52        }
53    }
54
55    /// Push output.
56    /// This reads or mutates deterministic in-memory test state unless the method explicitly
57    /// names a fixture file.
58    pub fn push_output(&self, frame: RealtimeOutputFrame) {
59        self.output_frames
60            .lock()
61            .expect("realtime output lock")
62            .push_back(frame);
63    }
64
65    /// Fail next restart.
66    /// This reads or mutates deterministic in-memory test state unless the method explicitly
67    /// names a fixture file.
68    pub fn fail_next_restart(&self, message: impl Into<String>) {
69        *self
70            .fail_restart
71            .lock()
72            .expect("realtime fail restart lock") = Some(message.into());
73    }
74
75    /// Operates on in-memory or journal-derived testing::realtime state for
76    /// diagnostics and repair evidence. It does not create a second run loop
77    /// or product workflow owner.
78    pub fn calls(&self) -> Vec<RealtimeAdapterCall> {
79        self.calls.lock().expect("realtime calls lock").clone()
80    }
81
82    /// Returns the call names currently held by this value.
83    /// This configures deterministic in-memory test state only.
84    pub fn call_names(&self) -> Vec<&'static str> {
85        self.calls().iter().map(RealtimeAdapterCall::name).collect()
86    }
87
88    fn next_connection_id(&self) -> RealtimeConnectionId {
89        let mut seq = self
90            .next_connection_seq
91            .lock()
92            .expect("realtime connection seq lock");
93        let connection_id = RealtimeConnectionId::new(format!("realtime.connection.{seq}"));
94        *seq += 1;
95        connection_id
96    }
97}
98
99impl RealtimeProviderAdapter for ScriptedRealtimeAdapter {
100    fn adapter_ref(&self) -> &str {
101        &self.adapter_ref
102    }
103
104    fn connect(
105        &self,
106        request: RealtimeConnectRequest,
107    ) -> Result<RealtimeConnectResponse, AgentError> {
108        self.calls
109            .lock()
110            .expect("realtime calls lock")
111            .push(RealtimeAdapterCall::Connect {
112                session_id: request.session_id.clone(),
113            });
114        Ok(RealtimeConnectResponse {
115            session_id: request.session_id,
116            connection_id: self.next_connection_id(),
117            redacted_summary: "realtime connected".to_string(),
118        })
119    }
120
121    fn send(
122        &self,
123        session_id: &RealtimeSessionId,
124        frame: RealtimeInputFrame,
125    ) -> Result<(), AgentError> {
126        self.calls
127            .lock()
128            .expect("realtime calls lock")
129            .push(RealtimeAdapterCall::Send {
130                session_id: session_id.clone(),
131                redacted_summary: frame.redacted_summary,
132            });
133        Ok(())
134    }
135
136    fn receive(
137        &self,
138        session_id: &RealtimeSessionId,
139    ) -> Result<Option<RealtimeOutputFrame>, AgentError> {
140        self.calls
141            .lock()
142            .expect("realtime calls lock")
143            .push(RealtimeAdapterCall::Receive {
144                session_id: session_id.clone(),
145            });
146        Ok(self
147            .output_frames
148            .lock()
149            .expect("realtime output lock")
150            .pop_front())
151    }
152
153    fn interrupt(
154        &self,
155        session_id: &RealtimeSessionId,
156        response_id: &RealtimeResponseId,
157    ) -> Result<RealtimeAdapterAck, AgentError> {
158        self.calls
159            .lock()
160            .expect("realtime calls lock")
161            .push(RealtimeAdapterCall::Interrupt {
162                session_id: session_id.clone(),
163                response_id: response_id.clone(),
164            });
165        Ok(RealtimeAdapterAck {
166            redacted_summary: "realtime response interrupted".to_string(),
167        })
168    }
169
170    fn restart(
171        &self,
172        session_id: &RealtimeSessionId,
173        current_connection_id: &RealtimeConnectionId,
174    ) -> Result<RealtimeConnectResponse, AgentError> {
175        if let Some(message) = self
176            .fail_restart
177            .lock()
178            .expect("realtime fail restart lock")
179            .take()
180        {
181            return Err(AgentError::new(
182                AgentErrorKind::ProviderFailure,
183                RetryClassification::Retryable,
184                message,
185            ));
186        }
187        self.calls
188            .lock()
189            .expect("realtime calls lock")
190            .push(RealtimeAdapterCall::Restart {
191                session_id: session_id.clone(),
192                current_connection_id: current_connection_id.clone(),
193            });
194        Ok(RealtimeConnectResponse {
195            session_id: session_id.clone(),
196            connection_id: self.next_connection_id(),
197            redacted_summary: "realtime restarted".to_string(),
198        })
199    }
200
201    fn close(
202        &self,
203        session_id: &RealtimeSessionId,
204        reason: RealtimeCloseReason,
205    ) -> Result<RealtimeAdapterAck, AgentError> {
206        self.calls
207            .lock()
208            .expect("realtime calls lock")
209            .push(RealtimeAdapterCall::Close {
210                session_id: session_id.clone(),
211                reason,
212            });
213        Ok(RealtimeAdapterAck {
214            redacted_summary: "realtime closed".to_string(),
215        })
216    }
217}