use car_voice::{
Listener, PushHandle, PushListener, VoiceConfig, VoiceEventSink, VoiceSession,
VoiceSessionRegistry,
};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, OnceLock};
/// Where a streaming transcription session gets its audio from.
///
/// Deserialized from JSON internally tagged by `"kind"` with snake_case
/// variant names, e.g. `{"kind":"mic"}` or
/// `{"kind":"pcm_push","sample_rate":16000}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AudioSourceSpec {
    /// Microphone capture via `car_voice::CpalListener`.
    Mic,
    /// An audio file (not yet wired into `transcribe_stream_start`).
    File {
        path: String,
    },
    /// A FIFO (not yet wired into `transcribe_stream_start`).
    Fifo {
        path: String,
    },
    /// System audio; availability depends on the target OS and features.
    System,
    /// PCM frames supplied by the caller through `transcribe_stream_push`.
    PcmPush {
        /// Sample rate of the pushed PCM.
        sample_rate: u32,
        /// Channel count; defaults to 1 when omitted.
        #[serde(default = "default_channels")]
        channels: u16,
    },
}
/// Serde default for omitted `channels` fields: mono.
fn default_channels() -> u16 {
    const MONO: u16 = 1;
    MONO
}
/// Optional knobs for `transcribe_stream_start`, parsed from its
/// `options_json` argument. Every field may be omitted.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TranscribeStreamOptions {
    /// Overrides `VoiceConfig::whisper_cpp_model`.
    pub model: Option<String>,
    /// Overrides `VoiceConfig::language`.
    pub language: Option<String>,
    /// Accepted but currently not forwarded by `build_voice_config`.
    pub prompt: Option<String>,
    /// Not read anywhere in this module.
    #[serde(default)]
    pub emit_audio_meta: bool,
    /// For `mic` sources: enables streaming STT (Apple Speech on macOS,
    /// Parakeet elsewhere when the `parakeet` feature is enabled).
    #[serde(default)]
    pub streaming: bool,
    /// For `mic` sources: attaches the prepared diarizer, if any.
    #[serde(default)]
    pub diarizer: bool,
    /// For `mic` sources: attaches the enrolled-speaker pipeline.
    #[serde(default)]
    pub enrolled: bool,
    /// Copied into `VoiceConfig::voice_prompt_overlay` when present.
    #[serde(default)]
    pub voice_prompt_overlay: Option<String>,
    /// For `pcm_push` sources: `"elevenlabs"` or `"local"` (the default).
    #[serde(default)]
    pub provider: Option<String>,
}
/// Process-wide map from session id to the push handle of a `pcm_push`
/// session, created lazily on first access.
fn push_handles() -> &'static DashMap<String, PushHandle> {
    static HANDLES: OnceLock<DashMap<String, PushHandle>> = OnceLock::new();
    HANDLES.get_or_init(|| DashMap::new())
}
/// Process-wide slot holding the Parakeet provider loaded by `prepare_parakeet`.
#[cfg(feature = "parakeet")]
fn prepared_parakeet() -> &'static OnceLock<Arc<car_voice::ParakeetSttProvider>> {
    static PARAKEET: OnceLock<Arc<car_voice::ParakeetSttProvider>> = OnceLock::new();
    &PARAKEET
}
/// Loads the Parakeet STT model from its default directory and caches the
/// provider for later streaming sessions.
///
/// The load runs on the blocking thread pool. Returns JSON
/// `{ "model_dir": <path>, "ready": true }`.
///
/// # Errors
/// Fails if the model directory cannot be resolved, the blocking task
/// panics, or the model fails to load.
#[cfg(feature = "parakeet")]
pub async fn prepare_parakeet() -> Result<String, String> {
    let model_dir = car_voice::ParakeetSttProvider::default_model_dir()
        .map_err(|e| format!("model dir resolution failed: {}", e))?;
    let model_dir_display = model_dir.display().to_string();
    let provider = Arc::new(car_voice::ParakeetSttProvider::new(model_dir));
    let loader = Arc::clone(&provider);
    let load_outcome = tokio::task::spawn_blocking(move || loader.prepare())
        .await
        .map_err(|e| format!("blocking task failed: {}", e))?;
    load_outcome.map_err(|e| format!("parakeet load failed: {}", e))?;
    // `OnceLock::set` only succeeds once: the first prepared provider is kept.
    let _ = prepared_parakeet().set(provider);
    Ok(serde_json::json!({ "model_dir": model_dir_display, "ready": true }).to_string())
}
/// Stub used when the crate is built without the `parakeet` feature; always
/// returns an error explaining how to enable it.
#[cfg(not(feature = "parakeet"))]
pub async fn prepare_parakeet() -> Result<String, String> {
    const MISSING_FEATURE: &str =
        "car-ffi-common was built without the `parakeet` feature; rebuild with --features parakeet";
    Err(MISSING_FEATURE.to_owned())
}
/// Returns the provider cached by `prepare_parakeet`, if it has run successfully.
#[cfg(feature = "parakeet")]
pub(crate) fn current_prepared_parakeet() -> Option<Arc<car_voice::ParakeetSttProvider>> {
    prepared_parakeet().get().map(Arc::clone)
}
/// Process-wide slot holding the diarizer loaded by `prepare_diarizer`.
#[cfg(feature = "diarization")]
fn prepared_diarizer() -> &'static OnceLock<car_voice::SharedDiarizer> {
    static DIARIZER: OnceLock<car_voice::SharedDiarizer> = OnceLock::new();
    &DIARIZER
}
/// Builds a `SpeakerDiarizer` with the default configuration on the blocking
/// thread pool and caches it for later `mic` sessions. Returns JSON
/// `{ "ready": true }`.
///
/// # Errors
/// Fails if the blocking task panics or the diarizer cannot be constructed.
#[cfg(feature = "diarization")]
pub async fn prepare_diarizer() -> Result<String, String> {
    let load = tokio::task::spawn_blocking(|| {
        car_voice::SpeakerDiarizer::new(car_voice::DiarizationConfig::default())
    });
    let diarizer = load
        .await
        .map_err(|e| format!("blocking task failed: {}", e))?
        .map_err(|e| format!("diarizer load failed: {}", e))?;
    // Only the first successfully prepared diarizer is kept.
    let _ = prepared_diarizer().set(std::sync::Arc::new(diarizer));
    Ok(serde_json::json!({ "ready": true }).to_string())
}
/// Stub used when the crate is built without the `diarization` feature;
/// always returns an error explaining how to enable it.
#[cfg(not(feature = "diarization"))]
pub async fn prepare_diarizer() -> Result<String, String> {
    const MISSING_FEATURE: &str =
        "car-ffi-common was built without the `diarization` feature; rebuild with --features diarization";
    Err(MISSING_FEATURE.to_owned())
}
/// Returns the diarizer cached by `prepare_diarizer`, if it has run successfully.
#[cfg(feature = "diarization")]
pub fn current_prepared_diarizer() -> Option<car_voice::SharedDiarizer> {
    let slot = prepared_diarizer();
    slot.get().cloned()
}
fn prepared_pipeline() -> &'static OnceLock<std::sync::Arc<car_voice::SpeakerPipeline>> {
static P: OnceLock<std::sync::Arc<car_voice::SpeakerPipeline>> = OnceLock::new();
&P
}
pub fn current_or_build_pipeline() -> Result<std::sync::Arc<car_voice::SpeakerPipeline>, String> {
if let Some(p) = prepared_pipeline().get() {
return Ok(p.clone());
}
let pipeline = car_voice::SpeakerPipeline::baseline();
let pipeline = match car_voice::list_enrollments()
.map_err(|e| format!("list enrollments: {}", e))?
.into_iter()
.next()
{
Some(info) => match car_voice::load_enrollment(&info.label) {
Ok(e) => pipeline.with_enrollment(e),
Err(e) => {
tracing::warn!(
"[enrollment] failed to load '{}': {} — pipeline starts empty",
info.label,
e
);
pipeline
}
},
None => pipeline,
};
let arc = std::sync::Arc::new(pipeline);
let _ = prepared_pipeline().set(arc.clone());
Ok(arc)
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EnrollAudioSpec {
Pcm {
sample_rate: u32,
#[serde(default = "default_channels")]
channels: u16,
data_b64: String,
},
Wav { path: String },
}
pub async fn enroll_speaker(label: &str, audio_json: &str) -> Result<String, String> {
let spec: EnrollAudioSpec =
serde_json::from_str(audio_json).map_err(|e| format!("invalid audio JSON: {}", e))?;
let label = label.to_string();
let result = tokio::task::spawn_blocking(move || -> Result<String, String> {
let enrollment = match spec {
EnrollAudioSpec::Pcm {
sample_rate,
channels,
data_b64,
} => {
use base64::Engine;
let bytes = base64::engine::general_purpose::STANDARD
.decode(&data_b64)
.map_err(|e| format!("invalid pcm data_b64: {}", e))?;
if bytes.len() % 2 != 0 {
return Err("pcm bytes length must be even (16-bit samples)".to_string());
}
let samples_i16: Vec<i16> = bytes
.chunks_exact(2)
.map(|c| i16::from_le_bytes([c[0], c[1]]))
.collect();
car_voice::enroll_from_pcm(&label, &samples_i16, sample_rate, channels)
.map_err(|e| format!("enroll: {}", e))?
}
EnrollAudioSpec::Wav { path } => car_voice::enroll_from_wav(&label, path.as_ref())
.map_err(|e| format!("enroll: {}", e))?,
};
let path = car_voice::save_enrollment(&enrollment).map_err(|e| e.to_string())?;
let _ = prepared_pipeline();
Ok(serde_json::json!({
"label": enrollment.label,
"path": path.display().to_string(),
"model_id": enrollment.embedding.model,
})
.to_string())
})
.await
.map_err(|e| format!("blocking task: {}", e))??;
Ok(result)
}
pub fn list_enrollments() -> Result<String, String> {
let infos = car_voice::list_enrollments().map_err(|e| e.to_string())?;
let arr: Vec<serde_json::Value> = infos
.into_iter()
.map(|i| {
serde_json::json!({
"label": i.label,
"path": i.path.display().to_string(),
"model_id": i.model_id,
})
})
.collect();
Ok(serde_json::json!({ "enrollments": arr }).to_string())
}
pub fn remove_enrollment(label: &str) -> Result<String, String> {
car_voice::remove_enrollment(label).map_err(|e| e.to_string())?;
Ok(serde_json::json!({ "label": label, "removed": true }).to_string())
}
/// Builds the [`Listener`] that feeds a transcription session for `source`.
///
/// * `Mic` — a `car_voice::CpalListener`, optionally extended with:
///   - streaming STT when `streaming` is set (Apple Speech on macOS, or
///     Parakeet on other targets when the `parakeet` feature is enabled);
///   - the prepared diarizer when `diarizer` is set (only with the
///     `diarization` feature);
///   - the enrolled speaker pipeline when `enrolled` is set.
/// * `System` — a platform-specific system-audio listener, or `Err` where
///   none is compiled in.
/// * `File` / `Fifo` — not supported yet; always `Err`.
/// * `PcmPush` — an ElevenLabs streaming listener (`provider == "elevenlabs"`,
///   1 channel only) or a local [`PushListener`] (`provider` absent or
///   `"local"`). The listener's push handle, if any, is registered under
///   `session_id` in `push_handles()`.
///
/// # Errors
/// The following conditions return `Err`:
/// * an unsupported source;
/// * an unknown `provider`;
/// * multi-channel PCM with ElevenLabs;
/// * no Linux monitor source;
/// * on non-macOS `parakeet` builds with no prepared provider, failure to
///   resolve the default model directory. This used to panic.
fn build_listener_for_source(
    session_id: &str,
    source: &AudioSourceSpec,
    streaming: bool,
    diarizer: bool,
    enrolled: bool,
    provider: Option<&str>,
) -> Result<Box<dyn Listener>, String> {
    match source {
        AudioSourceSpec::Mic => {
            let mut listener = car_voice::CpalListener::new();
            #[cfg(target_os = "macos")]
            if streaming {
                let provider =
                    std::sync::Arc::new(car_voice::AppleSpeechSttProvider::new());
                listener = listener.with_apple_speech_streaming(provider);
            }
            #[cfg(all(not(target_os = "macos"), feature = "parakeet"))]
            if streaming {
                // Prefer the provider loaded by `prepare_parakeet`; otherwise
                // fall back to a fresh one. A model-dir failure is reported to
                // the caller instead of panicking.
                let provider = match current_prepared_parakeet() {
                    Some(p) => p,
                    None => {
                        let dir = car_voice::ParakeetSttProvider::default_model_dir()
                            .map_err(|e| format!("model dir resolution failed: {}", e))?;
                        std::sync::Arc::new(car_voice::ParakeetSttProvider::new(dir))
                    }
                };
                listener = listener.with_parakeet_streaming(provider);
            }
            #[cfg(all(not(target_os = "macos"), not(feature = "parakeet")))]
            let _ = streaming;
            #[cfg(feature = "diarization")]
            if diarizer {
                if let Some(d) = current_prepared_diarizer() {
                    listener = listener.with_diarizer(d);
                } else {
                    tracing::warn!(
                        "[voice] diarizer requested but not prepared — call prepare_diarizer() first; defaulting to no diarization"
                    );
                }
            }
            #[cfg(not(feature = "diarization"))]
            let _ = diarizer;
            if enrolled {
                match current_or_build_pipeline() {
                    Ok(p) => listener = listener.with_speaker_pipeline(p),
                    Err(e) => tracing::warn!(
                        "[voice] failed to build SpeakerPipeline: {} — proceeding without enrollment",
                        e
                    ),
                }
            }
            Ok(Box::new(listener))
        }
        #[cfg(all(target_os = "macos", feature = "system-audio-macos"))]
        AudioSourceSpec::System => Ok(Box::new(car_voice::SystemAudioListener::new())),
        #[cfg(target_os = "windows")]
        AudioSourceSpec::System => Ok(Box::new(car_voice::WindowsLoopbackListener::new())),
        #[cfg(target_os = "linux")]
        AudioSourceSpec::System => {
            let monitor = car_voice::default_monitor_source().ok_or_else(|| {
                "no PipeWire/PulseAudio monitor source found — install pipewire-pulse or pulseaudio".to_string()
            })?;
            tracing::info!(
                "[voice] linux system audio: using monitor source '{}'",
                monitor
            );
            // NOTE(review): `monitor` is only logged. A plain
            // `CpalListener::new()` is returned and is never pointed at the
            // monitor source. Confirm this really captures system audio
            // rather than the default input device.
            Ok(Box::new(car_voice::CpalListener::new()))
        }
        #[cfg(not(any(
            all(target_os = "macos", feature = "system-audio-macos"),
            target_os = "windows",
            target_os = "linux",
        )))]
        AudioSourceSpec::System => Err(
            "system audio capture: macOS requires the `system-audio-macos` feature (full Xcode); other platforms (Windows / Linux) need building with their respective target_os"
                .to_string(),
        ),
        AudioSourceSpec::File { .. } => Err(
            "file source not yet wired into transcribeStream — call the existing batch transcribe() in the meantime"
                .to_string(),
        ),
        AudioSourceSpec::Fifo { .. } => {
            Err("fifo source not yet wired into transcribeStream".to_string())
        }
        AudioSourceSpec::PcmPush {
            sample_rate,
            channels,
        } => match provider {
            Some("elevenlabs") => {
                if *channels != 1 {
                    return Err(format!(
                        "elevenlabs provider requires 1-channel PCM, got {channels}; \
                         downmix on the caller side or open a `PcmPush` with channels=1"
                    ));
                }
                let listener =
                    car_voice::ElevenLabsStreamingListener::new(*sample_rate);
                if let Some(handle) = listener.handle() {
                    push_handles().insert(session_id.to_string(), handle);
                }
                Ok(Box::new(listener))
            }
            None | Some("local") => {
                let listener = PushListener::new(*sample_rate, *channels);
                if let Some(handle) = listener.handle() {
                    push_handles().insert(session_id.to_string(), handle);
                }
                Ok(Box::new(listener))
            }
            Some(other) => Err(format!(
                "unknown transcribe_stream provider '{other}'; supported: \"elevenlabs\", \"local\" (default)"
            )),
        },
    }
}
pub async fn transcribe_stream_start(
session_id: &str,
audio_source_json: &str,
options_json: Option<&str>,
registry: Arc<VoiceSessionRegistry>,
sink: Arc<dyn VoiceEventSink>,
) -> Result<String, String> {
if session_id.is_empty() {
return Err("session_id must not be empty".to_string());
}
if registry.contains(session_id) {
return Err(format!("session_id '{}' already exists", session_id));
}
let source: AudioSourceSpec = serde_json::from_str(audio_source_json)
.map_err(|e| format!("invalid audioSource JSON: {}", e))?;
let opts: TranscribeStreamOptions = match options_json {
Some(s) if !s.is_empty() => {
serde_json::from_str(s).map_err(|e| format!("invalid options JSON: {}", e))?
}
_ => TranscribeStreamOptions::default(),
};
let listener = build_listener_for_source(
session_id,
&source,
opts.streaming,
opts.diarizer,
opts.enrolled,
opts.provider.as_deref(),
)?;
let session = VoiceSession::new(session_id, listener);
let config = build_voice_config(&opts);
session
.start(config, sink)
.await
.map_err(|e| format!("session start error: {}", e))?;
registry
.insert(session)
.map_err(|e| format!("registry insert error: {}", e))?;
Ok(serde_json::json!({ "session_id": session_id }).to_string())
}
/// Stops the session `session_id` and forgets its push handle, if any.
/// Returns JSON `{ "session_id", "stopped": true }`.
///
/// # Errors
/// Propagates the registry's stop error, e.g. for an unknown session.
pub async fn transcribe_stream_stop(
    session_id: &str,
    registry: Arc<VoiceSessionRegistry>,
) -> Result<String, String> {
    // `transcribe_stream_push` looks handles up in this map, so drop the
    // entry before tearing the session down.
    let _ = push_handles().remove(session_id);
    if let Err(e) = registry.stop(session_id).await {
        return Err(format!("session stop error: {}", e));
    }
    Ok(serde_json::json!({ "session_id": session_id, "stopped": true }).to_string())
}
pub async fn transcribe_stream_push(
session_id: &str,
pcm_frame: &[u8],
registry: Arc<VoiceSessionRegistry>,
) -> Result<String, String> {
if !registry.contains(session_id) {
return Err(format!("unknown session_id '{}'", session_id));
}
let handle = push_handles()
.get(session_id)
.ok_or_else(|| {
format!(
"session '{}' is not a pcm_push session (push only works for sources of kind 'pcm_push')",
session_id
)
})?
.clone();
handle
.feed_pcm(pcm_frame.to_vec())
.await
.map_err(|e: car_voice::VoiceError| format!("push failed: {}", e))?;
Ok(serde_json::json!({ "accepted": true }).to_string())
}
/// Returns JSON `{ "sessions": ... }` built from `registry.list()`.
pub fn list_voice_sessions(registry: Arc<VoiceSessionRegistry>) -> String {
    let sessions = registry.list();
    serde_json::json!({ "sessions": sessions }).to_string()
}
/// Process-wide map from TTS stream id to the abort handle of its delivery
/// task, created lazily on first access.
fn tts_stream_handles() -> &'static DashMap<String, tokio::task::AbortHandle> {
    static STREAMS: OnceLock<DashMap<String, tokio::task::AbortHandle>> = OnceLock::new();
    STREAMS.get_or_init(|| DashMap::new())
}
/// Optional knobs for `tts_stream_start`, parsed from its `options_json`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TtsStreamOptions {
    /// Provider name, parsed by `parse_tts_provider_kind` into `VoiceConfig::tts_provider`.
    pub provider: Option<String>,
    /// Copied into `VoiceConfig::elevenlabs_voice_id` when present.
    pub voice_id: Option<String>,
    /// Deliver audio as binary frames instead of JSON `tts_chunk` events.
    #[serde(default)]
    pub binary_frames: bool,
}
fn parse_tts_provider_kind(s: &str) -> Result<car_voice::TtsProviderKind, String> {
match s.to_ascii_lowercase().as_str() {
"elevenlabs" | "eleven-labs" | "eleven_labs" => {
Ok(car_voice::TtsProviderKind::Elevenlabs)
}
"local" => Ok(car_voice::TtsProviderKind::Local),
"kokoro" => Ok(car_voice::TtsProviderKind::Kokoro),
"apple_speech" | "apple-speech" | "applespeech" => {
Ok(car_voice::TtsProviderKind::AppleSpeech)
}
other => Err(format!("unknown tts provider '{}'", other)),
}
}
pub async fn tts_stream_start(
stream_id: &str,
text: &str,
options_json: Option<&str>,
sink: Arc<dyn VoiceEventSink>,
) -> Result<String, String> {
use base64::Engine as _;
if stream_id.is_empty() {
return Err("stream_id must not be empty".to_string());
}
if tts_stream_handles().contains_key(stream_id) {
return Err(format!("stream_id '{}' already exists", stream_id));
}
let opts: TtsStreamOptions = match options_json {
Some(s) if !s.is_empty() => {
serde_json::from_str(s).map_err(|e| format!("invalid options JSON: {}", e))?
}
_ => TtsStreamOptions::default(),
};
let mut config = VoiceConfig::default();
if let Some(provider) = &opts.provider {
config.tts_provider = parse_tts_provider_kind(provider)?;
}
if let Some(voice_id) = &opts.voice_id {
config.elevenlabs_voice_id = voice_id.clone();
}
let speaker = car_voice::build_tts_speaker(&config)
.map_err(|e| format!("tts provider init: {}", e))?;
let mut rx = speaker
.synth_stream(text)
.await
.map_err(|e| format!("synth_stream: {}", e))?;
let stream_id_owned = stream_id.to_string();
let sink_for_task = sink.clone();
let binary_frames = opts.binary_frames;
if binary_frames {
binary::build_frame(binary::FRAME_TYPE_TTS_CHUNK, &stream_id_owned, 0, 0, &[]).map_err(
|e| {
format!(
"binary_frames=true requires stream_id be 32 hex chars: {}",
e
)
},
)?;
}
let join_handle = tokio::spawn(async move {
while let Some(chunk) = rx.recv().await {
let format_byte = match chunk.format {
car_voice::AudioFormat::Mp3 => binary::FORMAT_MP3,
car_voice::AudioFormat::Wav => binary::FORMAT_WAV,
};
let format_str = match chunk.format {
car_voice::AudioFormat::Mp3 => "mp3",
car_voice::AudioFormat::Wav => "wav",
};
if binary_frames {
if let Ok(frame) = binary::build_frame(
binary::FRAME_TYPE_TTS_CHUNK,
&stream_id_owned,
chunk.seq,
format_byte,
&chunk.bytes,
) {
sink_for_task.send_binary(frame);
}
if chunk.is_final {
if let Ok(marker) = binary::build_frame(
binary::FRAME_TYPE_TTS_FINAL,
&stream_id_owned,
chunk.seq,
0,
&[],
) {
sink_for_task.send_binary(marker);
}
break;
}
} else {
let event_json = serde_json::json!({
"type": "tts_chunk",
"stream_id": stream_id_owned,
"seq": chunk.seq,
"audio_b64": base64::engine::general_purpose::STANDARD.encode(&chunk.bytes),
"format": format_str,
"is_final": chunk.is_final,
})
.to_string();
sink_for_task.send(&stream_id_owned, event_json);
if chunk.is_final {
break;
}
}
}
tts_stream_handles().remove(&stream_id_owned);
});
tts_stream_handles().insert(stream_id.to_string(), join_handle.abort_handle());
Ok(serde_json::json!({
"stream_id": stream_id,
"binary_frames": binary_frames,
})
.to_string())
}
/// Cancels the TTS stream `stream_id` by aborting its delivery task.
///
/// Never fails. Returns JSON `{ "stream_id", "cancelled" }`, where
/// `cancelled` is `false` if no such stream was registered.
pub async fn tts_stream_cancel(stream_id: &str) -> Result<String, String> {
    let removed = tts_stream_handles().remove(stream_id);
    let cancelled = removed.is_some();
    if let Some((_, abort_handle)) = removed {
        abort_handle.abort();
    }
    Ok(serde_json::json!({ "stream_id": stream_id, "cancelled": cancelled }).to_string())
}
/// Returns JSON `{ "streams": [<stream_id>, ...] }` for every registered TTS stream.
pub fn list_tts_streams() -> String {
    let mut ids: Vec<String> = Vec::new();
    for entry in tts_stream_handles().iter() {
        ids.push(entry.key().clone());
    }
    serde_json::json!({ "streams": ids }).to_string()
}
/// Binary framing for audio exchanged over the FFI boundary.
///
/// Each frame is a fixed [`binary::HEADER_LEN`]-byte header followed by the
/// payload:
///
/// | offset | len | field                                            |
/// |--------|-----|--------------------------------------------------|
/// | 0      | 1   | frame type (`FRAME_TYPE_*`)                      |
/// | 1      | 16  | session/stream id (raw bytes of a 32-hex-char id)|
/// | 17     | 8   | sequence number, little-endian `u64`             |
/// | 25     | 1   | payload format (`FORMAT_*`)                      |
/// | 26     | ..  | payload                                          |
pub mod binary {
    /// Frame type tag for inbound PCM audio.
    pub const FRAME_TYPE_INBOUND_PCM: u8 = 0x01;
    /// Frame type tag for one chunk of synthesized TTS audio.
    pub const FRAME_TYPE_TTS_CHUNK: u8 = 0x02;
    /// Frame type tag for the end-of-stream TTS marker.
    pub const FRAME_TYPE_TTS_FINAL: u8 = 0x03;
    /// Frame type tag for a TTS error.
    pub const FRAME_TYPE_TTS_ERROR: u8 = 0x04;
    /// Payload format: signed 16-bit little-endian PCM.
    pub const FORMAT_PCM_S16LE: u8 = 0x00;
    /// Payload format: MP3.
    pub const FORMAT_MP3: u8 = 0x01;
    /// Payload format: WAV.
    pub const FORMAT_WAV: u8 = 0x02;
    /// Header size: 1 (type) + 16 (id) + 8 (seq) + 1 (format).
    pub const HEADER_LEN: usize = 26;

    /// A frame decoded by [`parse_frame`]; `payload` borrows from the input.
    pub struct ParsedFrame<'a> {
        pub frame_type: u8,
        /// The 16 id bytes rendered as 32 lower-case hex chars.
        pub session_id_hex: String,
        pub seq: u64,
        pub format: u8,
        pub payload: &'a [u8],
    }

    /// Splits `bytes` into header fields and payload.
    ///
    /// # Errors
    /// Returns `Err` if `bytes` is shorter than [`HEADER_LEN`].
    pub fn parse_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, String> {
        if bytes.len() < HEADER_LEN {
            return Err(format!(
                "binary frame shorter than header ({} bytes; need {})",
                bytes.len(),
                HEADER_LEN
            ));
        }
        let frame_type = bytes[0];
        let mut session_id_hex = String::with_capacity(32);
        for b in &bytes[1..17] {
            use std::fmt::Write as _;
            let _ = write!(session_id_hex, "{:02x}", b);
        }
        let seq = u64::from_le_bytes(
            bytes[17..25]
                .try_into()
                .expect("header length checked above; slice is exactly 8 bytes"),
        );
        let format = bytes[25];
        let payload = &bytes[HEADER_LEN..];
        Ok(ParsedFrame {
            frame_type,
            session_id_hex,
            seq,
            format,
            payload,
        })
    }

    /// Serializes a frame.
    ///
    /// `session_id_hex` must be exactly 32 hex characters; either case is
    /// accepted.
    ///
    /// # Errors
    /// Returns `Err` if `session_id_hex` has the wrong length or contains
    /// anything other than hex digits.
    pub fn build_frame(
        frame_type: u8,
        session_id_hex: &str,
        seq: u64,
        format: u8,
        payload: &[u8],
    ) -> Result<Vec<u8>, String> {
        let uuid_bytes = parse_session_id_hex(session_id_hex)?;
        let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
        out.push(frame_type);
        out.extend_from_slice(&uuid_bytes);
        out.extend_from_slice(&seq.to_le_bytes());
        out.push(format);
        out.extend_from_slice(payload);
        Ok(out)
    }

    /// Decodes a 32-char hex id into its 16 raw bytes.
    fn parse_session_id_hex(s: &str) -> Result<[u8; 16], String> {
        if s.len() != 32 {
            return Err(format!(
                "session_id must be 32 hex chars (got {} chars)",
                s.len()
            ));
        }
        let mut out = [0u8; 16];
        for (i, pair) in s.as_bytes().chunks_exact(2).enumerate() {
            if !pair.is_ascii() {
                return Err("session_id contains non-ASCII".to_string());
            }
            // `u8::from_str_radix` also accepts a leading `+` (e.g. "+f"), so
            // require two genuine hex digits before handing the pair to it.
            if !pair.iter().all(u8::is_ascii_hexdigit) {
                return Err(format!("session_id has non-hex chars at index {}", i * 2));
            }
            let hex_pair = std::str::from_utf8(pair).expect("pair was checked to be ASCII");
            out[i] = u8::from_str_radix(hex_pair, 16).expect("pair was checked to be hex digits");
        }
        Ok(out)
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn roundtrip_preserves_fields() {
            let id = "0123456789abcdef0123456789abcdef";
            let payload = b"hello world";
            let frame = build_frame(FRAME_TYPE_TTS_CHUNK, id, 42, FORMAT_MP3, payload).unwrap();
            let parsed = parse_frame(&frame).unwrap();
            assert_eq!(parsed.frame_type, FRAME_TYPE_TTS_CHUNK);
            assert_eq!(parsed.session_id_hex, id);
            assert_eq!(parsed.seq, 42);
            assert_eq!(parsed.format, FORMAT_MP3);
            assert_eq!(parsed.payload, payload);
        }
        #[test]
        fn short_frame_rejected() {
            assert!(parse_frame(&[0u8; 10]).is_err());
        }
        #[test]
        fn bad_hex_rejected() {
            let bad = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz";
            assert!(build_frame(0x01, bad, 0, 0, &[]).is_err());
        }
        #[test]
        fn wrong_length_hex_rejected() {
            assert!(build_frame(0x01, "abcd", 0, 0, &[]).is_err());
        }
        #[test]
        fn plus_sign_rejected() {
            let sneaky = "+f".repeat(16);
            assert!(build_frame(0x01, &sneaky, 0, 0, &[]).is_err());
        }
        #[test]
        fn uppercase_hex_accepted_and_normalized() {
            let id = "0123456789ABCDEF0123456789ABCDEF";
            let frame = build_frame(FRAME_TYPE_TTS_FINAL, id, 1, 0, &[]).unwrap();
            let parsed = parse_frame(&frame).unwrap();
            assert_eq!(parsed.session_id_hex, id.to_ascii_lowercase());
            assert!(parsed.payload.is_empty());
        }
    }
}
/// Builds the session's `VoiceConfig` from `VoiceConfig::default()`,
/// overriding only the fields the caller supplied.
///
/// The following options are copied over:
/// * `model` → `whisper_cpp_model`
/// * `language` → `language`
/// * `voice_prompt_overlay` → `voice_prompt_overlay`
///
/// `prompt` is accepted for API compatibility but is not forwarded anywhere.
/// The remaining flags are not read here; `streaming`, `diarizer`, `enrolled`
/// and `provider` are consumed by `build_listener_for_source` instead.
fn build_voice_config(opts: &TranscribeStreamOptions) -> VoiceConfig {
    let mut cfg = VoiceConfig::default();
    if let Some(model_name) = opts.model.as_ref() {
        cfg.whisper_cpp_model.clone_from(model_name);
    }
    if let Some(language) = opts.language.as_ref() {
        cfg.language.clone_from(language);
    }
    if let Some(overlay) = opts.voice_prompt_overlay.as_ref() {
        cfg.voice_prompt_overlay = Some(overlay.clone());
    }
    cfg
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn parses_mic_source() {
        let s: AudioSourceSpec = serde_json::from_str(r#"{"kind":"mic"}"#).unwrap();
        // `matches!` only evaluates to a bool; it must be asserted to test anything.
        assert!(matches!(s, AudioSourceSpec::Mic));
    }
    #[test]
    fn parses_pcm_push_with_defaults() {
        let s: AudioSourceSpec =
            serde_json::from_str(r#"{"kind":"pcm_push","sample_rate":48000}"#).unwrap();
        match s {
            AudioSourceSpec::PcmPush {
                sample_rate,
                channels,
            } => {
                assert_eq!(sample_rate, 48000);
                assert_eq!(channels, 1);
            }
            _ => panic!("wrong variant"),
        }
    }
    #[test]
    fn parses_pcm_push_with_explicit_channels() {
        let s: AudioSourceSpec =
            serde_json::from_str(r#"{"kind":"pcm_push","sample_rate":16000,"channels":2}"#)
                .unwrap();
        match s {
            AudioSourceSpec::PcmPush {
                sample_rate,
                channels,
            } => {
                assert_eq!(sample_rate, 16000);
                assert_eq!(channels, 2);
            }
            _ => panic!("wrong variant"),
        }
    }
    #[test]
    fn parses_file_source() {
        let s: AudioSourceSpec =
            serde_json::from_str(r#"{"kind":"file","path":"/tmp/a.wav"}"#).unwrap();
        match s {
            AudioSourceSpec::File { path } => assert_eq!(path, "/tmp/a.wav"),
            _ => panic!("wrong variant"),
        }
    }
    #[test]
    fn rejects_unknown_kind() {
        let res: Result<AudioSourceSpec, _> = serde_json::from_str(r#"{"kind":"radio"}"#);
        assert!(res.is_err());
    }
    #[test]
    fn options_defaults_are_empty() {
        let opts: TranscribeStreamOptions = serde_json::from_str("{}").unwrap();
        assert!(opts.model.is_none());
        assert!(opts.language.is_none());
        assert!(opts.prompt.is_none());
        assert!(!opts.emit_audio_meta);
        assert!(!opts.streaming);
        assert!(!opts.diarizer);
        assert!(!opts.enrolled);
        assert!(opts.voice_prompt_overlay.is_none());
        assert!(opts.provider.is_none());
    }
    #[test]
    fn tts_provider_names_are_case_insensitive() {
        assert!(matches!(
            parse_tts_provider_kind("Eleven-Labs"),
            Ok(car_voice::TtsProviderKind::Elevenlabs)
        ));
        assert!(matches!(
            parse_tts_provider_kind("APPLE_SPEECH"),
            Ok(car_voice::TtsProviderKind::AppleSpeech)
        ));
        assert_eq!(
            parse_tts_provider_kind("Nope").err().as_deref(),
            Some("unknown tts provider 'nope'")
        );
    }
    #[tokio::test]
    async fn unknown_session_stop_returns_error() {
        let registry = Arc::new(VoiceSessionRegistry::new());
        let err = transcribe_stream_stop("nope", registry).await.unwrap_err();
        assert!(err.contains("nope") || err.contains("not running"));
    }
}