use car_voice::{
Listener, PushHandle, PushListener, VoiceConfig, VoiceEventSink, VoiceSession,
VoiceSessionRegistry,
};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, OnceLock};
/// Where a streaming transcription session reads its audio from.
///
/// Parsed from JSON as an internally tagged enum: the `kind` field names the
/// variant in snake_case, e.g. `{"kind":"mic"}` or
/// `{"kind":"pcm_push","sample_rate":16000}`.
/// Turned into a concrete listener by `build_listener_for_source`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AudioSourceSpec {
/// Live capture through `car_voice::CpalListener` (optionally with
/// streaming STT, diarization and speaker enrollment attached).
Mic,
/// Audio file at `path`. Not yet supported for streaming sessions.
File { path: String },
/// FIFO at `path`. Not yet supported for streaming sessions.
Fifo { path: String },
/// Platform system-audio capture (macOS with `system-audio-macos`,
/// Windows loopback, Linux PipeWire/PulseAudio monitor); other builds reject it.
System,
/// Caller-pushed PCM frames delivered via `transcribe_stream_push`.
PcmPush {
sample_rate: u32,
/// Defaults to 1 (mono) when omitted.
#[serde(default = "default_channels")]
channels: u16,
},
}
/// Serde default for `channels` fields when the JSON omits them: mono.
fn default_channels() -> u16 {
    const MONO: u16 = 1;
    MONO
}
/// Optional knobs for `transcribe_stream_start`, parsed from the options JSON.
///
/// Every field may be omitted; `{}` (or no options at all) yields
/// `TranscribeStreamOptions::default()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TranscribeStreamOptions {
/// Overrides `VoiceConfig::whisper_cpp_model` when set.
pub model: Option<String>,
/// Overrides `VoiceConfig::language` when set.
pub language: Option<String>,
/// Accepted but not currently forwarded by `build_voice_config`.
pub prompt: Option<String>,
/// Not read anywhere in this module.
#[serde(default)]
pub emit_audio_meta: bool,
/// Attach streaming STT to `Mic` sources (Apple Speech on macOS, Parakeet
/// elsewhere when the `parakeet` feature is enabled).
#[serde(default)]
pub streaming: bool,
/// Attach the diarizer prepared by `prepare_diarizer` to `Mic` sources.
#[serde(default)]
pub diarizer: bool,
/// Attach the enrollment-backed speaker pipeline to `Mic` sources.
#[serde(default)]
pub enrolled: bool,
/// Copied into `VoiceConfig::voice_prompt_overlay` when set.
#[serde(default)]
pub voice_prompt_overlay: Option<String>,
/// Backend for `pcm_push` sources: `"local"` (default) or `"elevenlabs"`.
#[serde(default)]
pub provider: Option<String>,
}
/// Process-wide map from session id to the `PushHandle` of a live
/// `pcm_push` session.
///
/// Filled by `build_listener_for_source` and emptied by
/// `transcribe_stream_stop`; created lazily on first access.
fn push_handles() -> &'static DashMap<String, PushHandle> {
    static HANDLES_BY_SESSION: OnceLock<DashMap<String, PushHandle>> = OnceLock::new();
    HANDLES_BY_SESSION.get_or_init(DashMap::new)
}
/// Process-wide slot for the Parakeet provider loaded by `prepare_parakeet`.
#[cfg(feature = "parakeet")]
fn prepared_parakeet() -> &'static OnceLock<std::sync::Arc<car_voice::ParakeetSttProvider>> {
    static PARAKEET_SLOT: OnceLock<std::sync::Arc<car_voice::ParakeetSttProvider>> =
        OnceLock::new();
    &PARAKEET_SLOT
}
/// Loads the Parakeet STT model from its default directory and caches the
/// provider for later streaming sessions.
///
/// The model load runs on a blocking thread. If a provider has already been
/// prepared, the load is skipped. The cache is a `OnceLock`, so a second
/// provider could never replace the first, and reloading would be wasted work.
///
/// Returns `{"model_dir": <dir>, "ready": true}`.
///
/// # Errors
/// Fails if the model directory cannot be resolved, the blocking task fails,
/// or the model fails to load.
#[cfg(feature = "parakeet")]
pub async fn prepare_parakeet() -> Result<String, String> {
    let dir = car_voice::ParakeetSttProvider::default_model_dir()
        .map_err(|e| format!("model dir resolution failed: {}", e))?;
    let dir_str = dir.display().to_string();
    if prepared_parakeet().get().is_none() {
        let provider = std::sync::Arc::new(car_voice::ParakeetSttProvider::new(dir));
        let provider_for_load = provider.clone();
        tokio::task::spawn_blocking(move || provider_for_load.prepare())
            .await
            .map_err(|e| format!("blocking task failed: {}", e))?
            .map_err(|e| format!("parakeet load failed: {}", e))?;
        // A concurrent caller may have filled the slot first; either provider is loaded.
        let _ = prepared_parakeet().set(provider);
    }
    Ok(serde_json::json!({ "model_dir": dir_str, "ready": true }).to_string())
}
/// Fallback used when the crate is built without the `parakeet` feature.
///
/// # Errors
/// Always fails, telling the caller how to rebuild with Parakeet support.
#[cfg(not(feature = "parakeet"))]
pub async fn prepare_parakeet() -> Result<String, String> {
    const MISSING_FEATURE: &str =
        "car-ffi-common was built without the `parakeet` feature; rebuild with --features parakeet";
    Err(String::from(MISSING_FEATURE))
}
/// Returns the Parakeet provider loaded by `prepare_parakeet`, or `None` if
/// it has not been prepared yet.
#[cfg(feature = "parakeet")]
pub(crate) fn current_prepared_parakeet() -> Option<std::sync::Arc<car_voice::ParakeetSttProvider>>
{
    let slot = prepared_parakeet();
    slot.get().map(std::sync::Arc::clone)
}
/// Process-wide slot for the diarizer loaded by `prepare_diarizer`.
#[cfg(feature = "diarization")]
fn prepared_diarizer() -> &'static OnceLock<car_voice::SharedDiarizer> {
    static DIARIZER_SLOT: OnceLock<car_voice::SharedDiarizer> = OnceLock::new();
    &DIARIZER_SLOT
}
/// Loads the speaker diarizer (default `DiarizationConfig`) and caches it for
/// later sessions that request diarization.
///
/// The load runs on a blocking thread. If a diarizer has already been
/// prepared, the load is skipped. The cache is a `OnceLock`, so a second
/// instance could never replace the first.
///
/// Returns `{"ready": true}`.
///
/// # Errors
/// Fails if the blocking task fails or the diarizer cannot be constructed.
#[cfg(feature = "diarization")]
pub async fn prepare_diarizer() -> Result<String, String> {
    if prepared_diarizer().get().is_none() {
        let result = tokio::task::spawn_blocking(|| {
            car_voice::SpeakerDiarizer::new(car_voice::DiarizationConfig::default())
        })
        .await
        .map_err(|e| format!("blocking task failed: {}", e))?
        .map_err(|e| format!("diarizer load failed: {}", e))?;
        let arc = std::sync::Arc::new(result);
        // A concurrent caller may have filled the slot first; either diarizer is loaded.
        let _ = prepared_diarizer().set(arc);
    }
    Ok(serde_json::json!({ "ready": true }).to_string())
}
/// Fallback used when the crate is built without the `diarization` feature.
///
/// # Errors
/// Always fails, telling the caller how to rebuild with diarization support.
#[cfg(not(feature = "diarization"))]
pub async fn prepare_diarizer() -> Result<String, String> {
    const MISSING_FEATURE: &str = "car-ffi-common was built without the `diarization` feature; rebuild with --features diarization";
    Err(String::from(MISSING_FEATURE))
}
/// Returns the diarizer loaded by `prepare_diarizer`, or `None` if it has not
/// been prepared yet.
#[cfg(feature = "diarization")]
pub fn current_prepared_diarizer() -> Option<car_voice::SharedDiarizer> {
    let slot = prepared_diarizer();
    slot.get().map(Clone::clone)
}
fn prepared_pipeline() -> &'static OnceLock<std::sync::Arc<car_voice::SpeakerPipeline>> {
static P: OnceLock<std::sync::Arc<car_voice::SpeakerPipeline>> = OnceLock::new();
&P
}
pub fn current_or_build_pipeline() -> Result<std::sync::Arc<car_voice::SpeakerPipeline>, String> {
if let Some(p) = prepared_pipeline().get() {
return Ok(p.clone());
}
let pipeline = car_voice::SpeakerPipeline::baseline();
let pipeline = match car_voice::list_enrollments()
.map_err(|e| format!("list enrollments: {}", e))?
.into_iter()
.next()
{
Some(info) => match car_voice::load_enrollment(&info.label) {
Ok(e) => pipeline.with_enrollment(e),
Err(e) => {
tracing::warn!(
"[enrollment] failed to load '{}': {} — pipeline starts empty",
info.label,
e
);
pipeline
}
},
None => pipeline,
};
let arc = std::sync::Arc::new(pipeline);
let _ = prepared_pipeline().set(arc.clone());
Ok(arc)
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EnrollAudioSpec {
Pcm {
sample_rate: u32,
#[serde(default = "default_channels")]
channels: u16,
data_b64: String,
},
Wav { path: String },
}
pub async fn enroll_speaker(label: &str, audio_json: &str) -> Result<String, String> {
let spec: EnrollAudioSpec =
serde_json::from_str(audio_json).map_err(|e| format!("invalid audio JSON: {}", e))?;
let label = label.to_string();
let result = tokio::task::spawn_blocking(move || -> Result<String, String> {
let enrollment = match spec {
EnrollAudioSpec::Pcm {
sample_rate,
channels,
data_b64,
} => {
use base64::Engine;
let bytes = base64::engine::general_purpose::STANDARD
.decode(&data_b64)
.map_err(|e| format!("invalid pcm data_b64: {}", e))?;
if bytes.len() % 2 != 0 {
return Err("pcm bytes length must be even (16-bit samples)".to_string());
}
let samples_i16: Vec<i16> = bytes
.chunks_exact(2)
.map(|c| i16::from_le_bytes([c[0], c[1]]))
.collect();
car_voice::enroll_from_pcm(&label, &samples_i16, sample_rate, channels)
.map_err(|e| format!("enroll: {}", e))?
}
EnrollAudioSpec::Wav { path } => car_voice::enroll_from_wav(&label, path.as_ref())
.map_err(|e| format!("enroll: {}", e))?,
};
let path = car_voice::save_enrollment(&enrollment).map_err(|e| e.to_string())?;
let _ = prepared_pipeline();
Ok(serde_json::json!({
"label": enrollment.label,
"path": path.display().to_string(),
"model_id": enrollment.embedding.model,
})
.to_string())
})
.await
.map_err(|e| format!("blocking task: {}", e))??;
Ok(result)
}
pub fn list_enrollments() -> Result<String, String> {
let infos = car_voice::list_enrollments().map_err(|e| e.to_string())?;
let arr: Vec<serde_json::Value> = infos
.into_iter()
.map(|i| {
serde_json::json!({
"label": i.label,
"path": i.path.display().to_string(),
"model_id": i.model_id,
})
})
.collect();
Ok(serde_json::json!({ "enrollments": arr }).to_string())
}
pub fn remove_enrollment(label: &str) -> Result<String, String> {
car_voice::remove_enrollment(label).map_err(|e| e.to_string())?;
Ok(serde_json::json!({ "label": label, "removed": true }).to_string())
}
/// Builds the `Listener` for a streaming session from its audio source spec.
///
/// * `session_id` – for `PcmPush` sources, the key under which the listener's
///   `PushHandle` is registered so `transcribe_stream_push` can find it.
/// * `streaming` – on macOS, attaches Apple Speech streaming to the mic
///   listener. On other platforms with the `parakeet` feature, attaches
///   Parakeet streaming instead. It prefers the provider from
///   `prepare_parakeet`, falling back to an unprepared one.
/// * `diarizer` – attaches the diarizer from `prepare_diarizer` (feature
///   `diarization`). If none has been prepared, it logs a warning and
///   continues without one.
/// * `enrolled` – attaches the shared `SpeakerPipeline`. If that fails, it
///   logs a warning and continues without one.
/// * `provider` – selects the `PcmPush` backend: `None`/`"local"` or
///   `"elevenlabs"` (mono only).
///
/// `streaming`, `diarizer` and `enrolled` only affect `Mic` sources;
/// `provider` only affects `PcmPush` sources.
///
/// # Errors
/// Returns a message for:
/// - unsupported sources (`File`, `Fifo`, or `System` on builds without support);
/// - an unknown `provider`, or non-mono input for ElevenLabs;
/// - a missing Linux monitor source;
/// - a Parakeet model directory that cannot be resolved.
fn build_listener_for_source(
    session_id: &str,
    source: &AudioSourceSpec,
    streaming: bool,
    diarizer: bool,
    enrolled: bool,
    provider: Option<&str>,
) -> Result<Box<dyn Listener>, String> {
    match source {
        AudioSourceSpec::Mic => {
            let mut listener = car_voice::CpalListener::new();
            #[cfg(target_os = "macos")]
            if streaming {
                let provider = std::sync::Arc::new(car_voice::AppleSpeechSttProvider::new());
                listener = listener.with_apple_speech_streaming(provider);
            }
            #[cfg(all(not(target_os = "macos"), feature = "parakeet"))]
            if streaming {
                // Resolving the default model dir can fail at runtime (it was
                // previously `.expect`ed); report it to the caller instead of
                // panicking inside the library.
                let provider = match current_prepared_parakeet() {
                    Some(prepared) => prepared,
                    None => {
                        let dir = car_voice::ParakeetSttProvider::default_model_dir()
                            .map_err(|e| format!("model dir resolution failed: {}", e))?;
                        std::sync::Arc::new(car_voice::ParakeetSttProvider::new(dir))
                    }
                };
                listener = listener.with_parakeet_streaming(provider);
            }
            #[cfg(all(not(target_os = "macos"), not(feature = "parakeet")))]
            let _ = streaming;
            #[cfg(feature = "diarization")]
            if diarizer {
                if let Some(d) = current_prepared_diarizer() {
                    listener = listener.with_diarizer(d);
                } else {
                    tracing::warn!(
                        "[voice] diarizer requested but not prepared — call prepare_diarizer() first; defaulting to no diarization"
                    );
                }
            }
            #[cfg(not(feature = "diarization"))]
            let _ = diarizer;
            if enrolled {
                match current_or_build_pipeline() {
                    Ok(p) => listener = listener.with_speaker_pipeline(p),
                    Err(e) => tracing::warn!(
                        "[voice] failed to build SpeakerPipeline: {} — proceeding without enrollment",
                        e
                    ),
                }
            }
            Ok(Box::new(listener))
        }
        #[cfg(all(target_os = "macos", feature = "system-audio-macos"))]
        AudioSourceSpec::System => Ok(Box::new(car_voice::SystemAudioListener::new())),
        #[cfg(target_os = "windows")]
        AudioSourceSpec::System => Ok(Box::new(car_voice::WindowsLoopbackListener::new())),
        #[cfg(target_os = "linux")]
        AudioSourceSpec::System => {
            let monitor = car_voice::default_monitor_source().ok_or_else(|| {
                "no PipeWire/PulseAudio monitor source found — install pipewire-pulse or pulseaudio".to_string()
            })?;
            tracing::info!(
                "[voice] linux system audio: using monitor source '{}'",
                monitor
            );
            // NOTE(review): `monitor` is only logged. The listener below is a
            // plain `CpalListener::new()` and is not bound to that source.
            // Confirm whether CpalListener can target a named input device.
            Ok(Box::new(car_voice::CpalListener::new()))
        }
        #[cfg(not(any(
            all(target_os = "macos", feature = "system-audio-macos"),
            target_os = "windows",
            target_os = "linux",
        )))]
        AudioSourceSpec::System => Err(
            "system audio capture: macOS requires the `system-audio-macos` feature (full Xcode); other platforms (Windows / Linux) need building with their respective target_os"
                .to_string(),
        ),
        AudioSourceSpec::File { .. } => Err(
            "file source not yet wired into transcribeStream — call the existing batch transcribe() in the meantime"
                .to_string(),
        ),
        AudioSourceSpec::Fifo { .. } => {
            Err("fifo source not yet wired into transcribeStream".to_string())
        }
        AudioSourceSpec::PcmPush {
            sample_rate,
            channels,
        } => match provider {
            Some("elevenlabs") => {
                if *channels != 1 {
                    return Err(format!(
                        "elevenlabs provider requires 1-channel PCM, got {channels}; \
                         downmix on the caller side or open a `PcmPush` with channels=1"
                    ));
                }
                let listener = car_voice::ElevenLabsStreamingListener::new(*sample_rate);
                if let Some(handle) = listener.handle() {
                    push_handles().insert(session_id.to_string(), handle);
                }
                Ok(Box::new(listener))
            }
            None | Some("local") => {
                let listener = PushListener::new(*sample_rate, *channels);
                if let Some(handle) = listener.handle() {
                    push_handles().insert(session_id.to_string(), handle);
                }
                Ok(Box::new(listener))
            }
            Some(other) => Err(format!(
                "unknown transcribe_stream provider '{other}'; supported: \"elevenlabs\", \"local\" (default)"
            )),
        },
    }
}
pub async fn transcribe_stream_start(
session_id: &str,
audio_source_json: &str,
options_json: Option<&str>,
registry: Arc<VoiceSessionRegistry>,
sink: Arc<dyn VoiceEventSink>,
) -> Result<String, String> {
if session_id.is_empty() {
return Err("session_id must not be empty".to_string());
}
if registry.contains(session_id) {
return Err(format!("session_id '{}' already exists", session_id));
}
let source: AudioSourceSpec = serde_json::from_str(audio_source_json)
.map_err(|e| format!("invalid audioSource JSON: {}", e))?;
let opts: TranscribeStreamOptions = match options_json {
Some(s) if !s.is_empty() => {
serde_json::from_str(s).map_err(|e| format!("invalid options JSON: {}", e))?
}
_ => TranscribeStreamOptions::default(),
};
let listener = build_listener_for_source(
session_id,
&source,
opts.streaming,
opts.diarizer,
opts.enrolled,
opts.provider.as_deref(),
)?;
let session = VoiceSession::new(session_id, listener);
let config = build_voice_config(&opts);
session
.start(config, sink)
.await
.map_err(|e| format!("session start error: {}", e))?;
registry
.insert(session)
.map_err(|e| format!("registry insert error: {}", e))?;
Ok(serde_json::json!({ "session_id": session_id }).to_string())
}
/// Stops the streaming session `session_id` and forgets its push handle.
///
/// The push handle, if any, is discarded before the registry is asked to
/// stop the session, so it is dropped even when stopping fails.
/// Returns `{"session_id": …, "stopped": true}`.
///
/// # Errors
/// Returns `"session stop error: …"` when the registry cannot stop the
/// session, e.g. because the id is unknown.
pub async fn transcribe_stream_stop(
    session_id: &str,
    registry: Arc<VoiceSessionRegistry>,
) -> Result<String, String> {
    let _ = push_handles().remove(session_id);
    if let Err(err) = registry.stop(session_id).await {
        return Err(format!("session stop error: {}", err));
    }
    let body = serde_json::json!({ "session_id": session_id, "stopped": true });
    Ok(body.to_string())
}
/// Feeds one raw PCM frame into a running `pcm_push` session.
///
/// Returns `{"accepted": true}` once the frame has been handed to the
/// session's push handle.
///
/// # Errors
/// Fails if the session is unknown, if it is not a `pcm_push` session, or if
/// the handle rejects the frame.
pub async fn transcribe_stream_push(
    session_id: &str,
    pcm_frame: &[u8],
    registry: Arc<VoiceSessionRegistry>,
) -> Result<String, String> {
    if !registry.contains(session_id) {
        return Err(format!("unknown session_id '{}'", session_id));
    }
    // Clone the handle out of the map so no shard guard is held across the await.
    let Some(handle) = push_handles()
        .get(session_id)
        .map(|entry| entry.value().clone())
    else {
        return Err(format!(
            "session '{}' is not a pcm_push session (push only works for sources of kind 'pcm_push')",
            session_id
        ));
    };
    let frame = pcm_frame.to_vec();
    handle
        .feed_pcm(frame)
        .await
        .map(|_| serde_json::json!({ "accepted": true }).to_string())
        .map_err(|e: car_voice::VoiceError| format!("push failed: {}", e))
}
/// Returns `{"sessions": …}` describing every session in `registry`.
pub fn list_voice_sessions(registry: Arc<VoiceSessionRegistry>) -> String {
    let sessions = registry.list();
    let payload = serde_json::json!({ "sessions": sessions });
    payload.to_string()
}
/// Maps stream options onto a `VoiceConfig`.
///
/// Starts from `VoiceConfig::default()` and overrides `whisper_cpp_model`,
/// `language` and `voice_prompt_overlay` only for options that are set.
fn build_voice_config(opts: &TranscribeStreamOptions) -> VoiceConfig {
    let mut out = VoiceConfig::default();
    if let Some(model_name) = opts.model.as_deref() {
        out.whisper_cpp_model = model_name.to_owned();
    }
    if let Some(lang_code) = opts.language.as_deref() {
        out.language = lang_code.to_owned();
    }
    if opts.voice_prompt_overlay.is_some() {
        out.voice_prompt_overlay = opts.voice_prompt_overlay.clone();
    }
    // `opts.prompt` is accepted but intentionally not forwarded to the config yet.
    out
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_mic_source() {
        let s: AudioSourceSpec = serde_json::from_str(r#"{"kind":"mic"}"#).unwrap();
        // `matches!` alone only computes a bool; it must be asserted to test anything.
        assert!(matches!(s, AudioSourceSpec::Mic));
    }

    #[test]
    fn parses_pcm_push_with_defaults() {
        let s: AudioSourceSpec =
            serde_json::from_str(r#"{"kind":"pcm_push","sample_rate":48000}"#).unwrap();
        match s {
            AudioSourceSpec::PcmPush {
                sample_rate,
                channels,
            } => {
                assert_eq!(sample_rate, 48000);
                assert_eq!(channels, 1);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn parses_file_source() {
        let s: AudioSourceSpec =
            serde_json::from_str(r#"{"kind":"file","path":"/tmp/a.wav"}"#).unwrap();
        match s {
            AudioSourceSpec::File { path } => assert_eq!(path, "/tmp/a.wav"),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn rejects_unknown_kind() {
        let res: Result<AudioSourceSpec, _> = serde_json::from_str(r#"{"kind":"radio"}"#);
        assert!(res.is_err());
    }

    #[test]
    fn options_defaults_are_empty() {
        let opts: TranscribeStreamOptions = serde_json::from_str("{}").unwrap();
        assert!(opts.model.is_none());
        assert!(opts.language.is_none());
        assert!(!opts.emit_audio_meta);
    }

    #[test]
    fn options_parse_provider_and_flags() {
        let opts: TranscribeStreamOptions =
            serde_json::from_str(r#"{"provider":"elevenlabs","streaming":true}"#).unwrap();
        assert_eq!(opts.provider.as_deref(), Some("elevenlabs"));
        assert!(opts.streaming);
        assert!(!opts.diarizer);
    }

    #[test]
    fn parses_enroll_specs() {
        let wav: EnrollAudioSpec =
            serde_json::from_str(r#"{"kind":"wav","path":"/tmp/x.wav"}"#).unwrap();
        match wav {
            EnrollAudioSpec::Wav { path } => assert_eq!(path, "/tmp/x.wav"),
            _ => panic!("wrong variant"),
        }
        let pcm: EnrollAudioSpec =
            serde_json::from_str(r#"{"kind":"pcm","sample_rate":16000,"data_b64":""}"#).unwrap();
        match pcm {
            EnrollAudioSpec::Pcm {
                sample_rate,
                channels,
                ..
            } => {
                assert_eq!(sample_rate, 16000);
                assert_eq!(channels, 1);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn voice_config_applies_language_override() {
        let opts = TranscribeStreamOptions {
            language: Some("fr".to_string()),
            ..Default::default()
        };
        let cfg = build_voice_config(&opts);
        assert_eq!(cfg.language, "fr");
    }

    #[test]
    fn file_source_is_rejected() {
        let source = AudioSourceSpec::File {
            path: "/tmp/a.wav".to_string(),
        };
        match build_listener_for_source("t-file", &source, false, false, false, None) {
            Err(e) => assert!(e.contains("file source not yet wired")),
            Ok(_) => panic!("file source should be rejected"),
        }
    }

    #[test]
    fn pcm_push_unknown_provider_is_rejected() {
        let source = AudioSourceSpec::PcmPush {
            sample_rate: 16000,
            channels: 1,
        };
        match build_listener_for_source("t-bogus", &source, false, false, false, Some("bogus")) {
            Err(e) => assert!(e.contains("unknown transcribe_stream provider 'bogus'")),
            Ok(_) => panic!("unknown provider should be rejected"),
        }
    }

    #[test]
    fn elevenlabs_rejects_stereo() {
        let source = AudioSourceSpec::PcmPush {
            sample_rate: 16000,
            channels: 2,
        };
        match build_listener_for_source("t-stereo", &source, false, false, false, Some("elevenlabs"))
        {
            Err(e) => assert!(e.contains("requires 1-channel PCM, got 2")),
            Ok(_) => panic!("stereo elevenlabs input should be rejected"),
        }
    }

    #[tokio::test]
    async fn unknown_session_stop_returns_error() {
        let registry = Arc::new(VoiceSessionRegistry::new());
        let err = transcribe_stream_stop("nope", registry).await.unwrap_err();
        assert!(err.contains("nope") || err.contains("not running"));
    }

    #[tokio::test]
    async fn push_to_unknown_session_is_rejected() {
        let registry = Arc::new(VoiceSessionRegistry::new());
        let err = transcribe_stream_push("ghost", &[0, 0], registry)
            .await
            .unwrap_err();
        assert!(err.contains("unknown session_id 'ghost'"));
    }
}