1use async_trait::async_trait;
2use serde::Deserialize;
3use serde_json::json;
4use std::collections::VecDeque;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex};
7use std::time::Instant;
8use vona_core::backend::{BackendCapabilities, BackendError, BackendStep, SpeechToSpeechBackend};
9use vona_core::realtime::{
10 RealtimeVoiceBackend, RealtimeVoiceCapabilities, RealtimeVoiceError, RealtimeVoiceInput,
11 RealtimeVoiceOutput, RealtimeVoiceSessionConfig,
12};
13use vona_core::runtime::{FallbackReason, SessionPolicy};
14use vona_core::session::SessionConfig;
15use vona_core::skills::{SkillError, SkillExecutor, SkillOutput};
16use vona_core::transport::{AudioTransport, TransportError};
17use vona_core::types::{
18 AudioInputFrame, AudioOutputFrame, ControlEvent, ExternalContextEvent, SkillCall, SkillContext,
19};
20
21#[derive(Clone, Default)]
22pub struct ScriptedTransport {
23 incoming: Arc<Mutex<VecDeque<AudioInputFrame>>>,
24 sent: Arc<Mutex<Vec<AudioOutputFrame>>>,
25}
26
27impl ScriptedTransport {
28 pub fn push_input(&self, frame: AudioInputFrame) {
29 self.incoming
30 .lock()
31 .expect("incoming queue")
32 .push_back(frame);
33 }
34
35 pub fn sent_frames(&self) -> Vec<AudioOutputFrame> {
36 self.sent.lock().expect("sent frames").clone()
37 }
38}
39
40#[async_trait]
41impl AudioTransport for ScriptedTransport {
42 fn sample_rate_hz(&self) -> u32 {
43 24_000
44 }
45
46 fn channels(&self) -> u16 {
47 1
48 }
49
50 async fn recv_frame(&self) -> Result<Option<AudioInputFrame>, TransportError> {
51 Ok(self.incoming.lock().expect("incoming queue").pop_front())
52 }
53
54 async fn send_frame(&self, frame: AudioOutputFrame) -> Result<(), TransportError> {
55 self.sent.lock().expect("sent frames").push(frame);
56 Ok(())
57 }
58
59 async fn clear_output(&self) -> Result<(), TransportError> {
60 self.sent.lock().expect("sent frames").clear();
61 Ok(())
62 }
63}
64
65#[derive(Default)]
66pub struct MockBackend {
67 steps: Arc<Mutex<VecDeque<BackendStep>>>,
68 injections: Arc<Mutex<Vec<ExternalContextEvent>>>,
69}
70
71impl MockBackend {
72 pub fn push_step(&self, step: BackendStep) {
73 self.steps.lock().expect("backend steps").push_back(step);
74 }
75
76 pub fn injected_events(&self) -> Vec<ExternalContextEvent> {
77 self.injections.lock().expect("injections").clone()
78 }
79}
80
81#[async_trait]
82impl SpeechToSpeechBackend for MockBackend {
83 type Session = SessionConfig;
84
85 fn capabilities(&self) -> BackendCapabilities {
86 BackendCapabilities {
87 supports_context_injection: true,
88 ..BackendCapabilities::default()
89 }
90 }
91
92 async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, BackendError> {
93 Ok(config)
94 }
95
96 async fn step(
97 &self,
98 _session: &mut Self::Session,
99 _input: AudioInputFrame,
100 ) -> Result<BackendStep, BackendError> {
101 Ok(self
102 .steps
103 .lock()
104 .expect("backend steps")
105 .pop_front()
106 .unwrap_or_default())
107 }
108
109 async fn inject_event(
110 &self,
111 _session: &mut Self::Session,
112 event: ExternalContextEvent,
113 ) -> Result<(), BackendError> {
114 self.injections.lock().expect("injections").push(event);
115 Ok(())
116 }
117
118 async fn end_session(&self, _session: Self::Session) -> Result<(), BackendError> {
119 Ok(())
120 }
121}
122
123#[derive(Default)]
124pub struct ScriptedRealtimeBackend {
125 outputs: Arc<Mutex<VecDeque<RealtimeVoiceOutput>>>,
126 received_inputs: Arc<Mutex<Vec<RealtimeVoiceInput>>>,
127 capabilities: RealtimeVoiceCapabilities,
128}
129
130impl ScriptedRealtimeBackend {
131 pub fn with_capabilities(capabilities: RealtimeVoiceCapabilities) -> Self {
132 Self {
133 capabilities,
134 ..Self::default()
135 }
136 }
137
138 pub fn push_output(&self, output: RealtimeVoiceOutput) {
139 self.outputs
140 .lock()
141 .expect("realtime outputs")
142 .push_back(output);
143 }
144
145 pub fn received_inputs(&self) -> Vec<RealtimeVoiceInput> {
146 self.received_inputs
147 .lock()
148 .expect("realtime inputs")
149 .clone()
150 }
151}
152
153#[async_trait]
154impl RealtimeVoiceBackend for ScriptedRealtimeBackend {
155 type Session = RealtimeVoiceSessionConfig;
156
157 fn realtime_capabilities(&self) -> RealtimeVoiceCapabilities {
158 self.capabilities.clone()
159 }
160
161 async fn start_realtime_session(
162 &self,
163 config: RealtimeVoiceSessionConfig,
164 ) -> Result<Self::Session, RealtimeVoiceError> {
165 Ok(config)
166 }
167
168 async fn send_realtime_event(
169 &self,
170 _session: &mut Self::Session,
171 input: RealtimeVoiceInput,
172 ) -> Result<(), RealtimeVoiceError> {
173 self.received_inputs
174 .lock()
175 .expect("realtime inputs")
176 .push(input);
177 Ok(())
178 }
179
180 async fn recv_realtime_event(
181 &self,
182 _session: &mut Self::Session,
183 ) -> Result<Option<RealtimeVoiceOutput>, RealtimeVoiceError> {
184 Ok(self.outputs.lock().expect("realtime outputs").pop_front())
185 }
186
187 async fn close_realtime_session(
188 &self,
189 _session: Self::Session,
190 ) -> Result<(), RealtimeVoiceError> {
191 Ok(())
192 }
193}
194
195pub struct EchoSkillExecutor;
196
197#[async_trait]
198impl SkillExecutor for EchoSkillExecutor {
199 async fn execute(
200 &self,
201 call: SkillCall,
202 _context: SkillContext,
203 ) -> Result<SkillOutput, SkillError> {
204 Ok(SkillOutput {
205 spoken_summary: format!("executed {}", call.name),
206 structured_payload: Some(json!({"name": call.name, "args": call.arguments})),
207 audit_payload: None,
208 })
209 }
210}
211
212#[tokio::test]
213async fn scripted_realtime_backend_preserves_input_and_output_order() {
214 use vona_core::realtime::{RealtimeLatencyMark, RealtimeLatencyStage};
215
216 let backend = ScriptedRealtimeBackend::with_capabilities(RealtimeVoiceCapabilities {
217 supports_full_duplex: true,
218 supports_streaming_audio_input: true,
219 supports_streaming_audio_output: true,
220 supports_tool_calls: true,
221 supports_interruption: true,
222 supports_context_injection: true,
223 is_hosted_service: true,
224 max_input_chunk_ms: Some(40),
225 });
226
227 backend.push_output(RealtimeVoiceOutput::LatencyMark(RealtimeLatencyMark {
228 stage: RealtimeLatencyStage::InputReceived,
229 elapsed_ms: 5,
230 }));
231 backend.push_output(RealtimeVoiceOutput::ToolCall(SkillCall {
232 name: "lookup_context".to_string(),
233 arguments: json!({"topic": "sts"}),
234 }));
235 backend.push_output(RealtimeVoiceOutput::Interruption {
236 reason: Some("barge_in".to_string()),
237 });
238 backend.push_output(RealtimeVoiceOutput::Closed {
239 reason: Some("done".to_string()),
240 });
241
242 let mut session = backend
243 .start_realtime_session(RealtimeVoiceSessionConfig::default())
244 .await
245 .expect("start realtime session");
246
247 backend
248 .send_realtime_event(
249 &mut session,
250 RealtimeVoiceInput::Audio(AudioInputFrame {
251 sequence: 1,
252 sample_rate_hz: 24_000,
253 channels: 1,
254 samples: vec![0.0; 320],
255 }),
256 )
257 .await
258 .expect("send audio");
259 backend
260 .send_realtime_event(
261 &mut session,
262 RealtimeVoiceInput::ToolResult(ExternalContextEvent {
263 source: "skill:lookup_context".to_string(),
264 spoken_summary: Some("context ready".to_string()),
265 payload: json!({"ok": true}),
266 }),
267 )
268 .await
269 .expect("send tool result");
270
271 let received = backend.received_inputs();
272 assert!(matches!(received[0], RealtimeVoiceInput::Audio(_)));
273 assert!(matches!(received[1], RealtimeVoiceInput::ToolResult(_)));
274
275 assert!(matches!(
276 backend.recv_realtime_event(&mut session).await.unwrap(),
277 Some(RealtimeVoiceOutput::LatencyMark(RealtimeLatencyMark {
278 stage: RealtimeLatencyStage::InputReceived,
279 elapsed_ms: 5
280 }))
281 ));
282 assert!(matches!(
283 backend.recv_realtime_event(&mut session).await.unwrap(),
284 Some(RealtimeVoiceOutput::ToolCall(_))
285 ));
286 assert!(matches!(
287 backend.recv_realtime_event(&mut session).await.unwrap(),
288 Some(RealtimeVoiceOutput::Interruption { .. })
289 ));
290 assert!(matches!(
291 backend.recv_realtime_event(&mut session).await.unwrap(),
292 Some(RealtimeVoiceOutput::Closed { .. })
293 ));
294 assert!(
295 backend
296 .recv_realtime_event(&mut session)
297 .await
298 .unwrap()
299 .is_none()
300 );
301}
302
303pub struct AllowAllPolicy;
304
305impl SessionPolicy for AllowAllPolicy {
306 fn should_accept_control_event(&self, _event: &ControlEvent) -> bool {
307 true
308 }
309
310 fn should_fallback_to_bridge(&self, _reason: &FallbackReason) -> bool {
311 false
312 }
313
314 fn max_tool_latency_ms(&self) -> u64 {
315 500
316 }
317}
318
319#[derive(Debug, Clone, Deserialize)]
320pub struct WaveformFixture {
321 pub sample_rate_hz: u32,
322 pub channels: u16,
323 pub samples: Vec<f32>,
324}
325
326pub fn workspace_fixture_path(name: &str) -> PathBuf {
327 Path::new(env!("CARGO_MANIFEST_DIR"))
328 .join("../../tests/fixtures")
329 .join(name)
330}
331
332pub fn load_waveform_fixture(name: &str) -> WaveformFixture {
333 let path = workspace_fixture_path(name);
334 let raw = std::fs::read_to_string(&path)
335 .unwrap_or_else(|err| panic!("failed to read waveform fixture {}: {err}", path.display()));
336 serde_json::from_str(&raw).unwrap_or_else(|err| {
337 panic!(
338 "failed to decode waveform fixture {}: {err}",
339 path.display()
340 )
341 })
342}
343
344pub async fn measure_loopback_latency_ms(transport: &ScriptedTransport) -> u128 {
345 let started = Instant::now();
346 let inbound = transport
347 .recv_frame()
348 .await
349 .expect("receive frame")
350 .expect("queued frame");
351 transport
352 .send_frame(AudioOutputFrame {
353 sequence: inbound.sequence,
354 sample_rate_hz: inbound.sample_rate_hz,
355 channels: inbound.channels,
356 samples: inbound.samples,
357 is_filler: false,
358 })
359 .await
360 .expect("send frame");
361 started.elapsed().as_millis()
362}
363
364#[tokio::test]
365async fn runtime_executes_skill_call_and_returns_injection_event() {
366 use vona_core::runtime::{FillerStrategy, RuntimeDecision, VonaRuntime};
367
368 let runtime = VonaRuntime::new(
369 Arc::new(EchoSkillExecutor),
370 Arc::new(AllowAllPolicy),
371 FillerStrategy::StaticClip,
372 );
373 let context = SkillContext {
374 session_id: "session-1".to_string(),
375 user_id: Some("user-1".to_string()),
376 thread_id: Some("thread-1".to_string()),
377 metadata: json!({"surface": "test"}),
378 };
379
380 let decision = runtime
381 .handle_control_event(
382 &ControlEvent::SkillCall(SkillCall {
383 name: "get_weather".to_string(),
384 arguments: json!({"city": "Nairobi"}),
385 }),
386 context,
387 )
388 .await
389 .expect("skill decision");
390
391 match decision {
392 RuntimeDecision::InjectContext(event) => {
393 assert_eq!(event.source, "skill:get_weather");
394 assert_eq!(
395 event.spoken_summary.as_deref(),
396 Some("executed get_weather")
397 );
398 }
399 other => panic!("unexpected runtime decision: {other:?}"),
400 }
401}
402
403#[tokio::test]
404async fn waveform_fixture_round_trips_through_scripted_transport() {
405 let fixture = load_waveform_fixture("sine-16khz-mono.json");
406 let transport = ScriptedTransport::default();
407 transport.push_input(AudioInputFrame {
408 sequence: 1,
409 sample_rate_hz: fixture.sample_rate_hz,
410 channels: fixture.channels,
411 samples: fixture.samples.clone(),
412 });
413
414 let _ = measure_loopback_latency_ms(&transport).await;
415 let sent = transport.sent_frames();
416 assert_eq!(sent.len(), 1);
417 assert_eq!(sent[0].sample_rate_hz, fixture.sample_rate_hz);
418 assert_eq!(sent[0].channels, fixture.channels);
419 assert_eq!(sent[0].samples, fixture.samples);
420}
421
422#[tokio::test]
423async fn scripted_transport_loopback_latency_stays_low_for_fixture_audio() {
424 let fixture = load_waveform_fixture("impulse-16khz-mono.json");
425 let transport = ScriptedTransport::default();
426 transport.push_input(AudioInputFrame {
427 sequence: 7,
428 sample_rate_hz: fixture.sample_rate_hz,
429 channels: fixture.channels,
430 samples: fixture.samples,
431 });
432
433 let elapsed_ms = measure_loopback_latency_ms(&transport).await;
434 assert!(
435 elapsed_ms < 50,
436 "expected in-process loopback under 50ms, got {elapsed_ms}ms"
437 );
438}
439
440#[tokio::test]
441async fn run_session_counts_interruption_and_clears_buffered_output() {
442 use vona_core::runtime::{FillerStrategy, VonaRuntime};
443 use vona_core::session::run_session;
444
445 let transport = ScriptedTransport::default();
446 transport.push_input(AudioInputFrame {
447 sequence: 1,
448 sample_rate_hz: 24_000,
449 channels: 1,
450 samples: vec![0.0; 160],
451 });
452
453 let backend = MockBackend::default();
454 backend.push_step(BackendStep {
455 output_audio: vec![AudioOutputFrame {
456 sequence: 1,
457 sample_rate_hz: 24_000,
458 channels: 1,
459 samples: vec![0.25; 160],
460 is_filler: false,
461 }],
462 control_events: vec![ControlEvent::Interruption {
463 reason: Some("barge_in".to_string()),
464 }],
465 transcript: None,
466 finished: true,
467 debug_payload: None,
468 });
469
470 let runtime = VonaRuntime::new(
471 Arc::new(EchoSkillExecutor),
472 Arc::new(AllowAllPolicy),
473 FillerStrategy::None,
474 );
475
476 let summary = run_session(
477 transport.clone(),
478 &backend,
479 &runtime,
480 SessionConfig::default(),
481 )
482 .await
483 .expect("session should complete");
484
485 assert_eq!(summary.metrics.interruptions, 1);
486 assert!(transport.sent_frames().is_empty());
487}