use crate::adaptive_quality_constants::{
AUDIO_QUALITY_TIERS, AUDIO_REDUNDANCY_ENABLED, AUDIO_RED_FORMAT, VAD_POLL_INTERVAL_MS,
};
use crate::audio_constants::{
rms_to_intensity, AUDIO_LEVEL_DELTA_THRESHOLD, DEFAULT_VAD_THRESHOLD, VAD_FFT_SIZE,
VAD_SMOOTHING_TIME_CONSTANT,
};
use crate::audio_worklet_codec::EncoderInitOptions;
use crate::audio_worklet_codec::{AudioWorkletCodec, CodecMessages};
use crate::constants::AUDIO_CHANNELS;
use crate::constants::AUDIO_SAMPLE_RATE;
use crate::crypto::aes::Aes128State;
use crate::encode::encoder_state::EncoderState;
use crate::wrappers::EncodedAudioChunkTypeWrapper;
use crate::VideoCallClient;
use gloo::timers::callback::Interval;
use gloo_utils::window;
use js_sys::Array;
use js_sys::Boolean;
use js_sys::Uint8Array;
use protobuf::Message;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use videocall_types::protos::packet_wrapper::PacketWrapper;
use videocall_types::protos::{
media_packet::{media_packet::MediaType, AudioMetadata, MediaPacket},
packet_wrapper::packet_wrapper::PacketType,
};
use videocall_types::Callback;
use wasm_bindgen::prelude::Closure;
use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;
use web_sys::AudioContext;
use web_sys::AudioContextOptions;
use web_sys::EncodedAudioChunkType;
use web_sys::MediaStream;
use web_sys::MediaStreamConstraints;
use web_sys::MediaStreamTrack;
use web_sys::MessageEvent;
use web_time::SystemTime;
/// An already-sent encoded audio frame kept around so it can be appended to a
/// later packet as redundant data (see `pack_redundant_audio`).
pub(crate) struct PreviousAudioFrame {
/// Encoded bytes of the earlier frame.
data: Vec<u8>,
/// Sequence number the earlier frame was sent with.
sequence: u64,
}
/// Builds a redundancy payload that carries the current frame together with an
/// earlier one.
///
/// Layout (all integers little-endian):
///
/// ```text
/// [primary length: u32][primary bytes][redundant sequence: u32][redundant bytes]
/// ```
///
/// The redundant frame's 64-bit sequence number is deliberately narrowed to its
/// low 32 bits; the redundant bytes run to the end of the buffer and therefore
/// carry no length prefix of their own.
fn pack_redundant_audio(primary: &[u8], redundant: &PreviousAudioFrame) -> Vec<u8> {
    let length_prefix = (primary.len() as u32).to_le_bytes();
    // Wrapping narrowing: only the low 32 bits of the sequence go on the wire.
    let sequence_field = (redundant.sequence as u32).to_le_bytes();

    [
        &length_prefix[..],
        primary,
        &sequence_field[..],
        &redundant.data[..],
    ]
    .concat()
}
/// Wraps one encoded audio chunk into an encrypted [`PacketWrapper`].
///
/// * `chunk` - encoded audio bytes for the current frame.
/// * `user_id` - written (as UTF-8 bytes) to the outer wrapper's `user_id`.
/// * `sequence` - sequence number stored in the packet's audio metadata.
/// * `aes` - cipher state used to encrypt the serialized media packet.
/// * `previous_frame` - when `Some`, the payload is produced by
///   `pack_redundant_audio` and the metadata's `audio_format` is set to
///   `AUDIO_RED_FORMAT`; when `None`, the payload is the raw chunk and
///   `audio_format` is left empty.
///
/// The inner `MediaPacket` is always tagged as a key frame and its own
/// `user_id` is left empty.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, if the
/// media packet cannot be serialized, or if encryption fails.
pub fn transform_audio_chunk(
    chunk: &Uint8Array,
    user_id: &str,
    sequence: u64,
    aes: Rc<Aes128State>,
    previous_frame: Option<&PreviousAudioFrame>,
) -> PacketWrapper {
    // Wall-clock send time in milliseconds since the Unix epoch.
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as f64;

    let primary_bytes = chunk.to_vec();
    let (payload, audio_format) = if let Some(prev) = previous_frame {
        (
            pack_redundant_audio(&primary_bytes, prev),
            AUDIO_RED_FORMAT.to_string(),
        )
    } else {
        (primary_bytes, String::new())
    };

    let metadata = AudioMetadata {
        sequence,
        audio_format,
        ..Default::default()
    };
    let media_packet = MediaPacket {
        user_id: Vec::new(),
        media_type: MediaType::AUDIO.into(),
        frame_type: EncodedAudioChunkTypeWrapper(EncodedAudioChunkType::Key).to_string(),
        data: payload,
        timestamp,
        audio_metadata: Some(metadata).into(),
        ..Default::default()
    };

    let serialized = media_packet.write_to_bytes().unwrap();
    let encrypted = aes.encrypt(&serialized).unwrap();

    PacketWrapper {
        data: encrypted,
        user_id: user_id.as_bytes().to_vec(),
        packet_type: PacketType::MEDIA.into(),
        ..Default::default()
    }
}
/// Captures microphone audio, encodes it through an audio worklet codec, sends
/// the encoded frames through the call client and runs a simple RMS-based
/// voice-activity detector alongside.
pub struct MicrophoneEncoder {
/// Call client used to send media packets and to publish speaking / audio
/// level state.
client: VideoCallClient,
/// Shared enabled / selected-device / switching flags.
state: EncoderState,
/// Stored at construction; not read anywhere in this file.
_on_encoder_settings_update: Option<Callback<String>>,
/// Handle to the audio worklet encoder.
codec: AudioWorkletCodec,
/// Optional callback that receives human-readable error descriptions.
on_error: Option<Callback<String>>,
/// Last speaking state published by the voice-activity detector.
is_speaking: Rc<AtomicBool>,
/// Handle of the VAD polling interval; taking and dropping it stops polling.
vad_interval: Rc<RefCell<Option<Interval>>>,
/// RMS level above which the input counts as speech.
vad_threshold: f32,
// Only written in `new` and never read in this file, hence the lint
// suppression. NOTE(review): presumably kept for an external quality
// controller to share - confirm.
#[allow(dead_code)]
tier_audio_bitrate: Rc<AtomicU32>,
/// Shared flag read by the audio output handler to decide whether redundant
/// audio is attached to outgoing packets.
tier_enable_fec: Rc<AtomicBool>,
}
impl MicrophoneEncoder {
/// Creates a microphone encoder that is not yet capturing.
///
/// * `client` - call client used for sending packets and publishing state.
/// * `_bitrate_kbps` - accepted for signature compatibility; unused here.
/// * `on_encoder_settings_update` - stored, but not invoked in this file.
/// * `on_error` - receives error descriptions raised while starting.
/// * `vad_threshold` - RMS speech threshold; falls back to
///   `DEFAULT_VAD_THRESHOLD` when `None`.
/// * `shared_audio_tier_bitrate` / `shared_audio_tier_fec` - externally shared
///   tier values; when absent, fresh cells seeded from the first entry of
///   `AUDIO_QUALITY_TIERS` are created.
pub fn new(
    client: VideoCallClient,
    _bitrate_kbps: u32,
    on_encoder_settings_update: Callback<String>,
    on_error: Callback<String>,
    vad_threshold: Option<f32>,
    shared_audio_tier_bitrate: Option<Rc<AtomicU32>>,
    shared_audio_tier_fec: Option<Rc<AtomicBool>>,
) -> Self {
    // The first tier supplies the defaults for anything the caller did not share.
    let base_tier = &AUDIO_QUALITY_TIERS[0];
    let fallback_bitrate_bps = base_tier.bitrate_kbps * 1000;
    let fallback_fec = base_tier.enable_fec;

    let tier_audio_bitrate = match shared_audio_tier_bitrate {
        Some(shared) => shared,
        None => Rc::new(AtomicU32::new(fallback_bitrate_bps)),
    };
    let tier_enable_fec = match shared_audio_tier_fec {
        Some(shared) => shared,
        None => Rc::new(AtomicBool::new(fallback_fec)),
    };
    let vad_threshold = match vad_threshold {
        Some(threshold) => threshold,
        None => DEFAULT_VAD_THRESHOLD,
    };

    Self {
        client,
        state: EncoderState::new(),
        _on_encoder_settings_update: Some(on_encoder_settings_update),
        codec: AudioWorkletCodec::default(),
        on_error: Some(on_error),
        is_speaking: Rc::new(AtomicBool::new(false)),
        vad_interval: Rc::new(RefCell::new(None)),
        vad_threshold,
        tier_audio_bitrate,
        tier_enable_fec,
    }
}
/// Replaces the error callback; it takes effect for subsequent `start` calls,
/// which clone the callback at the time they run.
pub fn set_error_callback(&mut self, on_error: Callback<String>) {
self.on_error = Some(on_error);
}
/// Enables or disables the encoder and returns whether the state changed.
///
/// When the state did not change nothing else happens. On enable the codec is
/// started; on disable the codec is stopped, VAD polling is cancelled and the
/// speaking / audio-level state published to the client is reset.
pub fn set_enabled(&mut self, value: bool) -> bool {
    if !self.state.set_enabled(value) {
        return false;
    }

    if value {
        let _ = self.codec.start();
        return true;
    }

    let _ = self.codec.stop();
    // Dropping the taken interval handle (if any) ends VAD polling.
    drop(self.vad_interval.borrow_mut().take());
    self.is_speaking.store(false, Ordering::Relaxed);
    self.client.set_speaking(false);
    self.client.set_audio_level(0.0);
    true
}
/// Selects the capture device by id by delegating to `EncoderState::select`
/// and returns its result.
// NOTE(review): the returned bool presumably signals that the selection
// changed - confirm against `EncoderState::select`.
pub fn select(&mut self, device: String) -> bool {
self.state.select(device)
}
/// Stops the encoder: stops the shared state, destroys the codec, cancels VAD
/// polling and resets the speaking / audio-level state published to the client.
pub fn stop(&mut self) {
    self.state.stop();
    self.codec.destroy();

    // Dropping the taken interval handle (if any) ends VAD polling.
    drop(self.vad_interval.borrow_mut().take());

    self.is_speaking.store(false, Ordering::Relaxed);
    self.client.set_speaking(false);
    self.client.set_audio_level(0.0);
}
pub fn start(&mut self) {
let user_id = self.client.user_id().clone();
let client = self.client.clone();
let device_id = if let Some(mic) = &self.state.selected {
mic.to_string()
} else {
return;
};
if !self.state.is_enabled() {
log::debug!("Microphone encoder start() called but encoder is not enabled");
return;
}
if self.state.switching.load(Ordering::Acquire) && self.codec.is_instantiated() {
self.stop();
}
if self.state.is_enabled() && self.codec.is_instantiated() {
return;
}
let aes = client.aes();
let on_error = self.on_error.clone();
let EncoderState {
enabled, switching, ..
} = self.state.clone();
let enabled_for_handler = enabled.clone();
let enable_fec_for_handler = self.tier_enable_fec.clone();
let audio_output_handler = {
log::info!("Starting Microphone audio encoder with AnalyserNode VAD");
let mut sequence_number: u64 = 0;
let client_for_send = client.clone();
let mut previous_frame: Option<PreviousAudioFrame> = None;
Box::new(move |chunk: MessageEvent| {
if !enabled_for_handler.load(Ordering::Acquire) {
log::debug!(
"Audio handler stopping: enabled={}",
enabled_for_handler.load(Ordering::Acquire)
);
return;
}
if let Ok(message_type) = js_sys::Reflect::get(&chunk.data(), &"message".into()) {
if let Some(msg_str) = message_type.as_string() {
if msg_str != "page" {
log::debug!("Received control message: {msg_str}");
return;
}
}
}
let data = js_sys::Reflect::get(&chunk.data(), &"page".into()).unwrap();
if let Ok(data) = data.dyn_into::<Uint8Array>() {
let use_redundancy = AUDIO_REDUNDANCY_ENABLED
&& enable_fec_for_handler.load(Ordering::Relaxed)
&& previous_frame.is_some();
let red_ref = if use_redundancy {
previous_frame.as_ref()
} else {
None
};
let packet: PacketWrapper = transform_audio_chunk(
&data,
&user_id,
sequence_number,
aes.clone(),
red_ref,
);
client_for_send.send_media_packet(packet);
previous_frame = Some(PreviousAudioFrame {
data: data.to_vec(),
sequence: sequence_number,
});
sequence_number += 1;
} else {
log::error!("Received non-MessageEvent: {chunk:?}");
}
})
};
let codec = self.codec.clone();
let is_speaking_for_vad = self.is_speaking.clone();
let client_for_vad = client.clone();
let vad_interval_holder = self.vad_interval.clone();
let vad_threshold = self.vad_threshold;
wasm_bindgen_futures::spawn_local(async move {
let navigator = window().navigator();
let media_devices = match navigator.media_devices() {
Ok(md) => md,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to access media devices: {e:?}"));
}
return;
}
};
let constraints = MediaStreamConstraints::new();
let media_info = web_sys::MediaTrackConstraints::new();
if device_id.is_empty() {
log::warn!("Microphone device_id is empty, using default constraint");
constraints.set_audio(&JsValue::TRUE);
} else {
let exact = js_sys::Object::new();
js_sys::Reflect::set(
&exact,
&JsValue::from_str("exact"),
&JsValue::from_str(&device_id),
)
.unwrap();
log::info!("MicrophoneEncoder: deviceId.exact = {}", device_id);
media_info.set_device_id(&exact.into());
constraints.set_audio(&media_info.into());
}
constraints.set_video(&Boolean::from(false));
let devices_query = match media_devices.get_user_media_with_constraints(&constraints) {
Ok(p) => p,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Microphone access failed: {e:?}"));
}
return;
}
};
let device = match JsFuture::from(devices_query).await {
Ok(ok) => ok.unchecked_into::<MediaStream>(),
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to get microphone stream: {e:?}"));
}
return;
}
};
let audio_track = Box::new(
device
.get_audio_tracks()
.find(&mut |_: JsValue, _: u32, _: Array| true)
.unchecked_into::<MediaStreamTrack>(),
);
let track_settings = audio_track.get_settings();
let input_rate: u32 = match js_sys::Reflect::get(
&track_settings,
&JsValue::from_str("sampleRate"),
) {
Ok(v) => match v.as_f64() {
Some(f) => f as u32,
None => {
log::info!("sampleRate not in track settings (Firefox), using AudioContext default");
match AudioContext::new() {
Ok(temp_ctx) => {
let rate = temp_ctx.sample_rate() as u32;
let _ = temp_ctx.close();
rate
}
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!(
"Could not determine microphone sample rate: {e:?}"
));
}
return;
}
}
}
},
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed reading microphone settings: {e:?}"));
}
return;
}
};
log::info!("Microphone input sample rate: {input_rate} Hz");
let options = AudioContextOptions::new();
options.set_sample_rate(input_rate as f32);
let context = match AudioContext::new_with_context_options(&options) {
Ok(ctx) => ctx,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to create audio context: {e:?}"));
}
return;
}
};
log::info!("Created AudioContext with sample rate: {input_rate} Hz");
let analyser = match context.create_analyser() {
Ok(a) => a,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to create analyser: {e:?}"));
}
let _ = context.close();
return;
}
};
analyser.set_fft_size(VAD_FFT_SIZE);
analyser.set_smoothing_time_constant(VAD_SMOOTHING_TIME_CONSTANT);
let worklet = match codec
.create_node(
&context,
"/encoderWorker.min.js",
"encoder-worklet",
AUDIO_CHANNELS,
)
.await
{
Ok(node) => node,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to initialize audio encoder: {e:?}"));
}
let _ = context.close();
return;
}
};
let output_handler =
Closure::wrap(audio_output_handler as Box<dyn FnMut(MessageEvent)>);
codec.set_onmessage(output_handler.as_ref().unchecked_ref());
output_handler.forget();
let initial_tier = &AUDIO_QUALITY_TIERS[0];
let _ = codec.send_message(&CodecMessages::Init {
options: Some(EncoderInitOptions {
encoder_frame_size: Some(20), original_sample_rate: Some(input_rate),
encoder_bit_rate: Some(initial_tier.bitrate_kbps * 1000),
encoder_sample_rate: Some(AUDIO_SAMPLE_RATE),
encoder_fec: Some(initial_tier.enable_fec),
encoder_dtx: Some(initial_tier.enable_dtx),
..Default::default()
}),
});
let source_node = match context.create_media_stream_source(&device) {
Ok(s) => s,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to create media source: {e:?}"));
}
let _ = context.close();
return;
}
};
let gain_node = match context.create_gain() {
Ok(g) => g,
Err(e) => {
if let Some(cb) = &on_error {
cb.emit(format!("Failed to create gain node: {e:?}"));
}
let _ = context.close();
return;
}
};
if let Err(e) = source_node
.connect_with_audio_node(&gain_node)
.and_then(|g| g.connect_with_audio_node(&analyser))
.and_then(|a| a.connect_with_audio_node(&worklet))
{
if let Some(cb) = &on_error {
cb.emit(format!("Failed to connect audio graph: {e:?}"));
}
let _ = context.close();
return;
}
let buffer_length = analyser.frequency_bin_count() as usize;
let data_array = Rc::new(RefCell::new(vec![0.0f32; buffer_length]));
let enabled_check = enabled.clone();
let switching_check = switching.clone();
let data_array_for_interval = data_array.clone();
let is_speaking_clone = is_speaking_for_vad.clone();
let client_clone = client_for_vad.clone();
let prev_audio_level = Rc::new(Cell::new(0.0f32));
let prev_level_clone = prev_audio_level.clone();
let vad_interval = Interval::new(VAD_POLL_INTERVAL_MS, move || {
if !enabled_check.load(Ordering::Acquire) || switching_check.load(Ordering::Acquire)
{
let prev_lvl = prev_level_clone.get();
if prev_lvl > 0.0 {
prev_level_clone.set(0.0);
client_clone.set_audio_level(0.0);
}
return;
}
let mut array = data_array_for_interval.borrow_mut();
analyser.get_float_time_domain_data(&mut array);
let mut sum = 0.0f32;
for sample in array.iter() {
sum += sample * sample;
}
let rms = (sum / array.len() as f32).sqrt();
let speaking = rms > vad_threshold;
let intensity = rms_to_intensity(rms, vad_threshold);
let prev_lvl = prev_level_clone.get();
if (intensity - prev_lvl).abs() > AUDIO_LEVEL_DELTA_THRESHOLD {
prev_level_clone.set(intensity);
client_clone.set_audio_level(intensity);
}
log::trace!("VAD: RMS={:.4}, speaking={}", rms, speaking);
let prev = is_speaking_clone.load(Ordering::Relaxed);
if speaking != prev {
is_speaking_clone.store(speaking, Ordering::Relaxed);
client_clone.set_speaking(speaking);
}
});
*vad_interval_holder.borrow_mut() = Some(vad_interval);
let check_interval = VAD_POLL_INTERVAL_MS as i32; let enabled_check_monitor = enabled.clone();
let switching_check_monitor = switching.clone();
loop {
let delay_promise = js_sys::Promise::new(&mut |resolve, _| {
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(
&resolve,
check_interval,
)
.unwrap();
});
let _ = wasm_bindgen_futures::JsFuture::from(delay_promise).await;
if !enabled_check_monitor.load(Ordering::Acquire)
|| switching_check_monitor.load(Ordering::Acquire)
{
log::info!("Stopping Microphone audio encoder");
switching_check_monitor.store(false, Ordering::Release);
is_speaking_for_vad.store(false, Ordering::Relaxed);
client_for_vad.set_speaking(false);
client_for_vad.set_audio_level(0.0);
if let Some(interval) = vad_interval_holder.borrow_mut().take() {
drop(interval);
}
audio_track.stop();
if let Err(e) = context.close() {
log::error!("Error closing AudioContext: {e:?}");
}
codec.destroy();
log::info!("Microphone audio encoder stopped and cleaned up");
break;
}
}
});
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::decode::neteq_audio_decoder::NetEqAudioPeerDecoder;
    use wasm_bindgen_test::*;

    /// Checks the full layout: length prefix, primary bytes, sequence, redundant bytes.
    #[wasm_bindgen_test]
    fn pack_normal_primary_and_redundant() {
        let primary = b"hello_primary";
        let redundant = PreviousAudioFrame {
            data: b"prev_frame".to_vec(),
            sequence: 42,
        };
        let packed = pack_redundant_audio(primary, &redundant);
        assert_eq!(packed.len(), 4 + 13 + 4 + 10);
        let primary_len = u32::from_le_bytes([packed[0], packed[1], packed[2], packed[3]]);
        assert_eq!(primary_len, 13);
        assert_eq!(&packed[4..4 + 13], b"hello_primary");
        let redundant_seq = u32::from_le_bytes([packed[17], packed[18], packed[19], packed[20]]);
        assert_eq!(redundant_seq, 42);
        assert_eq!(&packed[21..], b"prev_frame");
    }

    /// An empty primary frame still gets a (zero) length prefix.
    #[wasm_bindgen_test]
    fn pack_empty_primary() {
        let primary = b"";
        let redundant = PreviousAudioFrame {
            data: b"redundant_data".to_vec(),
            sequence: 0,
        };
        let packed = pack_redundant_audio(primary, &redundant);
        assert_eq!(packed.len(), 22);
        let primary_len = u32::from_le_bytes([packed[0], packed[1], packed[2], packed[3]]);
        assert_eq!(primary_len, 0);
        let redundant_seq = u32::from_le_bytes([packed[4], packed[5], packed[6], packed[7]]);
        assert_eq!(redundant_seq, 0);
        assert_eq!(&packed[8..], b"redundant_data");
    }

    /// With no redundant bytes the buffer ends right after the sequence field.
    #[wasm_bindgen_test]
    fn pack_empty_redundant_data() {
        let primary = b"some_audio";
        let redundant = PreviousAudioFrame {
            data: vec![],
            sequence: 100,
        };
        let packed = pack_redundant_audio(primary, &redundant);
        assert_eq!(packed.len(), 18);
        let primary_len = u32::from_le_bytes([packed[0], packed[1], packed[2], packed[3]]);
        assert_eq!(primary_len, 10);
        assert_eq!(&packed[4..14], b"some_audio");
        let redundant_seq = u32::from_le_bytes([packed[14], packed[15], packed[16], packed[17]]);
        assert_eq!(redundant_seq, 100);
        // Nothing may follow the sequence field when the redundant frame is empty.
        assert!(packed[18..].is_empty());
    }

    /// Exercises frame sizes in the range of a typical encoded frame.
    #[wasm_bindgen_test]
    fn pack_typical_opus_frame_size() {
        let primary: Vec<u8> = (0..120).collect();
        let redundant = PreviousAudioFrame {
            data: (0..100).collect(),
            sequence: 9999,
        };
        let packed = pack_redundant_audio(&primary, &redundant);
        assert_eq!(packed.len(), 4 + 120 + 4 + 100);
        let primary_len = u32::from_le_bytes([packed[0], packed[1], packed[2], packed[3]]);
        assert_eq!(primary_len, 120);
        assert_eq!(&packed[4..124], primary.as_slice());
        let redundant_seq =
            u32::from_le_bytes([packed[124], packed[125], packed[126], packed[127]]);
        assert_eq!(redundant_seq, 9999);
        assert_eq!(&packed[128..], redundant.data.as_slice());
    }

    /// Sequence numbers above `u32::MAX` wrap to their low 32 bits on the wire.
    #[wasm_bindgen_test]
    fn pack_large_sequence_number_truncation() {
        let primary = b"data";
        let redundant = PreviousAudioFrame {
            data: b"red".to_vec(),
            sequence: (u32::MAX as u64) + 5,
        };
        let packed = pack_redundant_audio(primary, &redundant);
        let redundant_seq = u32::from_le_bytes([packed[8], packed[9], packed[10], packed[11]]);
        assert_eq!(redundant_seq, 4);
    }

    /// Whatever the packer writes, the decoder-side unpacker must read back.
    #[wasm_bindgen_test]
    fn round_trip_pack_then_unpack() {
        let primary = b"primary_audio_frame_data";
        let redundant = PreviousAudioFrame {
            data: b"redundant_audio_frame".to_vec(),
            sequence: 77,
        };
        let packed = pack_redundant_audio(primary, &redundant);
        let result = NetEqAudioPeerDecoder::unpack_red_audio_public(&packed);
        assert!(
            result.is_some(),
            "unpack should succeed for valid packed data"
        );
        let (unpacked_primary, unpacked_seq, unpacked_redundant) = result.unwrap();
        assert_eq!(unpacked_primary, primary);
        assert_eq!(unpacked_seq, 77);
        assert_eq!(unpacked_redundant, redundant.data);
    }

    /// Round trip with frame sizes in the range of a typical encoded frame.
    #[wasm_bindgen_test]
    fn round_trip_with_typical_opus_sizes() {
        let primary: Vec<u8> = (0..80).collect();
        let redundant = PreviousAudioFrame {
            data: (0..60).collect(),
            sequence: 12345,
        };
        let packed = pack_redundant_audio(&primary, &redundant);
        let result = NetEqAudioPeerDecoder::unpack_red_audio_public(&packed);
        assert!(result.is_some());
        let (unpacked_primary, unpacked_seq, unpacked_redundant) = result.unwrap();
        assert_eq!(unpacked_primary, primary);
        assert_eq!(unpacked_seq, 12345);
        assert_eq!(unpacked_redundant, redundant.data);
    }
}