1use crate::backend::{BackendError, SpeechToSpeechBackend};
2use crate::runtime::{FallbackReason, RuntimeDecision, SessionPolicy, VonaRuntime};
3use crate::skills::SkillExecutor;
4use crate::transport::{AudioTransport, TransportError};
5use crate::types::{ControlEvent, SessionMetrics, SkillContext};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::time::Instant;
9use thiserror::Error;
10
/// Speech style settings that can be attached to a [`SessionConfig`] through
/// its `style_profile` field.
///
/// NOTE(review): the numeric scale of `pace`, `warmth` and `expressiveness`
/// is not defined in this file; the `Default` impl sets each of them to `0.5`.
/// Confirm the valid range with the backend that consumes the profile.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SpeechStyleProfile {
    /// Pace setting (presumably speaking speed — confirm scale with the consumer).
    pub pace: f32,
    /// Warmth setting (scale not defined here).
    pub warmth: f32,
    /// Expressiveness setting (scale not defined here).
    pub expressiveness: f32,
    /// Optional formality label; the set of accepted values is not constrained here.
    pub formality: Option<String>,
    /// Optional voice preference; how the name is resolved is not shown in this file.
    pub preferred_voice: Option<String>,
}
19
20impl Default for SpeechStyleProfile {
21 fn default() -> Self {
22 Self {
23 pace: 0.5,
24 warmth: 0.5,
25 expressiveness: 0.5,
26 formality: None,
27 preferred_voice: None,
28 }
29 }
30}
31
/// Parameters handed to the backend when a session is started.
///
/// `run_session` copies `session_id` into the resulting [`SessionSummary`]
/// and into every skill context it builds, then passes the whole config to
/// the backend's `start_session`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Identifier of the session; echoed back in the session summary.
    pub session_id: String,
    /// Sample rate in hertz (presumably of the session audio — confirm which
    /// direction(s) it applies to).
    pub sample_rate_hz: u32,
    /// Channel count (presumably of the session audio — confirm).
    pub channels: u16,
    /// Optional speech style settings; `None` means no profile is supplied.
    pub style_profile: Option<SpeechStyleProfile>,
    /// Free-form JSON metadata. When the field is missing from serialized
    /// input it falls back to `Value::default()`, i.e. JSON `null`.
    #[serde(default)]
    pub metadata: Value,
}
41
42impl Default for SessionConfig {
43 fn default() -> Self {
44 Self {
45 session_id: "default-session".to_string(),
46 sample_rate_hz: 24_000,
47 channels: 1,
48 style_profile: None,
49 metadata: Value::Null,
50 }
51 }
52}
53
/// Phase labels for a session.
///
/// Variants serialize in `snake_case` (for example `paused_for_interruption`).
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type, so the exact meaning of and transitions between the variants
/// must be confirmed against the code that drives the state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionState {
    Idle,
    Listening,
    Generating,
    ExecutingSkill,
    PausedForInterruption,
    FallingBackToBridge,
    Closed,
}
65
/// Why a session ended, as reported in [`SessionSummary::close_reason`].
///
/// Variant names serialize in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionCloseReason {
    /// The transport returned no further frame (`recv_frame` yielded `Ok(None)`).
    TransportClosed,
    /// A backend step reported `finished`.
    BackendFinished,
    /// NOTE(review): not produced by `run_session` in this file; presumably
    /// reserved for a policy-driven shutdown — confirm with the producer.
    PolicyEnded,
    /// A transport receive/send or a backend step failed; carries the error's
    /// `to_string()` text.
    Error(String),
}
78
/// Final report returned by `run_session` once the session loop has ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Copy of the `session_id` from the [`SessionConfig`] the session ran with.
    pub session_id: String,
    /// Counters and timings accumulated while the session was running.
    pub metrics: SessionMetrics,
    /// What caused the session loop to stop.
    pub close_reason: SessionCloseReason,
}
85
/// Hard failures that prevent `run_session` from producing a summary.
///
/// Both variants get a `From` conversion through `#[from]`, so the wrapped
/// error types can be propagated with `?`.
#[derive(Debug, Error)]
pub enum SessionError {
    /// A backend operation failed. In `run_session` the only `?` site is the
    /// `start_session` call.
    #[error("backend error: {0}")]
    Backend(#[from] BackendError),
    /// A transport operation failed.
    ///
    /// NOTE(review): `run_session` never returns this variant itself — it folds
    /// transport failures into `SessionCloseReason::Error` instead.
    #[error("transport error: {0}")]
    Transport(#[from] TransportError),
}
93
94pub async fn run_session<T, B, P, E>(
103 transport: T,
104 backend: &B,
105 runtime: &VonaRuntime<E, P>,
106 config: SessionConfig,
107) -> Result<SessionSummary, SessionError>
108where
109 T: AudioTransport,
110 B: SpeechToSpeechBackend,
111 P: SessionPolicy,
112 E: SkillExecutor,
113{
114 let session_id = config.session_id.clone();
115 let mut backend_session = backend.start_session(config.clone()).await?;
116 let mut metrics = SessionMetrics::default();
117 let session_start = Instant::now();
118
119 loop {
120 let frame = match transport.recv_frame().await {
122 Ok(Some(frame)) => frame,
123 Ok(None) => {
124 let _ = backend.end_session(backend_session).await;
126 return Ok(SessionSummary {
127 session_id,
128 metrics,
129 close_reason: SessionCloseReason::TransportClosed,
130 });
131 }
132 Err(err) => {
133 let _ = backend.end_session(backend_session).await;
134 return Ok(SessionSummary {
135 session_id,
136 metrics,
137 close_reason: SessionCloseReason::Error(err.to_string()),
138 });
139 }
140 };
141
142 let step = match backend.step(&mut backend_session, frame).await {
143 Ok(step) => step,
144 Err(err) => {
145 let _ = backend.end_session(backend_session).await;
146 return Ok(SessionSummary {
147 session_id,
148 metrics,
149 close_reason: SessionCloseReason::Error(err.to_string()),
150 });
151 }
152 };
153
154 if metrics.time_to_first_audio_ms.is_none() && !step.output_audio.is_empty() {
156 metrics.time_to_first_audio_ms = Some(session_start.elapsed().as_millis() as u64);
157 }
158
159 for audio_frame in step.output_audio {
161 if let Err(err) = transport.send_frame(audio_frame).await {
162 let _ = backend.end_session(backend_session).await;
163 return Ok(SessionSummary {
164 session_id,
165 metrics,
166 close_reason: SessionCloseReason::Error(err.to_string()),
167 });
168 }
169 }
170
171 for control_event in step.control_events {
173 let skill_context = SkillContext {
174 session_id: session_id.clone(),
175 user_id: None,
176 thread_id: None,
177 metadata: Value::Null,
178 };
179
180 if let ControlEvent::Interruption { .. } = &control_event {
181 metrics.interruptions += 1;
182 transport.clear_output().await.ok();
183 }
184
185 let decision = match runtime
186 .handle_control_event(&control_event, skill_context)
187 .await
188 {
189 Ok(d) => d,
190 Err(_) => continue,
191 };
192
193 match decision {
194 RuntimeDecision::InjectContext(ctx_event) => {
195 metrics.tool_calls += 1;
196 if let Err(err) = backend.inject_event(&mut backend_session, ctx_event).await {
197 tracing_fallback(&err.to_string());
198 }
199 }
200 RuntimeDecision::Fallback { reason } => {
201 metrics.fallback_count += 1;
202 if reason == FallbackReason::ToolTimeout {
203 }
206 }
207 RuntimeDecision::Ignore | RuntimeDecision::Continue => {}
208 }
209 }
210
211 if step.finished {
212 let _ = backend.end_session(backend_session).await;
213 return Ok(SessionSummary {
214 session_id,
215 metrics,
216 close_reason: SessionCloseReason::BackendFinished,
217 });
218 }
219 }
220}
221
/// Reports a failed `inject_event` call on stderr in debug builds.
///
/// Builds without `debug_assertions` print nothing; the message is discarded.
#[inline(always)]
fn tracing_fallback(msg: &str) {
    if cfg!(debug_assertions) {
        eprintln!("[vona] inject_event error: {msg}");
    }
}