// vona_test_harness/lib.rs

1use async_trait::async_trait;
2use serde::Deserialize;
3use serde_json::json;
4use std::collections::VecDeque;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex};
7use std::time::Instant;
8use vona_core::backend::{BackendCapabilities, BackendError, BackendStep, SpeechToSpeechBackend};
9use vona_core::realtime::{
10    RealtimeVoiceBackend, RealtimeVoiceCapabilities, RealtimeVoiceError, RealtimeVoiceInput,
11    RealtimeVoiceOutput, RealtimeVoiceSessionConfig,
12};
13use vona_core::runtime::{FallbackReason, SessionPolicy};
14use vona_core::session::SessionConfig;
15use vona_core::skills::{SkillError, SkillExecutor, SkillOutput};
16use vona_core::transport::{AudioTransport, TransportError};
17use vona_core::types::{
18    AudioInputFrame, AudioOutputFrame, ControlEvent, ExternalContextEvent, SkillCall, SkillContext,
19};
20
21#[derive(Clone, Default)]
22pub struct ScriptedTransport {
23    incoming: Arc<Mutex<VecDeque<AudioInputFrame>>>,
24    sent: Arc<Mutex<Vec<AudioOutputFrame>>>,
25}
26
27impl ScriptedTransport {
28    pub fn push_input(&self, frame: AudioInputFrame) {
29        self.incoming
30            .lock()
31            .expect("incoming queue")
32            .push_back(frame);
33    }
34
35    pub fn sent_frames(&self) -> Vec<AudioOutputFrame> {
36        self.sent.lock().expect("sent frames").clone()
37    }
38}
39
40#[async_trait]
41impl AudioTransport for ScriptedTransport {
42    fn sample_rate_hz(&self) -> u32 {
43        24_000
44    }
45
46    fn channels(&self) -> u16 {
47        1
48    }
49
50    async fn recv_frame(&self) -> Result<Option<AudioInputFrame>, TransportError> {
51        Ok(self.incoming.lock().expect("incoming queue").pop_front())
52    }
53
54    async fn send_frame(&self, frame: AudioOutputFrame) -> Result<(), TransportError> {
55        self.sent.lock().expect("sent frames").push(frame);
56        Ok(())
57    }
58
59    async fn clear_output(&self) -> Result<(), TransportError> {
60        self.sent.lock().expect("sent frames").clear();
61        Ok(())
62    }
63}
64
65#[derive(Default)]
66pub struct MockBackend {
67    steps: Arc<Mutex<VecDeque<BackendStep>>>,
68    injections: Arc<Mutex<Vec<ExternalContextEvent>>>,
69}
70
71impl MockBackend {
72    pub fn push_step(&self, step: BackendStep) {
73        self.steps.lock().expect("backend steps").push_back(step);
74    }
75
76    pub fn injected_events(&self) -> Vec<ExternalContextEvent> {
77        self.injections.lock().expect("injections").clone()
78    }
79}
80
81#[async_trait]
82impl SpeechToSpeechBackend for MockBackend {
83    type Session = SessionConfig;
84
85    fn capabilities(&self) -> BackendCapabilities {
86        BackendCapabilities {
87            supports_context_injection: true,
88            ..BackendCapabilities::default()
89        }
90    }
91
92    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, BackendError> {
93        Ok(config)
94    }
95
96    async fn step(
97        &self,
98        _session: &mut Self::Session,
99        _input: AudioInputFrame,
100    ) -> Result<BackendStep, BackendError> {
101        Ok(self
102            .steps
103            .lock()
104            .expect("backend steps")
105            .pop_front()
106            .unwrap_or_default())
107    }
108
109    async fn inject_event(
110        &self,
111        _session: &mut Self::Session,
112        event: ExternalContextEvent,
113    ) -> Result<(), BackendError> {
114        self.injections.lock().expect("injections").push(event);
115        Ok(())
116    }
117
118    async fn end_session(&self, _session: Self::Session) -> Result<(), BackendError> {
119        Ok(())
120    }
121}
122
123#[derive(Default)]
124pub struct ScriptedRealtimeBackend {
125    outputs: Arc<Mutex<VecDeque<RealtimeVoiceOutput>>>,
126    received_inputs: Arc<Mutex<Vec<RealtimeVoiceInput>>>,
127    capabilities: RealtimeVoiceCapabilities,
128}
129
130impl ScriptedRealtimeBackend {
131    pub fn with_capabilities(capabilities: RealtimeVoiceCapabilities) -> Self {
132        Self {
133            capabilities,
134            ..Self::default()
135        }
136    }
137
138    pub fn push_output(&self, output: RealtimeVoiceOutput) {
139        self.outputs
140            .lock()
141            .expect("realtime outputs")
142            .push_back(output);
143    }
144
145    pub fn received_inputs(&self) -> Vec<RealtimeVoiceInput> {
146        self.received_inputs
147            .lock()
148            .expect("realtime inputs")
149            .clone()
150    }
151}
152
153#[async_trait]
154impl RealtimeVoiceBackend for ScriptedRealtimeBackend {
155    type Session = RealtimeVoiceSessionConfig;
156
157    fn realtime_capabilities(&self) -> RealtimeVoiceCapabilities {
158        self.capabilities.clone()
159    }
160
161    async fn start_realtime_session(
162        &self,
163        config: RealtimeVoiceSessionConfig,
164    ) -> Result<Self::Session, RealtimeVoiceError> {
165        Ok(config)
166    }
167
168    async fn send_realtime_event(
169        &self,
170        _session: &mut Self::Session,
171        input: RealtimeVoiceInput,
172    ) -> Result<(), RealtimeVoiceError> {
173        self.received_inputs
174            .lock()
175            .expect("realtime inputs")
176            .push(input);
177        Ok(())
178    }
179
180    async fn recv_realtime_event(
181        &self,
182        _session: &mut Self::Session,
183    ) -> Result<Option<RealtimeVoiceOutput>, RealtimeVoiceError> {
184        Ok(self.outputs.lock().expect("realtime outputs").pop_front())
185    }
186
187    async fn close_realtime_session(
188        &self,
189        _session: Self::Session,
190    ) -> Result<(), RealtimeVoiceError> {
191        Ok(())
192    }
193}
194
/// Stateless skill executor that echoes each call back instead of running it.
///
/// Derives the common traits so it can be printed in test failures, copied
/// freely, and built with `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct EchoSkillExecutor;
196
197#[async_trait]
198impl SkillExecutor for EchoSkillExecutor {
199    async fn execute(
200        &self,
201        call: SkillCall,
202        _context: SkillContext,
203    ) -> Result<SkillOutput, SkillError> {
204        Ok(SkillOutput {
205            spoken_summary: format!("executed {}", call.name),
206            structured_payload: Some(json!({"name": call.name, "args": call.arguments})),
207            audit_payload: None,
208        })
209    }
210}
211
212#[tokio::test]
213async fn scripted_realtime_backend_preserves_input_and_output_order() {
214    use vona_core::realtime::{RealtimeLatencyMark, RealtimeLatencyStage};
215
216    let backend = ScriptedRealtimeBackend::with_capabilities(RealtimeVoiceCapabilities {
217        supports_full_duplex: true,
218        supports_streaming_audio_input: true,
219        supports_streaming_audio_output: true,
220        supports_tool_calls: true,
221        supports_interruption: true,
222        supports_context_injection: true,
223        is_hosted_service: true,
224        max_input_chunk_ms: Some(40),
225    });
226
227    backend.push_output(RealtimeVoiceOutput::LatencyMark(RealtimeLatencyMark {
228        stage: RealtimeLatencyStage::InputReceived,
229        elapsed_ms: 5,
230    }));
231    backend.push_output(RealtimeVoiceOutput::ToolCall(SkillCall {
232        name: "lookup_context".to_string(),
233        arguments: json!({"topic": "sts"}),
234    }));
235    backend.push_output(RealtimeVoiceOutput::Interruption {
236        reason: Some("barge_in".to_string()),
237    });
238    backend.push_output(RealtimeVoiceOutput::Closed {
239        reason: Some("done".to_string()),
240    });
241
242    let mut session = backend
243        .start_realtime_session(RealtimeVoiceSessionConfig::default())
244        .await
245        .expect("start realtime session");
246
247    backend
248        .send_realtime_event(
249            &mut session,
250            RealtimeVoiceInput::Audio(AudioInputFrame {
251                sequence: 1,
252                sample_rate_hz: 24_000,
253                channels: 1,
254                samples: vec![0.0; 320],
255            }),
256        )
257        .await
258        .expect("send audio");
259    backend
260        .send_realtime_event(
261            &mut session,
262            RealtimeVoiceInput::ToolResult(ExternalContextEvent {
263                source: "skill:lookup_context".to_string(),
264                spoken_summary: Some("context ready".to_string()),
265                payload: json!({"ok": true}),
266            }),
267        )
268        .await
269        .expect("send tool result");
270
271    let received = backend.received_inputs();
272    assert!(matches!(received[0], RealtimeVoiceInput::Audio(_)));
273    assert!(matches!(received[1], RealtimeVoiceInput::ToolResult(_)));
274
275    assert!(matches!(
276        backend.recv_realtime_event(&mut session).await.unwrap(),
277        Some(RealtimeVoiceOutput::LatencyMark(RealtimeLatencyMark {
278            stage: RealtimeLatencyStage::InputReceived,
279            elapsed_ms: 5
280        }))
281    ));
282    assert!(matches!(
283        backend.recv_realtime_event(&mut session).await.unwrap(),
284        Some(RealtimeVoiceOutput::ToolCall(_))
285    ));
286    assert!(matches!(
287        backend.recv_realtime_event(&mut session).await.unwrap(),
288        Some(RealtimeVoiceOutput::Interruption { .. })
289    ));
290    assert!(matches!(
291        backend.recv_realtime_event(&mut session).await.unwrap(),
292        Some(RealtimeVoiceOutput::Closed { .. })
293    ));
294    assert!(
295        backend
296            .recv_realtime_event(&mut session)
297            .await
298            .unwrap()
299            .is_none()
300    );
301}
302
/// Permissive session policy for tests: accepts everything and never falls back.
///
/// Derives the common traits so it can be printed in test failures, copied
/// freely, and built with `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AllowAllPolicy;
304
305impl SessionPolicy for AllowAllPolicy {
306    fn should_accept_control_event(&self, _event: &ControlEvent) -> bool {
307        true
308    }
309
310    fn should_fallback_to_bridge(&self, _reason: &FallbackReason) -> bool {
311        false
312    }
313
314    fn max_tool_latency_ms(&self) -> u64 {
315        500
316    }
317}
318
319#[derive(Debug, Clone, Deserialize)]
320pub struct WaveformFixture {
321    pub sample_rate_hz: u32,
322    pub channels: u16,
323    pub samples: Vec<f32>,
324}
325
326pub fn workspace_fixture_path(name: &str) -> PathBuf {
327    Path::new(env!("CARGO_MANIFEST_DIR"))
328        .join("../../tests/fixtures")
329        .join(name)
330}
331
332pub fn load_waveform_fixture(name: &str) -> WaveformFixture {
333    let path = workspace_fixture_path(name);
334    let raw = std::fs::read_to_string(&path)
335        .unwrap_or_else(|err| panic!("failed to read waveform fixture {}: {err}", path.display()));
336    serde_json::from_str(&raw).unwrap_or_else(|err| {
337        panic!(
338            "failed to decode waveform fixture {}: {err}",
339            path.display()
340        )
341    })
342}
343
344pub async fn measure_loopback_latency_ms(transport: &ScriptedTransport) -> u128 {
345    let started = Instant::now();
346    let inbound = transport
347        .recv_frame()
348        .await
349        .expect("receive frame")
350        .expect("queued frame");
351    transport
352        .send_frame(AudioOutputFrame {
353            sequence: inbound.sequence,
354            sample_rate_hz: inbound.sample_rate_hz,
355            channels: inbound.channels,
356            samples: inbound.samples,
357            is_filler: false,
358        })
359        .await
360        .expect("send frame");
361    started.elapsed().as_millis()
362}
363
364#[tokio::test]
365async fn runtime_executes_skill_call_and_returns_injection_event() {
366    use vona_core::runtime::{FillerStrategy, RuntimeDecision, VonaRuntime};
367
368    let runtime = VonaRuntime::new(
369        Arc::new(EchoSkillExecutor),
370        Arc::new(AllowAllPolicy),
371        FillerStrategy::StaticClip,
372    );
373    let context = SkillContext {
374        session_id: "session-1".to_string(),
375        user_id: Some("user-1".to_string()),
376        thread_id: Some("thread-1".to_string()),
377        metadata: json!({"surface": "test"}),
378    };
379
380    let decision = runtime
381        .handle_control_event(
382            &ControlEvent::SkillCall(SkillCall {
383                name: "get_weather".to_string(),
384                arguments: json!({"city": "Nairobi"}),
385            }),
386            context,
387        )
388        .await
389        .expect("skill decision");
390
391    match decision {
392        RuntimeDecision::InjectContext(event) => {
393            assert_eq!(event.source, "skill:get_weather");
394            assert_eq!(
395                event.spoken_summary.as_deref(),
396                Some("executed get_weather")
397            );
398        }
399        other => panic!("unexpected runtime decision: {other:?}"),
400    }
401}
402
403#[tokio::test]
404async fn waveform_fixture_round_trips_through_scripted_transport() {
405    let fixture = load_waveform_fixture("sine-16khz-mono.json");
406    let transport = ScriptedTransport::default();
407    transport.push_input(AudioInputFrame {
408        sequence: 1,
409        sample_rate_hz: fixture.sample_rate_hz,
410        channels: fixture.channels,
411        samples: fixture.samples.clone(),
412    });
413
414    let _ = measure_loopback_latency_ms(&transport).await;
415    let sent = transport.sent_frames();
416    assert_eq!(sent.len(), 1);
417    assert_eq!(sent[0].sample_rate_hz, fixture.sample_rate_hz);
418    assert_eq!(sent[0].channels, fixture.channels);
419    assert_eq!(sent[0].samples, fixture.samples);
420}
421
422#[tokio::test]
423async fn scripted_transport_loopback_latency_stays_low_for_fixture_audio() {
424    let fixture = load_waveform_fixture("impulse-16khz-mono.json");
425    let transport = ScriptedTransport::default();
426    transport.push_input(AudioInputFrame {
427        sequence: 7,
428        sample_rate_hz: fixture.sample_rate_hz,
429        channels: fixture.channels,
430        samples: fixture.samples,
431    });
432
433    let elapsed_ms = measure_loopback_latency_ms(&transport).await;
434    assert!(
435        elapsed_ms < 50,
436        "expected in-process loopback under 50ms, got {elapsed_ms}ms"
437    );
438}
439
440#[tokio::test]
441async fn run_session_counts_interruption_and_clears_buffered_output() {
442    use vona_core::runtime::{FillerStrategy, VonaRuntime};
443    use vona_core::session::run_session;
444
445    let transport = ScriptedTransport::default();
446    transport.push_input(AudioInputFrame {
447        sequence: 1,
448        sample_rate_hz: 24_000,
449        channels: 1,
450        samples: vec![0.0; 160],
451    });
452
453    let backend = MockBackend::default();
454    backend.push_step(BackendStep {
455        output_audio: vec![AudioOutputFrame {
456            sequence: 1,
457            sample_rate_hz: 24_000,
458            channels: 1,
459            samples: vec![0.25; 160],
460            is_filler: false,
461        }],
462        control_events: vec![ControlEvent::Interruption {
463            reason: Some("barge_in".to_string()),
464        }],
465        transcript: None,
466        finished: true,
467        debug_payload: None,
468    });
469
470    let runtime = VonaRuntime::new(
471        Arc::new(EchoSkillExecutor),
472        Arc::new(AllowAllPolicy),
473        FillerStrategy::None,
474    );
475
476    let summary = run_session(
477        transport.clone(),
478        &backend,
479        &runtime,
480        SessionConfig::default(),
481    )
482    .await
483    .expect("session should complete");
484
485    assert_eq!(summary.metrics.interruptions, 1);
486    assert!(transport.sent_frames().is_empty());
487}