VAMQ
Consume audio chunks from Voice Activity Messaging via ZeroMQ to support speech-to-X.
Currently, VAMQ only supports voice activity input from Silero VAD.

AI providers
Usage
Prerequisites
VAMQ retrieves data from a Silero-VAD service. The sender should push a header & payload pair to your target service via ZeroMQ, like this:
# Gate raw audio through Silero VAD and publish each detected speech
# segment (preceded by a short pre-roll) as 20 ms PCM16 chunks over ZeroMQ.
# NOTE(review): get_speech_ts, model, cfg, ARGS, newsound, sender, win,
# session_id, preroll_samples, min_speech_ms, min_silence_ms and
# chunks_20ms come from the surrounding script — confirm when integrating.
speech_timestamps = get_speech_ts(
    audio_float32,
    model,
    sampling_rate=cfg.sampling_rate,
    threshold=ARGS.trig_sum,
    min_speech_duration_ms=min_speech_ms,
    min_silence_duration_ms=min_silence_ms,
    window_size_samples=win,
    speech_pad_ms=30,
)

if len(speech_timestamps) > 0:
    print("silero VAD has detected a possible speech")
    for seg in speech_timestamps:
        start = int(seg['start'])
        end = int(seg['end'])

        # Pre-roll: up to preroll_samples of audio before the segment start.
        pre_start = max(0, start - preroll_samples)
        preroll_bytes = newsound[pre_start:start].astype(np.int16).tobytes()
        seg_bytes = newsound[start:end].astype(np.int16).tobytes()

        # flags: presumably bit 0 = segment start, bit 2 = pre-roll attached
        # — confirm against the receiver's flag definitions.
        flags = 0b001 | (0b100 if len(preroll_bytes) > 0 else 0)
        sender.send(sender.header(session_id, flags), preroll_bytes)

        # Stream the segment body as 20 ms chunks with no flags set.
        for chunk in chunks_20ms(
            seg_bytes,
            sr=cfg.sampling_rate,
            ch=cfg.channels,
            bytes_per_sample=cfg.bytes_per_sample,
        ):
            if not chunk:
                continue
            sender.send(sender.header(session_id, 0), chunk)

        # End-of-segment marker: bit 1 set, empty payload.
        sender.send(sender.header(session_id, 0b010), b"")
else:
    print("silero VAD has detected a noise")
Example `header` method of the sender class:
def header(self, session_id: str, flags: int):
    """Build the JSON-serializable header frame for one ZeroMQ message.

    Carries the session id, the sender's running sequence number, a
    timestamp, and the audio format (rate/channels/s16le) plus flags.
    NOTE(review): time.monotonic_ns() is not wall-clock time — confirm
    the receiver only uses ts_ns for relative ordering.
    """
    return dict(
        session_id=session_id,
        seq=self.seq,
        ts_ns=time.monotonic_ns(),
        sr=self.configs.sampling_rate,
        ch=self.configs.channels,
        fmt="s16le",
        flags=flags,
    )
Example `send` method of the sender class.
It uses a zmq.PUSH socket; the receiver will zmq.PULL the messages:
def send(self, header: dict, payload: bytes):
    """Publish one two-frame multipart message: JSON header, then payload.

    The SNDMORE flag keeps the multipart message open after the header;
    the payload frame (flag 0) closes it. The per-socket sequence counter
    advances once per message, matching the `seq` field the next header
    will carry.
    """
    self.sock.send(bytes(json.dumps(header), "utf-8"), zmq.SNDMORE)
    self.sock.send(payload, 0)
    self.seq = self.seq + 1
OpenAI
Assume you set the data criteria like this:
-
Consume audio chunks from ZeroMQ on port 5551.
-
For RealtimeClient, use speech-to-speech mode. Just set the RealtimeFeatures profile to RealtimeProfile::S2S:
RealtimeFeatures::from_profile(RealtimeProfile::S2S)
-
chunk of 30 ms @ 16kHz = 0.03 * 16_000 = 480 samples.
-
minimum commit size (@ 24k) – e.g. 360 ms.
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use vamq::{
    audio::{
        upsampling::general::rate16to24::min_bytes_24k_pcm16,
        vad::consumer::VadConsumer,
    },
    providers::openai::{
        RealtimeClient, RtEvent, SharedClient,
        schema::{RealtimeFeatures, RealtimeProfile},
    },
};
pub async fn run() -> Result<()> {
let in_chunk_16k = 480;
let min_commit_ms: u32 = 360u32;
let min_commit_bytes = min_bytes_24k_pcm16(min_commit_ms);
let mut consumer = VadConsumer::new(
"tcp://0.0.0.0:5551",
24_000u32,
min_commit_bytes,
in_chunk_16k,
)?;
let cfg = OpenAiConfig {
api_key: SecretString::from(env::var("OPENAI_API_KEY").unwrap_or("".to_string())),
model_realtime: "gpt-4o-realtime-preview-2024-12-17",
model_transcribe: "whisper-1",
sample_rate: 24_000
};
let client = RealtimeClient::connect(
&cfg.api_key,
&cfg,
RealtimeFeatures::from_profile(RealtimeProfile::S2S)
)
.await
.map_err(|e| { error!("Cannot connect to OpenAI's API: {:?}", e); e })?;
let client: SharedClient = Arc::new(tokio::sync::Mutex::new(client));
let (event_tx, event_rx) = mpsc::unbounded_channel::<RtEvent>();
RealtimeClient::listen(&client, event_tx);
RealtimeClient::recv_event(event_rx, ws_sender.clone(), |ev, ws| {
let ws_clone = ws.clone();
async move {
handle_event(ev, &ws_clone).await
}
});
loop {
if let Some(mut commit) = consumer.recv(10)? {
let mut is_end = true;
commit_once(&client, &mut commit.pcm24k_s16le, &mut is_end).await?;
}
}
}
This is an example `commit_once` for speech-to-speech with OpenAI:
/// Send the accumulated 24 kHz s16le buffer to the realtime client,
/// commit it, and — when `is_end` is set — request a model response.
///
/// An empty buffer is a no-op; on success the buffer is cleared so the
/// caller can keep reusing its allocation.
async fn commit_once(
    client: &SharedClient,
    acc: &mut Vec<u8>,
    is_end: &mut bool,
) -> anyhow::Result<()> {
    if acc.is_empty() {
        return Ok(());
    }

    // 2 bytes/sample at 24_000 samples/s → duration in milliseconds.
    let ms = acc.len() as f64 * 1000.0 / (24_000.0 * 2.0);
    debug!(
        commit_bytes = %acc.len(),
        approx_ms = %format!("{ms:.1}"),
        "commit @24k"
    );

    // Keep the lock scope tight: append, commit, optionally ask for a
    // response, then release before touching `acc` again.
    let mut guard = client.lock().await;
    guard.send_input_pcm16(acc).await?;
    guard.commit().await?;
    if *is_end {
        guard.request_response(true).await?;
    }
    drop(guard);

    acc.clear();
    Ok(())
}
After the data chunks are sent, you can handle the response like this:
/// Handle a single realtime event from the OpenAI client.
///
/// Audio deltas are forwarded to the websocket in 4_800-byte chunks
/// (2_400 samples = 100 ms of 24 kHz s16le mono) and each delta is also
/// dumped to a timestamped WAV file for debugging.
///
/// NOTE(review): the original version accumulated into a per-call
/// `full_audio` Vec, so the "full audio" WAV only ever contained the one
/// delta from this call — there is no cross-event accumulation, and the
/// `Completed` arm flushes nothing. The WAV write now lives in the
/// `AudioDelta` arm to make that explicit; observable behavior is the same.
async fn handle_event(
    ev: RtEvent,
    ws_sender: &WsSender,
) -> anyhow::Result<()> {
    match ev {
        RtEvent::AudioDelta(bytes) => {
            debug!(len = bytes.len(), "audio Δ");
            // Forward in 100 ms chunks so the websocket peer gets a
            // steady stream rather than one large blob.
            for chunk in bytes.chunks(4800) {
                ws_send_pcm16(ws_sender, chunk).await?;
            }
            // Debug dump: one WAV per delta (see NOTE above).
            if !bytes.is_empty() {
                let path = format!("/temp/debug_audio_{}.wav", now_unix_nanos());
                if let Err(e) = write_wav_pcm16_mono_24k(&path, &bytes) {
                    warn!("Failed to write WAV: {:?}", e);
                } else {
                    info!("Saved full audio response → {}", path);
                }
            }
        }
        RtEvent::TextDelta(s) => {
            info!(target: "realtime.text", "text Δ: {}", s);
        }
        RtEvent::UserTranscriptDelta(d) => {
            // Intentionally ignored for now.
            let _ = d;
        }
        RtEvent::UserTranscriptFinal(t) => {
            info!("user: {t}");
        }
        RtEvent::AssistantTranscriptDelta(d) => {
            // Intentionally ignored for now.
            let _ = d;
        }
        RtEvent::SessionCreated(v) => {
            debug!(target: "realtime.session", "session.created: {}", v);
        }
        RtEvent::Completed => {
            // Nothing buffered here; message kept for log compatibility.
            debug!("completed – flushing remaining audio");
        }
        RtEvent::Error(msg) => {
            error!("realtime error: {:?}", msg);
        }
        RtEvent::Closed => {
            warn!("realtime closed");
        }
        RtEvent::Other(v) => {
            debug!("realtime others event: {:?}", v);
        }
        RtEvent::Idle => {}
        // Catch-all kept: RtEvent may be #[non_exhaustive] — confirm.
        _ => {}
    }
    Ok(())
}
License
MIT