use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use car_engine::{
dispatch_voice_turn_sidecar_only_with_classifier,
dispatch_voice_turn_sidecar_only_with_telemetry, dispatch_voice_turn_with_telemetry,
DirectDataFetcher, SidecarResult, VoiceTelemetry, VoiceTurnControl, VoiceTurnError,
VoiceTurnHandle,
};
use car_inference::{
intent::IntentHint, GenerateParams, GenerateRequest, InferenceEngine, StreamEvent,
};
use tokio::sync::{Mutex, Semaphore};
use crate::{
compose_voice_context,
utterance::{ToolKind, UtteranceClass},
voice_audio_mixer::VoiceMixerHandle,
Result, Speaker, VoiceConfig, VoiceError,
};
pub use crate::utterance::{bridge_phrase, classify_utterance, format_for_voice};
/// Stable string label for a [`ToolKind`], used as a telemetry attribute value.
fn tool_kind_str(kind: ToolKind) -> &'static str {
    match kind {
        ToolKind::Unknown => "unknown",
        ToolKind::Search => "search",
        ToolKind::Calendar => "calendar",
        ToolKind::Email => "email",
    }
}
/// Byte index of the last sentence-terminating character ('.', '!', or '?')
/// in `s`, or `None` if the string contains none.
fn find_sentence_end(s: &str) -> Option<usize> {
    s.char_indices()
        .rev()
        .find(|&(_, c)| c == '.' || c == '!' || c == '?')
        .map(|(idx, _)| idx)
}
/// Default hard cap on how long a sidecar result is awaited (overridable via
/// `VoiceOrchestrator::with_sidecar_timeout`).
const DEFAULT_SIDECAR_TIMEOUT: Duration = Duration::from_secs(30);
/// Default gap between spoken progress phrases while the sidecar is pending
/// (used when `VoiceConfig::progress_interval_secs` is `None`).
const DEFAULT_PROGRESS_INTERVAL: Duration = Duration::from_secs(8);
/// Default maximum number of progress intervals per turn (used when
/// `VoiceConfig::max_progress_attempts` is `None`).
const DEFAULT_MAX_PROGRESS_ATTEMPTS: u32 = 4;
/// Token budget applied to every voice generation request (see `voice_params`).
const DEFAULT_VOICE_MAX_TOKENS: usize = 200;
/// Generation parameters shared by all voice requests: engine defaults with
/// the token budget capped at `DEFAULT_VOICE_MAX_TOKENS`.
fn voice_params() -> GenerateParams {
    let mut params = GenerateParams::default();
    params.max_tokens = DEFAULT_VOICE_MAX_TOKENS;
    params
}
/// Phrase spoken periodically while the sidecar is still generating.
const PROGRESS_PHRASE: &str = "Still working on that.";
/// Coordinates a full voice turn: utterance classification, dispatch to the
/// inference engine (fast track and/or sidecar), and playback of results
/// through a [`Speaker`], optionally routed via an audio mixer.
pub struct VoiceOrchestrator {
    /// Inference engine used for both fast-track and sidecar generation.
    engine: Arc<InferenceEngine>,
    /// Text-to-speech backend.
    speaker: Arc<dyn Speaker>,
    /// Optional mixer; when present, synthesized audio is queued through it
    /// instead of being spoken directly (see `speak`).
    mixer: Option<VoiceMixerHandle>,
    /// Optional telemetry sink for voice pipeline events.
    telemetry: Option<VoiceTelemetry>,
    /// Optional fetcher; when set, sidecar-only dispatch uses the
    /// classifier-backed variant (see `dispatch_sidecar_only`).
    direct_fetcher: Option<Arc<dyn DirectDataFetcher>>,
    /// Optional model override for the fast (low-latency) track.
    fast_model: Option<String>,
    /// Optional model override for the sidecar track.
    sidecar_model: Option<String>,
    /// When true, conversational utterances skip the fast track and go
    /// straight to the sidecar.
    skip_fast_track: bool,
    /// Voice configuration (context composition, progress tuning).
    config: VoiceConfig,
    /// Single-permit semaphore serializing access to the TTS pipeline.
    pipeline_lock: Arc<Semaphore>,
    /// Id of the turn currently considered "live"; bumping it invalidates
    /// in-flight results from older turns.
    current_turn_id: Arc<AtomicU64>,
    /// Control handle for the in-flight turn, if any (used for cancellation).
    current_handle: Arc<Mutex<Option<VoiceTurnControl>>>,
    /// Hard cap on how long a sidecar result is awaited.
    sidecar_timeout: Duration,
}
impl VoiceOrchestrator {
    /// Creates an orchestrator without an audio mixer; delegates to
    /// [`Self::with_mixer`].
    pub fn new(
        engine: Arc<InferenceEngine>,
        speaker: Arc<dyn Speaker>,
        config: VoiceConfig,
    ) -> Self {
        Self::with_mixer(engine, speaker, config, None)
    }

    /// Creates an orchestrator, optionally routing TTS audio through `mixer`.
    ///
    /// All optional collaborators (telemetry, model overrides, direct fetcher)
    /// start unset; configure them with the `with_*` builder methods.
    pub fn with_mixer(
        engine: Arc<InferenceEngine>,
        speaker: Arc<dyn Speaker>,
        config: VoiceConfig,
        mixer: Option<VoiceMixerHandle>,
    ) -> Self {
        Self {
            engine,
            speaker,
            mixer,
            telemetry: None,
            direct_fetcher: None,
            fast_model: None,
            sidecar_model: None,
            skip_fast_track: false,
            config,
            // Single permit: only one task drives the TTS pipeline at a time.
            pipeline_lock: Arc::new(Semaphore::new(1)),
            current_turn_id: Arc::new(AtomicU64::new(0)),
            current_handle: Arc::new(Mutex::new(None)),
            sidecar_timeout: DEFAULT_SIDECAR_TIMEOUT,
        }
    }

    /// Builder: overrides the default sidecar wait cap.
    pub fn with_sidecar_timeout(mut self, timeout: Duration) -> Self {
        self.sidecar_timeout = timeout;
        self
    }

    /// Builder: attaches a telemetry sink.
    pub fn with_telemetry(mut self, telemetry: VoiceTelemetry) -> Self {
        self.telemetry = Some(telemetry);
        self
    }

    /// Builder: sets optional model overrides for the fast and sidecar tracks.
    pub fn with_models(
        mut self,
        fast_model: Option<String>,
        sidecar_model: Option<String>,
    ) -> Self {
        self.fast_model = fast_model;
        self.sidecar_model = sidecar_model;
        self
    }

    /// Builder: when `skip` is true, conversational utterances bypass the
    /// fast track and are dispatched sidecar-only.
    pub fn with_skip_fast_track(mut self, skip: bool) -> Self {
        self.skip_fast_track = skip;
        self
    }

    /// Builder: attaches a direct-data fetcher, enabling the
    /// classifier-backed sidecar dispatch path.
    pub fn with_direct_fetcher(mut self, fetcher: Arc<dyn DirectDataFetcher>) -> Self {
        self.direct_fetcher = Some(fetcher);
        self
    }

    /// Warms the fast model with a minimal one-token generation so the first
    /// real turn does not pay cold-start latency. Best-effort: failures are
    /// only logged.
    pub async fn prewarm(&self) {
        let req = GenerateRequest {
            prompt: ".".to_string(),
            model: self.fast_model.clone(),
            params: GenerateParams {
                // Cheapest possible request: a single token.
                max_tokens: 1,
                ..Default::default()
            },
            intent: Some(IntentHint {
                prefer_fast: true,
                ..IntentHint::default()
            }),
            ..Default::default()
        };
        match self.engine.generate(req).await {
            Ok(_) => tracing::debug!("voice fast model prewarmed"),
            Err(e) => tracing::warn!(
                error = %e,
                "fast model prewarm failed; first turn will be slower"
            ),
        }
    }

    /// Cancels whatever turn is in flight: stops mixer TTS output, bumps the
    /// turn counter (making any late results from the old turn stale), then
    /// cancels and clears the stored control handle.
    pub async fn cancel_current_turn(&self) {
        if let Some(m) = &self.mixer {
            m.stop_tts();
        }
        // Bumping the id causes `maybe_play_sidecar` / `wait_with_progress`
        // to treat the old turn's output as stale.
        self.current_turn_id.fetch_add(1, Ordering::AcqRel);
        let mut g = self.current_handle.lock().await;
        if let Some(c) = g.as_ref() {
            c.cancel();
        }
        *g = None;
    }

    /// Entry point for a recognized user utterance.
    ///
    /// Cancels any previous turn, classifies the utterance, dispatches the
    /// matching pipeline (bridge + sidecar for tool-like input, sidecar-only
    /// or two-track for conversational input), streams the fast track to TTS
    /// under the pipeline lock, then waits for the sidecar result — speaking
    /// progress phrases while waiting — and plays it if still current.
    pub async fn handle_utterance(&self, utterance: String) -> Result<()> {
        self.cancel_current_turn().await;
        let class = classify_utterance(&utterance);
        let handle = match class {
            UtteranceClass::ToolLikely(kind) => self.dispatch_with_bridge(utterance, kind).await?,
            UtteranceClass::Conversational if self.skip_fast_track => {
                self.dispatch_sidecar_only(utterance)
            }
            UtteranceClass::Conversational => self.dispatch_two_track(utterance),
        };
        let our_turn = handle.turn_id();
        // Adopt the handle's id as the live turn id.
        self.current_turn_id.store(our_turn, Ordering::Release);
        {
            // Scope the lock so it is released before the long waits below.
            let mut g = self.current_handle.lock().await;
            *g = Some(handle.control.clone());
        }
        let _permit = self
            .pipeline_lock
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| VoiceError::Playback("pipeline closed".into()))?;
        // NOTE(review): a playback error here returns early without clearing
        // `current_handle` or awaiting the sidecar — confirm that is intended.
        self.drain_fast_to_tts(handle.fast).await?;
        // Release the pipeline before waiting so progress phrases (and any
        // newer turn) can acquire it.
        drop(_permit);
        match self.wait_with_progress(handle.sidecar, our_turn).await {
            Some(Ok(result)) => self.maybe_play_sidecar(our_turn, result).await?,
            Some(Err(VoiceTurnError::Cancelled)) => {
                tracing::info!(turn_id = our_turn, "sidecar cancelled");
            }
            Some(Err(VoiceTurnError::Inference(e))) => {
                tracing::warn!(turn_id = our_turn, error = %e, "sidecar inference failed");
            }
            None => {
                // Sidecar wait expired. NOTE(review): this reports
                // `sidecar_timeout` even when the (possibly smaller) progress
                // cap in `wait_with_progress` was the binding limit — confirm.
                if let Some(t) = &self.telemetry {
                    t.emit(
                        car_eventlog::EventKind::VoiceSidecarTimedOut,
                        our_turn,
                        vec![(
                            "timeout_ms",
                            (self.sidecar_timeout.as_millis() as u64).into(),
                        )],
                    );
                }
            }
        }
        // Clear the stored handle only if it still belongs to this turn; a
        // newer turn may have replaced it while we were waiting.
        let mut g = self.current_handle.lock().await;
        if g.as_ref().map(|c| c.turn_id) == Some(our_turn) {
            *g = None;
        }
        Ok(())
    }

    /// Speaks the bridge phrase for a tool-like utterance (holding the
    /// pipeline lock only for the duration of playback), then dispatches the
    /// sidecar-only pipeline and emits a `VoiceBridgePlayed` event.
    async fn dispatch_with_bridge(
        &self,
        utterance: String,
        kind: ToolKind,
    ) -> Result<VoiceTurnHandle> {
        let phrase = bridge_phrase(kind);
        {
            let _permit = self
                .pipeline_lock
                .clone()
                .acquire_owned()
                .await
                .map_err(|_| VoiceError::Playback("pipeline closed".into()))?;
            self.speak(phrase).await?;
        }
        let handle = self.dispatch_sidecar_only(utterance);
        if let Some(t) = &self.telemetry {
            t.emit(
                car_eventlog::EventKind::VoiceBridgePlayed,
                handle.turn_id(),
                vec![
                    ("kind", serde_json::Value::from(tool_kind_str(kind))),
                    ("phrase", serde_json::Value::from(phrase)),
                ],
            );
        }
        Ok(handle)
    }

    /// Builds fast-track and sidecar requests for the same utterance (both
    /// sharing the composed voice context) and dispatches them as a
    /// two-track turn.
    fn dispatch_two_track(&self, utterance: String) -> VoiceTurnHandle {
        let context = compose_voice_context(&self.config, None);
        let fast_req = GenerateRequest {
            prompt: utterance.clone(),
            model: self.fast_model.clone(),
            params: voice_params(),
            context: context.clone(),
            // Hint the router toward the low-latency path for the fast track.
            intent: Some(IntentHint {
                prefer_fast: true,
                ..IntentHint::default()
            }),
            ..Default::default()
        };
        let sidecar_req = GenerateRequest {
            prompt: utterance.clone(),
            model: self.sidecar_model.clone(),
            params: voice_params(),
            context,
            ..Default::default()
        };
        dispatch_voice_turn_with_telemetry(
            self.engine.clone(),
            utterance,
            fast_req,
            sidecar_req,
            self.telemetry.clone(),
        )
    }

    /// Builds a sidecar-only request and dispatches it; when a direct
    /// fetcher is configured, uses the classifier-backed dispatch variant.
    fn dispatch_sidecar_only(&self, utterance: String) -> VoiceTurnHandle {
        let context = compose_voice_context(&self.config, None);
        let sidecar_req = GenerateRequest {
            prompt: utterance.clone(),
            model: self.sidecar_model.clone(),
            params: voice_params(),
            context,
            ..Default::default()
        };
        match &self.direct_fetcher {
            Some(fetcher) => dispatch_voice_turn_sidecar_only_with_classifier(
                self.engine.clone(),
                utterance,
                sidecar_req,
                Some(fetcher.clone()),
                self.telemetry.clone(),
            ),
            None => dispatch_voice_turn_sidecar_only_with_telemetry(
                self.engine.clone(),
                utterance,
                sidecar_req,
                self.telemetry.clone(),
            ),
        }
    }

    /// Streams fast-track text deltas into TTS, speaking each complete
    /// sentence (text up to the last '.', '!' or '?') as soon as it forms,
    /// and flushing any trailing fragment when the stream ends.
    async fn drain_fast_to_tts(
        &self,
        mut rx: tokio::sync::mpsc::Receiver<StreamEvent>,
    ) -> Result<()> {
        let mut buf = String::new();
        while let Some(evt) = rx.recv().await {
            match evt {
                StreamEvent::TextDelta(d) => {
                    buf.push_str(&d);
                    if let Some(end) = find_sentence_end(&buf) {
                        // `end` is the byte index of an ASCII terminator, so
                        // draining ..=end is char-boundary safe.
                        let sentence: String = buf.drain(..=end).collect();
                        let trimmed = sentence.trim();
                        if !trimmed.is_empty() {
                            self.speak(trimmed).await?;
                        }
                    }
                }
                StreamEvent::Done { .. } => break,
                _ => {}
            }
        }
        // Flush whatever remains after the final terminator (or everything,
        // if no terminator ever arrived).
        let trimmed = buf.trim();
        if !trimmed.is_empty() {
            self.speak(trimmed).await?;
        }
        Ok(())
    }

    /// Waits for the sidecar result, speaking `PROGRESS_PHRASE` each interval
    /// while it is still pending.
    ///
    /// The total wait is capped at the smaller of `interval * max_attempts`
    /// and `sidecar_timeout`. Returns `None` on timeout, `Some(outcome)` when
    /// the sidecar completes (or its channel closes, mapped to `Cancelled`).
    async fn wait_with_progress(
        &self,
        sidecar: tokio::sync::oneshot::Receiver<std::result::Result<SidecarResult, VoiceTurnError>>,
        our_turn: u64,
    ) -> Option<std::result::Result<SidecarResult, VoiceTurnError>> {
        let interval = Duration::from_secs(
            self.config
                .progress_interval_secs
                .unwrap_or(DEFAULT_PROGRESS_INTERVAL.as_secs()),
        );
        let max_attempts = self
            .config
            .max_progress_attempts
            .unwrap_or(DEFAULT_MAX_PROGRESS_ATTEMPTS);
        let progress_cap = interval.saturating_mul(max_attempts);
        // Overall deadline: whichever cap is tighter.
        let total_cap = std::cmp::min(progress_cap, self.sidecar_timeout);
        let deadline = tokio::time::Instant::now() + total_cap;
        tokio::pin!(sidecar);
        let mut attempts: u32 = 0;
        loop {
            let now = tokio::time::Instant::now();
            if now >= deadline {
                return None;
            }
            // Never sleep past the deadline.
            let remaining = deadline.saturating_duration_since(now);
            let sleep_for = std::cmp::min(interval, remaining);
            tokio::select! {
                // `biased` so a ready sidecar result always wins over the tick.
                biased;
                outcome = &mut sidecar => {
                    // A dropped sender (channel closed) counts as cancellation.
                    return Some(outcome.unwrap_or(Err(VoiceTurnError::Cancelled)));
                }
                _ = tokio::time::sleep(sleep_for) => {
                    if tokio::time::Instant::now() >= deadline {
                        return None;
                    }
                    attempts += 1;
                    if attempts > max_attempts {
                        return None;
                    }
                    // If a newer turn superseded us, give the sidecar a short
                    // grace window to finish, otherwise report Cancelled.
                    if self.current_turn_id.load(Ordering::Acquire) != our_turn {
                        return match tokio::time::timeout(
                            Duration::from_millis(50),
                            &mut sidecar,
                        )
                        .await
                        {
                            Ok(Ok(r)) => Some(r),
                            _ => Some(Err(VoiceTurnError::Cancelled)),
                        };
                    }
                    // Take the pipeline so the progress phrase does not
                    // overlap other playback.
                    let _permit = match self
                        .pipeline_lock
                        .clone()
                        .acquire_owned()
                        .await
                    {
                        Ok(p) => p,
                        Err(_) => return None,
                    };
                    // Progress playback failures are logged, not fatal.
                    if let Err(e) = self.speak(PROGRESS_PHRASE).await {
                        tracing::warn!(turn_id = our_turn, error=%e, "progress phrase playback failed");
                    }
                    drop(_permit);
                }
            }
        }
    }

    /// Plays the sidecar's final text unless a newer turn has started; stale
    /// results are logged and dropped.
    async fn maybe_play_sidecar(&self, our_turn: u64, result: SidecarResult) -> Result<()> {
        let now = self.current_turn_id.load(Ordering::Acquire);
        // Staleness is judged against the result's own turn id, not `our_turn`.
        if (result.turn_id != now) {
            tracing::info!(
                stale = result.turn_id,
                current = now,
                "dropped stale sidecar result"
            );
            return Ok(());
        }
        let _permit = self
            .pipeline_lock
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| VoiceError::Playback("pipeline closed".into()))?;
        self.speak(&result.text).await?;
        // `our_turn` is otherwise unused in this method; silence the warning.
        let _ = our_turn;
        Ok(())
    }

    /// Synthesizes and plays `text`. With a mixer, audio is queued and this
    /// method polls the mixer's speaking flag every 20ms until playback
    /// finishes; without one, it delegates to `Speaker::speak`. Empty or
    /// whitespace-only text is a no-op.
    async fn speak(&self, text: &str) -> Result<()> {
        if text.trim().is_empty() {
            return Ok(());
        }
        match &self.mixer {
            Some(mixer) => {
                let audio = self.speaker.synth(text).await?;
                mixer.queue_tts(audio.bytes);
                // Block this turn until the mixer finishes playing the clip.
                let flag = mixer.speaking_flag();
                while flag.load(Ordering::Relaxed) {
                    tokio::time::sleep(Duration::from_millis(20)).await;
                }
                Ok(())
            }
            None => self.speaker.speak(text).await,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `find_sentence_end` reports the byte index of the LAST terminator.
    #[test]
    fn find_sentence_end_basic() {
        let cases: [(&str, Option<usize>); 5] = [
            ("", None),
            ("hello world", None),
            ("Hello.", Some(5)),
            ("Hi! Bye.", Some(7)),
            ("Hi! And then...", Some(14)),
        ];
        for (input, expected) in cases {
            assert_eq!(find_sentence_end(input), expected, "input: {:?}", input);
        }
    }

    /// Progress tuning fields default to `None` and survive a JSON round trip.
    #[test]
    fn voice_config_progress_defaults_round_trip() {
        let cfg = VoiceConfig::default();
        assert_eq!(cfg.progress_interval_secs, None);
        assert_eq!(cfg.max_progress_attempts, None);
        let encoded = serde_json::to_string(&cfg).unwrap();
        let decoded: VoiceConfig = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.progress_interval_secs.is_none());
        assert!(decoded.max_progress_attempts.is_none());
    }
}