Skip to main content

vona_core/
session.rs

1use crate::backend::{BackendError, SpeechToSpeechBackend};
2use crate::runtime::{FallbackReason, RuntimeDecision, SessionPolicy, VonaRuntime};
3use crate::skills::SkillExecutor;
4use crate::transport::{AudioTransport, TransportError};
5use crate::types::{ControlEvent, SessionMetrics, SkillContext};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::time::Instant;
9use thiserror::Error;
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub struct SpeechStyleProfile {
13    pub pace: f32,
14    pub warmth: f32,
15    pub expressiveness: f32,
16    pub formality: Option<String>,
17    pub preferred_voice: Option<String>,
18}
19
20impl Default for SpeechStyleProfile {
21    fn default() -> Self {
22        Self {
23            pace: 0.5,
24            warmth: 0.5,
25            expressiveness: 0.5,
26            formality: None,
27            preferred_voice: None,
28        }
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33pub struct SessionConfig {
34    pub session_id: String,
35    pub sample_rate_hz: u32,
36    pub channels: u16,
37    pub style_profile: Option<SpeechStyleProfile>,
38    #[serde(default)]
39    pub metadata: Value,
40}
41
42impl Default for SessionConfig {
43    fn default() -> Self {
44        Self {
45            session_id: "default-session".to_string(),
46            sample_rate_hz: 24_000,
47            channels: 1,
48            style_profile: None,
49            metadata: Value::Null,
50        }
51    }
52}
53
/// Lifecycle states of a speech-to-speech session.
///
/// Serialized with `snake_case` variant names (e.g. `ExecutingSkill` becomes
/// `"executing_skill"`).
///
/// NOTE(review): `run_session` in this module neither reads nor updates a
/// `SessionState`. Presumably the transitions between these states are tracked
/// elsewhere — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionState {
    Idle,
    Listening,
    Generating,
    ExecutingSkill,
    PausedForInterruption,
    FallingBackToBridge,
    Closed,
}
65
66// ---------------------------------------------------------------------------
67// Phase 3: session state machine driver
68// ---------------------------------------------------------------------------
69
/// Why a session loop driven by `run_session` stopped.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionCloseReason {
    /// The transport's `recv_frame` reported end of input (`Ok(None)`).
    TransportClosed,
    /// A backend step came back with `finished` set.
    BackendFinished,
    /// NOTE(review): never produced by `run_session` in this module; presumably
    /// reserved for a policy-initiated shutdown — confirm.
    PolicyEnded,
    /// A transport or backend failure after the session started; holds the
    /// failing error's `to_string()` text.
    Error(String),
}
78
/// Final report produced by `run_session` once a session has ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Copied from the `SessionConfig::session_id` the session was started with.
    pub session_id: String,
    /// Counters accumulated while the session ran.
    pub metrics: SessionMetrics,
    /// Why the session loop stopped.
    pub close_reason: SessionCloseReason,
}
85
/// Errors that abort `run_session` before any `SessionSummary` is produced.
///
/// Both variants use `#[from]`, so `?` converts the underlying error automatically.
///
/// NOTE(review): within this module, only the `?` on `backend.start_session` can
/// produce this error (presumably the `Backend` variant). Transport and backend
/// failures inside the session loop are reported as `SessionCloseReason::Error`
/// instead.
#[derive(Debug, Error)]
pub enum SessionError {
    #[error("backend error: {0}")]
    Backend(#[from] BackendError),
    #[error("transport error: {0}")]
    Transport(#[from] TransportError),
}
93
94/// Drives a full speech-to-speech session loop until the transport closes,
95/// the backend signals completion, or an unrecoverable error occurs.
96///
97/// # Metrics populated
98/// - `time_to_first_audio_ms`: instant of the first non-empty output frame
99/// - `interruptions`: each backend-emitted `ControlEvent::Interruption`
100/// - `tool_calls`: each `SkillAttempt` emitted from `RuntimeDecision::InjectContext`
101/// - `fallback_count`: each `RuntimeDecision::Fallback` returned from runtime
102pub async fn run_session<T, B, P, E>(
103    transport: T,
104    backend: &B,
105    runtime: &VonaRuntime<E, P>,
106    config: SessionConfig,
107) -> Result<SessionSummary, SessionError>
108where
109    T: AudioTransport,
110    B: SpeechToSpeechBackend,
111    P: SessionPolicy,
112    E: SkillExecutor,
113{
114    let session_id = config.session_id.clone();
115    let mut backend_session = backend.start_session(config.clone()).await?;
116    let mut metrics = SessionMetrics::default();
117    let session_start = Instant::now();
118
119    loop {
120        // Receive input from transport
121        let frame = match transport.recv_frame().await {
122            Ok(Some(frame)) => frame,
123            Ok(None) => {
124                // Transport closed cleanly
125                let _ = backend.end_session(backend_session).await;
126                return Ok(SessionSummary {
127                    session_id,
128                    metrics,
129                    close_reason: SessionCloseReason::TransportClosed,
130                });
131            }
132            Err(err) => {
133                let _ = backend.end_session(backend_session).await;
134                return Ok(SessionSummary {
135                    session_id,
136                    metrics,
137                    close_reason: SessionCloseReason::Error(err.to_string()),
138                });
139            }
140        };
141
142        let step = match backend.step(&mut backend_session, frame).await {
143            Ok(step) => step,
144            Err(err) => {
145                let _ = backend.end_session(backend_session).await;
146                return Ok(SessionSummary {
147                    session_id,
148                    metrics,
149                    close_reason: SessionCloseReason::Error(err.to_string()),
150                });
151            }
152        };
153
154        // Track time to first audio
155        if metrics.time_to_first_audio_ms.is_none() && !step.output_audio.is_empty() {
156            metrics.time_to_first_audio_ms = Some(session_start.elapsed().as_millis() as u64);
157        }
158
159        // Send output audio frames
160        for audio_frame in step.output_audio {
161            if let Err(err) = transport.send_frame(audio_frame).await {
162                let _ = backend.end_session(backend_session).await;
163                return Ok(SessionSummary {
164                    session_id,
165                    metrics,
166                    close_reason: SessionCloseReason::Error(err.to_string()),
167                });
168            }
169        }
170
171        // Process control events
172        for control_event in step.control_events {
173            let skill_context = SkillContext {
174                session_id: session_id.clone(),
175                user_id: None,
176                thread_id: None,
177                metadata: Value::Null,
178            };
179
180            if let ControlEvent::Interruption { .. } = &control_event {
181                metrics.interruptions += 1;
182                transport.clear_output().await.ok();
183            }
184
185            let decision = match runtime
186                .handle_control_event(&control_event, skill_context)
187                .await
188            {
189                Ok(d) => d,
190                Err(_) => continue,
191            };
192
193            match decision {
194                RuntimeDecision::InjectContext(ctx_event) => {
195                    metrics.tool_calls += 1;
196                    if let Err(err) = backend.inject_event(&mut backend_session, ctx_event).await {
197                        tracing_fallback(&err.to_string());
198                    }
199                }
200                RuntimeDecision::Fallback { reason } => {
201                    metrics.fallback_count += 1;
202                    if reason == FallbackReason::ToolTimeout {
203                        // Record timeout audit event through the session's audit path.
204                        // (If caller wired an AuditSink into SkillRegistry it already fired.)
205                    }
206                }
207                RuntimeDecision::Ignore | RuntimeDecision::Continue => {}
208            }
209        }
210
211        if step.finished {
212            let _ = backend.end_session(backend_session).await;
213            return Ok(SessionSummary {
214                session_id,
215                metrics,
216                close_reason: SessionCloseReason::BackendFinished,
217            });
218        }
219    }
220}
221
// Minimal logging shim so `vona_core` does not take a hard dependency on `tracing`.
// `run_session` routes `backend.inject_event` failures here instead of returning them,
// so in release builds those failures are dropped silently.

/// Reports a failed `inject_event` call on stderr in debug builds; a no-op in release
/// builds.
///
/// Marked `#[cold]` because it only runs on error paths. The `allow` keeps release
/// builds (where `msg` goes unused) warning-free.
#[cold]
#[cfg_attr(not(debug_assertions), allow(unused_variables))]
fn tracing_fallback(msg: &str) {
    #[cfg(debug_assertions)]
    eprintln!("[vona] inject_event error: {msg}");
}