use crate::media::StreamWriter;
use crate::media::negotiate::NegotiatedLegProfile;
use crate::media::wav_writer::CodecWavWriter;
use anyhow::Result;
use audio_codec::{CodecType, Decoder, Encoder, create_decoder, create_encoder};
use bytes::Bytes;
use rustrtc::media::MediaSample;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing::debug;
/// Serializable recording settings.
///
/// The container-level `#[serde(default)]` fills every field missing from the input from
/// `RecorderOption::default()` (empty path, `samplerate` 16000, `ptime` 200). Fields
/// intentionally carry no per-field `#[serde(default)]`: a field-level default would
/// take precedence and fall back to the field type's default (`0`) instead.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
#[serde(default)]
pub struct RecorderOption {
    /// Path of the file the recording is written to.
    pub recorder_file: String,
    /// Sample rate — presumably in Hz; TODO confirm against callers.
    pub samplerate: u32,
    /// Packet time — presumably in milliseconds; TODO confirm against callers.
    pub ptime: u32,
}
impl RecorderOption {
    /// Builds options that record into `recorder_file`; every other setting comes from
    /// `RecorderOption::default()`.
    pub fn new(recorder_file: String) -> Self {
        Self {
            recorder_file,
            ..Self::default()
        }
    }
}
impl Default for RecorderOption {
    /// No output path, a sample rate of 16000 and a ptime of 200.
    fn default() -> Self {
        let recorder_file = String::new();
        Self {
            recorder_file,
            samplerate: 16_000,
            ptime: 200,
        }
    }
}
/// Identifies which side of the call a sample belongs to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Leg {
    /// First leg; written to the first channel of a stereo recording.
    A,
    /// Second leg; written to the second channel of a stereo recording.
    B,
}
/// The most recent telephone-event (DTMF) packet rendered on a leg.
///
/// Used to drop repeated packets of the same event and to keep later audio blocks
/// from overwriting the rendered tone.
#[derive(Debug, Clone, Copy)]
struct DtmfEventState {
// Event code taken from byte 0 of the telephone-event payload.
digit_code: u8,
// RTP timestamp of the event as received; identifies repeats of the same event.
rtp_timestamp: u32,
// Start of the rendered tone on the recorder timeline, in recorder samples.
absolute_timestamp: u32,
// Length of the rendered tone, in recorder samples.
duration_samples: u32,
}
/// Records the two legs of a call into a single WAV stream.
///
/// Incoming audio is transcoded to one storage codec and buffered per leg, keyed by its
/// position on a shared timeline measured in recorder samples. `flush` writes the legs
/// out in block-aligned chunks, either interleaved (one leg per channel) or mixed.
pub struct Recorder {
/// Output file path (also used as a log field).
pub path: String,
/// Codec the recording is stored in (Opus and G.722 inputs are stored as PCMU by `new`).
pub codec: CodecType,
/// Total encoded bytes handed to the writer.
written_bytes: u32,
/// Sample rate of `codec`; every timeline position is expressed at this rate.
sample_rate: u32,
/// `1` mixes both legs into one channel; any other value interleaves them.
channels: u16,
/// Synthesizes tones for DTMF events.
dtmf_gen: DtmfGenerator,
/// Encoder for `codec`.
encoder: Option<Box<dyn Encoder>>,
/// Decoders for inputs that need transcoding, keyed by leg and payload type.
decoders: HashMap<(Leg, u8), Box<dyn Decoder>>,
/// Resamplers to `sample_rate`, keyed like `decoders`.
resamplers: HashMap<(Leg, u8), audio_codec::Resampler>,
/// Pending encoded blocks of leg A, keyed by start position in recorder samples.
buffer_a: BTreeMap<u32, Bytes>,
/// Pending encoded blocks of leg B, keyed by start position in recorder samples.
buffer_b: BTreeMap<u32, Bytes>,
/// Creation time; the first packet of a leg is placed relative to it.
start_instant: Instant,
/// Time of the last flush; `write_sample` flushes again once `ptime` has elapsed.
last_flush: Instant,
/// First RTP timestamp of leg A's current timeline segment.
base_timestamp_a: Option<u32>,
/// First RTP timestamp of leg B's current timeline segment.
base_timestamp_b: Option<u32>,
// start_offset_a / start_offset_b: recorder position that each leg's base timestamp maps to;
// last_ssrc_a: SSRC most recently seen on leg A.
start_offset_a: u32, start_offset_b: u32, last_ssrc_a: Option<u32>,
/// SSRC most recently seen on leg B.
last_ssrc_b: Option<u32>,
/// Negotiated codecs of leg A, used to interpret payload types.
profile_a: NegotiatedLegProfile,
/// Negotiated codecs of leg B, used to interpret payload types.
profile_b: NegotiatedLegProfile,
/// Last DTMF event rendered on leg A.
dtmf_state_a: Option<DtmfEventState>,
/// Last DTMF event rendered on leg B.
dtmf_state_b: Option<DtmfEventState>,
// next_flush_ts: timeline position up to which audio has been written;
// ptime: minimum interval between automatic flushes.
next_flush_ts: u32, ptime: Duration,
/// Total samples (per channel) written so far.
written_samples: u64,
/// Destination of the encoded output.
writer: Box<dyn StreamWriter>,
}
impl Recorder {
pub fn new(path: &str, codec: CodecType) -> Result<Self> {
    // Best effort: create the target directory; `File::create` reports the real error
    // if the file still cannot be created.
    if let Some(dir) = PathBuf::from(path).parent() {
        std::fs::create_dir_all(dir).ok();
    }
    let file = File::create(path)
        .map_err(|e| anyhow::anyhow!("Failed to create recorder file {}: {}", path, e))?;

    // Opus and G.722 are stored as PCMU; every other codec is stored as-is.
    let src_codec = codec;
    let codec = if matches!(codec, CodecType::Opus | CodecType::G722) {
        CodecType::PCMU
    } else {
        codec
    };
    let sample_rate = codec.samplerate();
    let encoder = Some(create_encoder(codec));
    debug!(
        "Creating recorder: path={}, src_codec={:?} codec={:?}",
        path, src_codec, codec
    );

    // Always stereo: one channel per leg.
    let channels: u16 = 2;
    let mut writer = Box::new(CodecWavWriter::new(file, sample_rate, channels, Some(codec)));
    writer.write_header()?;

    Ok(Self {
        path: path.to_string(),
        codec,
        sample_rate,
        channels,
        encoder,
        writer,
        dtmf_gen: DtmfGenerator::new(sample_rate),
        decoders: HashMap::new(),
        resamplers: HashMap::new(),
        buffer_a: BTreeMap::new(),
        buffer_b: BTreeMap::new(),
        start_instant: Instant::now(),
        last_flush: Instant::now(),
        base_timestamp_a: None,
        base_timestamp_b: None,
        start_offset_a: 0,
        start_offset_b: 0,
        last_ssrc_a: None,
        last_ssrc_b: None,
        profile_a: NegotiatedLegProfile::default(),
        profile_b: NegotiatedLegProfile::default(),
        dtmf_state_a: None,
        dtmf_state_b: None,
        next_flush_ts: 0,
        ptime: Duration::from_millis(200),
        written_bytes: 0,
        written_samples: 0,
    })
}
/// Stores the negotiated codec profile used to interpret `leg`'s payload types.
pub fn set_leg_profile(&mut self, leg: Leg, profile: NegotiatedLegProfile) {
    let slot = match leg {
        Leg::A => &mut self.profile_a,
        Leg::B => &mut self.profile_b,
    };
    *slot = profile;
}
/// Returns the negotiated codec profile of `leg`.
fn profile_for_leg(&self, leg: Leg) -> &NegotiatedLegProfile {
    if leg == Leg::A {
        &self.profile_a
    } else {
        &self.profile_b
    }
}
/// Buffers one media sample received on `leg`.
///
/// Non-audio samples are ignored. A sample whose payload type equals the leg's DTMF
/// payload type (from `dtmf_pt`, or else the negotiated profile) is rendered as a tone
/// via [`Self::write_dtmf_payload`]. Other audio is decoded, resampled and re-encoded
/// to the recorder codec when its codec differs. Its RTP timestamp is then mapped onto
/// the leg timeline and the block is buffered. Buffered audio is flushed once `ptime`
/// has elapsed since the last flush.
///
/// # Errors
/// Fails when no codec can be derived from the payload type, or when flushing fails.
pub fn write_sample(
    &mut self,
    leg: Leg,
    sample: &MediaSample,
    dtmf_pt: Option<u8>,
    dtmf_clock_rate: Option<u32>,
    codec_hint: Option<CodecType>,
) -> Result<()> {
    let MediaSample::Audio(frame) = sample else {
        return Ok(());
    };
    let (profile_audio_pt, profile_audio_codec, profile_dtmf_pt, profile_dtmf_clock_rate) = {
        let profile = self.profile_for_leg(leg);
        (
            profile.audio.as_ref().map(|codec| codec.payload_type),
            profile.audio.as_ref().map(|codec| codec.codec),
            profile.dtmf.as_ref().map(|codec| codec.payload_type),
            profile.dtmf.as_ref().map(|codec| codec.clock_rate),
        )
    };
    // Explicit arguments take precedence over the negotiated profile.
    let dtmf_pt = dtmf_pt.or(profile_dtmf_pt);
    let dtmf_clock_rate = dtmf_clock_rate.or(profile_dtmf_clock_rate);
    if let (Some(pt), Some(dpt)) = (frame.payload_type, dtmf_pt)
        && pt == dpt
    {
        return self.write_dtmf_payload(
            leg,
            &frame.data,
            frame.rtp_timestamp,
            dtmf_clock_rate.unwrap_or(self.sample_rate),
        );
    }
    // Fall back to the profile's audio codec when the payload type matches it.
    let codec_hint = codec_hint.or(match (frame.payload_type, profile_audio_pt) {
        (Some(pt), Some(audio_pt)) if pt == audio_pt => profile_audio_codec,
        _ => None,
    });
    let decoder_type = match codec_hint {
        Some(codec) => codec,
        None => CodecType::try_from(frame.payload_type.unwrap_or(0))?,
    };
    let packet_ssrc = frame.raw_packet.as_ref().map(|packet| packet.header.ssrc);
    // `frame` is already known to be audio: take the payload from the raw RTP packet
    // when present (its timestamps tick at the codec's clock rate), else from the frame.
    let (mut encoded, frame_clock_rate) = match frame.raw_packet.as_ref() {
        Some(packet) => (
            Bytes::copy_from_slice(&packet.payload),
            decoder_type.clock_rate().max(1),
        ),
        None => (frame.data.clone(), frame.clock_rate.max(1)),
    };
    if decoder_type != self.codec {
        // Transcode: decode, resample to the recorder rate, re-encode.
        let decoder = self
            .decoders
            .entry((leg, decoder_type.payload_type()))
            .or_insert_with(|| create_decoder(decoder_type));
        let pcm = decoder.decode(&encoded);
        let resampler = self
            .resamplers
            .entry((leg, decoder_type.payload_type()))
            .or_insert_with(|| {
                audio_codec::Resampler::new(
                    decoder.sample_rate() as usize,
                    self.sample_rate as usize,
                )
            });
        let pcm = resampler.resample(&pcm);
        encoded = if let Some(enc) = self.encoder.as_mut() {
            enc.encode(&pcm)
        } else {
            audio_codec::samples_to_bytes(&pcm)
        }
        .into();
    }
    self.maybe_reset_leg_timeline(
        leg,
        frame.rtp_timestamp,
        frame_clock_rate,
        packet_ssrc,
        &encoded,
    );
    let absolute_ts = self.timestamp_to_absolute(leg, frame.rtp_timestamp, frame_clock_rate);
    self.insert_audio_block(leg, absolute_ts, encoded);
    if self.last_flush.elapsed() >= self.ptime {
        self.flush()?;
    }
    Ok(())
}
/// Rebases `leg`'s timeline when the incoming packet does not continue it.
///
/// A reset happens when the SSRC changed between two known values, or when the packet's
/// projected position (in recorder samples, using the current base) is more than
/// `sample_rate * 2` samples ahead of or behind the current end of the leg. On reset,
/// the packet's RTP timestamp becomes the new base and is mapped to the current leg end,
/// so the audio continues without a jump. If the leg has no base yet, only the SSRC is
/// remembered; `timestamp_to_absolute` establishes the base afterwards.
///
/// `encoded` is only used to log its length.
fn maybe_reset_leg_timeline(
&mut self,
leg: Leg,
rtp_timestamp: u32,
frame_clock_rate: u32,
packet_ssrc: Option<u32>,
encoded: &[u8],
) {
let leg_end = self.leg_end_ts(leg);
let (base_timestamp, start_offset, last_ssrc) = match leg {
Leg::A => (
&mut self.base_timestamp_a,
&mut self.start_offset_a,
&mut self.last_ssrc_a,
),
Leg::B => (
&mut self.base_timestamp_b,
&mut self.start_offset_b,
&mut self.last_ssrc_b,
),
};
let Some(base) = *base_timestamp else {
*last_ssrc = packet_ssrc;
return;
};
// Position this packet would get on the recorder timeline with the current base.
let projected = start_offset.wrapping_add(
((rtp_timestamp.wrapping_sub(base)) as u64 * self.sample_rate as u64
/ frame_clock_rate.max(1) as u64) as u32,
);
// Tolerated distance between the projected position and the leg end: two seconds.
let max_gap = self.sample_rate * 2;
// Only a change between two known SSRCs counts; a missing SSRC is not a change.
let ssrc_changed =
matches!((*last_ssrc, packet_ssrc), (Some(prev), Some(curr)) if prev != curr);
let timestamp_far_ahead = projected > leg_end.saturating_add(max_gap);
let timestamp_far_behind = leg_end > projected.saturating_add(max_gap);
let prev_ssrc = *last_ssrc;
if ssrc_changed || timestamp_far_ahead || timestamp_far_behind {
debug!(
recorder_path = %self.path,
leg = ?leg,
base_timestamp = base,
rtp_timestamp,
frame_clock_rate,
prev_ssrc,
packet_ssrc,
leg_end,
projected,
max_gap,
ssrc_changed,
timestamp_far_ahead,
timestamp_far_behind,
encoded_len = encoded.len(),
"Recorder timeline discontinuity detected, resetting leg base"
);
// Map this packet to the current end of the leg.
*base_timestamp = Some(rtp_timestamp);
*start_offset = leg_end;
}
// Keep the previously seen SSRC when this packet does not carry one.
*last_ssrc = packet_ssrc.or(*last_ssrc);
}
/// Timeline position just past the last buffered block of `leg`, never earlier than
/// `next_flush_ts` (the position already written out).
fn leg_end_ts(&self, leg: Leg) -> u32 {
    let (samples_per_block, bytes_per_block) = self.block_info();
    let buffer = match leg {
        Leg::A => &self.buffer_a,
        Leg::B => &self.buffer_b,
    };
    match buffer.last_key_value() {
        Some((&start, data)) => {
            let span = (data.len() / bytes_per_block) as u32 * samples_per_block;
            (start + span).max(self.next_flush_ts)
        }
        None => self.next_flush_ts,
    }
}
/// Number of recorder samples covered by the whole codec blocks in `data`.
fn block_span_samples(&self, data: &[u8]) -> u32 {
    let (samples_per_block, bytes_per_block) = self.block_info();
    let whole_blocks = (data.len() / bytes_per_block) as u32;
    whole_blocks * samples_per_block
}
/// Maps an RTP `timestamp` (ticking at `clock_rate`) onto `leg`'s recorder timeline.
///
/// The first timestamp seen on a leg becomes its base and is anchored at the wall-clock
/// time elapsed since the recorder was created. Later timestamps are offset from that
/// anchor by their (wrapping) distance from the base, rescaled to the recorder rate.
fn timestamp_to_absolute(&mut self, leg: Leg, timestamp: u32, clock_rate: u32) -> u32 {
    let recorder_rate = self.sample_rate as u64;
    let (base_slot, offset_slot) = match leg {
        Leg::A => (&mut self.base_timestamp_a, &mut self.start_offset_a),
        Leg::B => (&mut self.base_timestamp_b, &mut self.start_offset_b),
    };
    let base = match *base_slot {
        Some(base) => base,
        None => {
            let elapsed_ms = self.start_instant.elapsed().as_millis() as u64;
            *offset_slot = (elapsed_ms * recorder_rate / 1000) as u32;
            *base_slot = Some(timestamp);
            timestamp
        }
    };
    let ticks = timestamp.wrapping_sub(base) as u64;
    let scaled = (ticks * recorder_rate / clock_rate.max(1) as u64) as u32;
    offset_slot.wrapping_add(scaled)
}
/// Last DTMF event rendered on `leg`, if any.
fn active_dtmf_state(&self, leg: Leg) -> Option<DtmfEventState> {
    let state = if leg == Leg::A {
        &self.dtmf_state_a
    } else {
        &self.dtmf_state_b
    };
    *state
}
/// Records `state` as the latest DTMF event of `leg`.
fn set_dtmf_state(&mut self, leg: Leg, state: DtmfEventState) {
    let slot = match leg {
        Leg::A => &mut self.dtmf_state_a,
        Leg::B => &mut self.dtmf_state_b,
    };
    *slot = Some(state);
}
/// Drops at least `drop_samples` from the front of `data`, rounded up to whole blocks.
///
/// Returns the number of samples actually dropped together with the remaining bytes,
/// or `None` when nothing would remain.
fn trim_front(&self, data: &Bytes, drop_samples: u32) -> Option<(u32, Bytes)> {
    let (samples_per_block, bytes_per_block) = self.block_info();
    let dropped_blocks = drop_samples.div_ceil(samples_per_block);
    let cut = dropped_blocks as usize * bytes_per_block;
    (cut < data.len()).then(|| {
        (
            dropped_blocks * samples_per_block,
            Bytes::copy_from_slice(&data[cut..]),
        )
    })
}
/// Keeps at most `keep_samples` from the front of `data`, rounded down to whole blocks.
///
/// Returns `None` when not even one whole block would be kept.
fn trim_back(&self, data: &Bytes, keep_samples: u32) -> Option<Bytes> {
    let (samples_per_block, bytes_per_block) = self.block_info();
    let keep_bytes = (keep_samples / samples_per_block) as usize * bytes_per_block;
    if keep_bytes == 0 {
        return None;
    }
    let end = keep_bytes.min(data.len());
    Some(Bytes::copy_from_slice(&data[..end]))
}
/// Places `encoded` (a rendered DTMF tone) on `leg` over `[start_ts, end_ts)`, cutting
/// the overlapping parts out of the blocks already buffered.
///
/// A block that only partly overlaps keeps its whole-block prefix before `start_ts`
/// (`trim_back`) and its suffix after `end_ts` (`trim_front`). A block fully inside the
/// range is discarded. The tone is then inserted at `start_ts`, replacing any entry that
/// still has that key.
fn overlay_dtmf_range(&mut self, leg: Leg, start_ts: u32, end_ts: u32, encoded: Bytes) {
// Every block starting before `end_ts` may overlap; those that don't are put back.
let overlapping_keys: Vec<u32> = match leg {
Leg::A => self.buffer_a.range(..end_ts).map(|(k, _)| *k).collect(),
Leg::B => self.buffer_b.range(..end_ts).map(|(k, _)| *k).collect(),
};
for key in overlapping_keys {
let data = match leg {
Leg::A => self.buffer_a.remove(&key),
Leg::B => self.buffer_b.remove(&key),
};
let Some(data) = data else {
continue;
};
let block_end = key.saturating_add(self.block_span_samples(&data));
// No overlap with the tone range: restore the block unchanged.
if block_end <= start_ts || key >= end_ts {
match leg {
Leg::A => {
self.buffer_a.insert(key, data);
}
Leg::B => {
self.buffer_b.insert(key, data);
}
}
continue;
}
// Keep the part of the block that precedes the tone.
if key < start_ts
&& let Some(prefix) = self.trim_back(&data, start_ts - key)
{
match leg {
Leg::A => {
self.buffer_a.insert(key, prefix);
}
Leg::B => {
self.buffer_b.insert(key, prefix);
}
}
}
// Keep the part of the block that follows the tone, re-keyed to where it starts.
if block_end > end_ts
&& let Some((trimmed_samples, suffix)) =
self.trim_front(&data, end_ts.saturating_sub(key))
{
let suffix_ts = key.saturating_add(trimmed_samples);
match leg {
Leg::A => {
self.buffer_a.insert(suffix_ts, suffix);
}
Leg::B => {
self.buffer_b.insert(suffix_ts, suffix);
}
}
}
}
match leg {
Leg::A => {
self.buffer_a.insert(start_ts, encoded);
}
Leg::B => {
self.buffer_b.insert(start_ts, encoded);
}
}
}
/// Buffers an encoded audio block for `leg` starting at `start_ts`.
///
/// If a DTMF tone has been rendered on the leg, the part of the block overlapping the
/// tone's range is cut away so that the tone is not overwritten. Whatever survives
/// before and after that range is buffered.
fn insert_audio_block(&mut self, leg: Leg, start_ts: u32, encoded: Bytes) {
    let mut pieces: Vec<(u32, Bytes)> = Vec::with_capacity(2);
    match self.active_dtmf_state(leg) {
        None => pieces.push((start_ts, encoded)),
        Some(state) => {
            let dtmf_start = state.absolute_timestamp;
            let dtmf_end = dtmf_start.saturating_add(state.duration_samples);
            let block_end = start_ts.saturating_add(self.block_span_samples(&encoded));
            if block_end <= dtmf_start || start_ts >= dtmf_end {
                // No overlap with the tone.
                pieces.push((start_ts, encoded));
            } else {
                if start_ts < dtmf_start
                    && let Some(head) = self.trim_back(&encoded, dtmf_start - start_ts)
                {
                    pieces.push((start_ts, head));
                }
                if block_end > dtmf_end
                    && let Some((skipped, tail)) =
                        self.trim_front(&encoded, dtmf_end.saturating_sub(start_ts))
                {
                    pieces.push((start_ts.saturating_add(skipped), tail));
                }
            }
        }
    }
    let buffer = match leg {
        Leg::A => &mut self.buffer_a,
        Leg::B => &mut self.buffer_b,
    };
    buffer.extend(pieces);
}
/// Renders a telephone-event payload received on `leg` as an audible DTMF tone.
///
/// `payload` is read as: byte 0 = event code, bit 7 of byte 1 = end-of-event flag,
/// bytes 2..4 = big-endian duration in ticks of `clock_rate`. `timestamp` is the
/// packet's RTP timestamp. Payloads shorter than four bytes, and event codes that
/// `dtmf_code_to_char` does not map, are ignored. A repeated packet of the same event
/// (same code and RTP timestamp) is ignored unless it reports a longer duration, in
/// which case the tone is rendered again over the longer range. Buffered audio is
/// flushed when the end flag is set.
///
/// # Errors
/// Only propagates errors from the flush triggered by the end flag.
pub fn write_dtmf_payload(
&mut self,
leg: Leg,
payload: &[u8],
timestamp: u32,
clock_rate: u32,
) -> Result<()> {
// Needs at least the event byte, the flag byte and the 16-bit duration.
if payload.len() < 4 {
return Ok(());
}
let digit_code = payload[0];
let Some(digit) = crate::media::telephone_event::dtmf_code_to_char(digit_code) else {
return Ok(());
};
let end_bit = (payload[1] & 0x80) != 0;
let duration = u16::from_be_bytes([payload[2], payload[3]]) as u32;
// Convert the duration from `clock_rate` ticks to recorder samples, rounding up.
let duration_samples =
(duration as u64 * self.sample_rate as u64).div_ceil(clock_rate.max(1) as u64) as u32;
// Only used for logging.
let duration_ms = (duration as u64 * 1000).div_ceil(clock_rate.max(1) as u64) as u32;
// Repeat of the event already rendered, with no additional duration: nothing to do.
if let Some(state) = self.active_dtmf_state(leg)
&& state.digit_code == digit_code
&& state.rtp_timestamp == timestamp
&& duration_samples <= state.duration_samples
{
return Ok(());
}
let absolute_ts = self.timestamp_to_absolute(leg, timestamp, clock_rate);
debug!(
leg = ?leg,
digit = %digit,
duration_ms = %duration_ms,
duration_samples,
"Recording DTMF digit"
);
let pcm = self
.dtmf_gen
.generate_samples(digit, duration_samples as usize);
let encoded = if let Some(enc) = self.encoder.as_mut() {
Bytes::from(enc.encode(&pcm))
} else {
Bytes::from(audio_codec::samples_to_bytes(&pcm))
};
let end_ts = absolute_ts.saturating_add(duration_samples);
self.overlay_dtmf_range(leg, absolute_ts, end_ts, encoded);
// Remember the event so later audio blocks do not overwrite the tone.
self.set_dtmf_state(
leg,
DtmfEventState {
digit_code,
rtp_timestamp: timestamp,
absolute_timestamp: absolute_ts,
duration_samples,
},
);
if end_bit {
self.flush()?;
}
Ok(())
}
/// Renders `digit` as a `duration_ms` DTMF tone on `leg`, then flushes.
///
/// With a `timestamp`, the tone is placed at that RTP timestamp. The timestamp is
/// interpreted at `timestamp_clock_rate`, which defaults to the recorder sample rate.
/// Without one, the tone is appended at the current end of the leg.
///
/// # Errors
/// Propagates errors from the flush.
pub fn write_dtmf(
    &mut self,
    leg: Leg,
    digit: char,
    duration_ms: u32,
    timestamp: Option<u32>,
    timestamp_clock_rate: Option<u32>,
) -> Result<()> {
    let pcm = self.dtmf_gen.generate(digit, duration_ms);
    debug!(
        "Recording DTMF: leg={:?}, digit={}, duration={}ms, samples={}",
        leg,
        digit,
        duration_ms,
        pcm.len()
    );
    let ts = match timestamp {
        Some(t) => {
            self.timestamp_to_absolute(leg, t, timestamp_clock_rate.unwrap_or(self.sample_rate))
        }
        // `leg_end_ts` converts the buffered byte length to samples via `block_info`.
        // Adding raw byte counts to a sample position is wrong for codecs such as G.729
        // or 16-bit PCM. It also never returns a position before audio already flushed.
        None => self.leg_end_ts(leg),
    };
    let encoded = if let Some(enc) = self.encoder.as_mut() {
        Bytes::from(enc.encode(&pcm))
    } else {
        Bytes::from(audio_codec::samples_to_bytes(&pcm))
    };
    let end_ts = ts.saturating_add(self.block_span_samples(&encoded));
    self.overlay_dtmf_range(leg, ts, end_ts, encoded);
    self.flush()?;
    Ok(())
}
fn flush(&mut self) -> Result<()> {
self.last_flush = Instant::now();
let max_ts_a = self
.buffer_a
.last_key_value()
.map(|(k, v)| {
let (_, bytes_per_block) = self.block_info();
let samples_per_block = self.samples_per_block();
k + (v.len() / bytes_per_block) as u32 * samples_per_block
})
.unwrap_or(0);
let max_ts_b = self
.buffer_b
.last_key_value()
.map(|(k, v)| {
let (_, bytes_per_block) = self.block_info();
let samples_per_block = self.samples_per_block();
k + (v.len() / bytes_per_block) as u32 * samples_per_block
})
.unwrap_or(0);
let max_available_ts = max_ts_a.max(max_ts_b);
if max_available_ts <= self.next_flush_ts {
return Ok(());
}
let mut flush_len = max_available_ts - self.next_flush_ts;
let max_flush_samples = self.sample_rate * 10;
if flush_len > max_flush_samples {
flush_len = max_flush_samples;
}
let (samples_per_block, _) = self.block_info();
flush_len = (flush_len / samples_per_block) * samples_per_block;
if flush_len == 0 {
return Ok(());
}
let data_a = self.get_leg_data(Leg::A, flush_len)?;
let data_b = self.get_leg_data(Leg::B, flush_len)?;
self.next_flush_ts += flush_len;
let output = if self.channels == 1 {
self.mix(&data_a, &data_b)?
} else {
self.interleave(&data_a, &data_b)?
};
self.writer.write_packet(&output, flush_len as usize)?;
self.written_bytes += output.len() as u32;
self.written_samples += flush_len as u64;
Ok(())
}
/// Flushes any buffered audio and finalizes the underlying writer.
///
/// # Errors
/// Propagates errors from the flush or from the writer.
pub fn finalize(&mut self) -> Result<()> {
    self.flush()?;
    self.writer.finalize()
}
/// `(samples per block, bytes per block)` of the storage codec.
fn block_info(&self) -> (u32, usize) {
    match self.codec {
        CodecType::G729 => (80, 10),
        CodecType::G722 => (2, 1),
        CodecType::PCMU | CodecType::PCMA => (1, 1),
        // Opus and every other codec: one 16-bit sample per block.
        _ => (1, 2),
    }
}
/// Samples per block of the storage codec.
fn samples_per_block(&self) -> u32 {
    let (samples, _bytes) = self.block_info();
    samples
}
/// Encodes `blocks` codec blocks of silence (at most ten seconds' worth).
fn get_silence_bytes(&mut self, _leg: Leg, blocks: usize) -> Result<Bytes> {
    let (samples_per_block, _) = self.block_info();
    let requested = (blocks as u32 * samples_per_block) as usize;
    let num_samples = requested.min((self.sample_rate * 10) as usize);
    let silence = vec![0i16; num_samples];
    let bytes = match self.encoder.as_mut() {
        Some(enc) => enc.encode(&silence),
        None => audio_codec::samples_to_bytes(&silence),
    };
    Ok(bytes.into())
}
/// Drains `samples` recorder samples of encoded audio for `leg`, starting at
/// `next_flush_ts`.
///
/// Holes before buffered blocks and at the end of the window are filled with encoded
/// silence. Every leg therefore yields output covering the whole window, which
/// `interleave`/`mix` rely on to keep the channels aligned. A block that runs past the
/// window is split, and its remainder goes back into the buffer. A block whose key lies
/// before the current position is written at the current position.
///
/// # Errors
/// Propagates errors from encoding silence.
fn get_leg_data(&mut self, leg: Leg, samples: u32) -> Result<Vec<u8>> {
    let (samples_per_block, bytes_per_block) = self.block_info();
    let num_blocks = samples / samples_per_block;
    let mut result = Vec::with_capacity((num_blocks * bytes_per_block as u32) as usize);
    let mut current_ts = self.next_flush_ts;
    let flush_to = self.next_flush_ts + samples;
    while current_ts < flush_to {
        let next_ts = match leg {
            Leg::A => self.buffer_a.first_key_value().map(|(k, _)| *k),
            Leg::B => self.buffer_b.first_key_value().map(|(k, _)| *k),
        };
        // Stop when the buffer is empty or its next block starts after the window.
        let Some(ts) = next_ts.filter(|&ts| ts < flush_to) else {
            break;
        };
        if ts > current_ts {
            // `ts < flush_to`, so the gap is already bounded by the flush window. Fill
            // it completely: emitting less silence than the gap would make this leg
            // shorter than the other, and `interleave`/`mix` would truncate the other
            // leg's audio.
            let gap_samples = ts - current_ts;
            if gap_samples > self.sample_rate {
                debug!("Recorder filling {} samples of silence", gap_samples);
            }
            let gap_blocks = gap_samples / samples_per_block;
            result.extend(self.get_silence_bytes(leg, gap_blocks as usize)?);
            // Advance by what was actually emitted so `current_ts` stays block-aligned
            // with `result` (only differs from `ts` for multi-sample blocks like G.729).
            current_ts += gap_blocks * samples_per_block;
        }
        let data = match leg {
            Leg::A => self.buffer_a.pop_first(),
            Leg::B => self.buffer_b.pop_first(),
        }
        .map(|(_, data)| data)
        .expect("buffer has a first entry: its key was just read");
        let data_samples = (data.len() / bytes_per_block) as u32 * samples_per_block;
        if current_ts + data_samples <= flush_to {
            result.extend_from_slice(&data);
            current_ts += data_samples;
            continue;
        }
        // The block runs past the window: write its leading whole blocks and put the
        // remainder back, keyed at the position where it now starts.
        let keep_blocks = (flush_to - current_ts) / samples_per_block;
        let keep_bytes = keep_blocks as usize * bytes_per_block;
        if keep_bytes == 0 {
            match leg {
                Leg::A => self.buffer_a.insert(ts, data),
                Leg::B => self.buffer_b.insert(ts, data),
            };
            break;
        }
        result.extend_from_slice(&data[..keep_bytes]);
        let rest = &data[keep_bytes..];
        if !rest.is_empty() {
            let rest_ts = current_ts + keep_blocks * samples_per_block;
            let rest = Bytes::copy_from_slice(rest);
            match leg {
                Leg::A => self.buffer_a.insert(rest_ts, rest),
                Leg::B => self.buffer_b.insert(rest_ts, rest),
            };
        }
        current_ts += keep_blocks * samples_per_block;
    }
    // Pad the remainder of the window with silence.
    if current_ts < flush_to {
        let gap_blocks = (flush_to - current_ts) / samples_per_block;
        result.extend(self.get_silence_bytes(leg, gap_blocks as usize)?);
    }
    Ok(result)
}
/// Interleaves the two legs block by block (A, B, A, B, ...) for stereo output.
///
/// Only as many whole blocks as the shorter input holds are emitted.
fn interleave(&mut self, data_a: &[u8], data_b: &[u8]) -> Result<Vec<u8>> {
    let (_, bytes_per_block) = self.block_info();
    let len = data_a.len().min(data_b.len());
    let mut out = Vec::with_capacity(len * 2);
    let pairs = data_a
        .chunks_exact(bytes_per_block)
        .zip(data_b.chunks_exact(bytes_per_block));
    for (block_a, block_b) in pairs {
        out.extend_from_slice(block_a);
        out.extend_from_slice(block_b);
    }
    Ok(out)
}
/// Mixes the two legs into one channel.
///
/// Both inputs are decoded with the storage codec, averaged sample by sample (which
/// cannot clip), and re-encoded. The output covers the shorter of the two decoded
/// inputs. This applies to every codec: averaging raw encoded bytes is only meaningful
/// for no codec at all. For example, it splits 16-bit PCM samples into independently
/// averaged bytes. NOTE(review): fresh decoders are used per call, which suits
/// stateless codecs such as G.711; confirm for stateful ones like G.729.
fn mix(&mut self, data_a: &[u8], data_b: &[u8]) -> Result<Vec<u8>> {
    let mut decoder_a = create_decoder(self.codec);
    let pcm_a = decoder_a.decode(data_a);
    let mut decoder_b = create_decoder(self.codec);
    let pcm_b = decoder_b.decode(data_b);
    let pcm_mixed: Vec<i16> = pcm_a
        .iter()
        .zip(pcm_b.iter())
        .map(|(&a, &b)| ((a as i32 + b as i32) / 2) as i16)
        .collect();
    let mixed = match self.encoder.as_mut() {
        Some(enc) => enc.encode(&pcm_mixed),
        None => audio_codec::samples_to_bytes(&pcm_mixed),
    };
    Ok(mixed)
}
}
impl Drop for Recorder {
fn drop(&mut self) {
let _ = self.finalize();
}
}
/// Synthesizes dual-tone (DTMF) signals as 16-bit PCM samples.
pub struct DtmfGenerator {
    sample_rate: u32,
}
impl DtmfGenerator {
    /// Creates a generator producing samples at `sample_rate` samples per second.
    pub fn new(sample_rate: u32) -> Self {
        DtmfGenerator { sample_rate }
    }

    /// Generates `duration_ms` milliseconds of the tone for `digit`.
    ///
    /// Returns an empty vector when `digit` is not a DTMF digit.
    pub fn generate(&self, digit: char, duration_ms: u32) -> Vec<i16> {
        let seconds = duration_ms as f32 / 1000.0;
        let count = (self.sample_rate as f32 * seconds) as usize;
        self.generate_samples(digit, count)
    }

    /// Generates `num_samples` samples of the tone for `digit`
    /// (`0`-`9`, `*`, `#`, `A`-`D`; uppercase only).
    ///
    /// The row and column sine waves are averaged and scaled to the full `i16` range.
    /// Returns an empty vector for any other character.
    pub fn generate_samples(&self, digit: char, num_samples: usize) -> Vec<i16> {
        let row_hz: f32 = match digit {
            '1' | '2' | '3' | 'A' => 697.0,
            '4' | '5' | '6' | 'B' => 770.0,
            '7' | '8' | '9' | 'C' => 852.0,
            '*' | '0' | '#' | 'D' => 941.0,
            _ => return Vec::new(),
        };
        let col_hz: f32 = match digit {
            '1' | '4' | '7' | '*' => 1209.0,
            '2' | '5' | '8' | '0' => 1336.0,
            '3' | '6' | '9' | '#' => 1477.0,
            // Only 'A'..='D' remain after the row lookup above.
            _ => 1633.0,
        };
        let rate = self.sample_rate as f32;
        (0..num_samples)
            .map(|i| {
                let t = i as f32 / rate;
                let low = (2.0 * std::f32::consts::PI * row_hz * t).sin();
                let high = (2.0 * std::f32::consts::PI * col_hz * t).sin();
                (((low + high) / 2.0) * 32767.0) as i16
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use audio_codec::{CodecType, create_decoder, create_encoder};

    #[test]
    fn test_mix_pcmu_both_silent() {
        let mut recorder = create_test_recorder(CodecType::PCMU, 1);
        let pcm_silent = vec![0i16; 160];
        let mut encoder = create_encoder(CodecType::PCMU);
        let data_a = encoder.encode(&pcm_silent);
        let data_b = encoder.encode(&pcm_silent);
        let mixed = recorder.mix(&data_a, &data_b).unwrap();
        let mut decoder = create_decoder(CodecType::PCMU);
        let pcm_result = decoder.decode(&mixed);
        let max_sample = pcm_result.iter().map(|&s| s.abs()).max().unwrap_or(0);
        assert!(
            max_sample < 100,
            "Mixed silent audio should remain silent, got max={}",
            max_sample
        );
    }

    #[test]
    fn test_mix_pcmu_one_silent() {
        let mut recorder = create_test_recorder(CodecType::PCMU, 1);
        let pcm_silent = vec![0i16; 160];
        let pcm_active: Vec<i16> = (0..160)
            .map(|i| ((i as f32 / 10.0).sin() * 5000.0) as i16)
            .collect();
        let mut encoder_a = create_encoder(CodecType::PCMU);
        let mut encoder_b = create_encoder(CodecType::PCMU);
        let data_a = encoder_a.encode(&pcm_silent);
        let data_b = encoder_b.encode(&pcm_active);
        let mixed = recorder.mix(&data_a, &data_b).unwrap();
        let mut decoder = create_decoder(CodecType::PCMU);
        let pcm_result = decoder.decode(&mixed);
        let mut decoder_b = create_decoder(CodecType::PCMU);
        let pcm_b_decoded = decoder_b.decode(&data_b);
        let avg_mixed: i32 = pcm_result
            .iter()
            .take(100)
            .map(|&s| s.abs() as i32)
            .sum::<i32>()
            / 100;
        let avg_original: i32 = pcm_b_decoded
            .iter()
            .take(100)
            .map(|&s| s.abs() as i32)
            .sum::<i32>()
            / 100;
        assert!(
            avg_original > 100,
            "Original signal should be non-zero, got {}",
            avg_original
        );
        assert!(
            avg_mixed > 50,
            "Mixed signal should be non-zero, got {}",
            avg_mixed
        );
        let ratio = avg_mixed as f32 / avg_original as f32;
        assert!(
            ratio > 0.35 && ratio < 0.65,
            "Mixed amplitude should be ~0.5x original, got ratio={} (mixed={}, orig={})",
            ratio,
            avg_mixed,
            avg_original
        );
    }

    #[test]
    fn test_mix_pcmu_both_active() {
        let mut recorder = create_test_recorder(CodecType::PCMU, 1);
        // Scale before casting: `sin() as i16` truncates almost every sample to 0,
        // which would make this "both active" test mix near-silence.
        let pcm_a: Vec<i16> = (0..160)
            .map(|i| ((i as f32 * 50.0).sin() * 2000.0) as i16)
            .collect();
        let pcm_b: Vec<i16> = (0..160)
            .map(|i| ((i as f32 * 70.0).sin() * 3000.0) as i16)
            .collect();
        let mut encoder_a = create_encoder(CodecType::PCMU);
        let mut encoder_b = create_encoder(CodecType::PCMU);
        let data_a = encoder_a.encode(&pcm_a);
        let data_b = encoder_b.encode(&pcm_b);
        let mixed = recorder.mix(&data_a, &data_b).unwrap();
        assert_eq!(mixed.len(), data_a.len());
        assert_eq!(mixed.len(), data_b.len());
        let mut decoder_a = create_decoder(CodecType::PCMU);
        let mut decoder_b = create_decoder(CodecType::PCMU);
        let mut decoder_mixed = create_decoder(CodecType::PCMU);
        let pcm_a_decoded = decoder_a.decode(&data_a);
        let pcm_b_decoded = decoder_b.decode(&data_b);
        let pcm_mixed = decoder_mixed.decode(&mixed);
        let sample_idx = 50;
        let expected =
            ((pcm_a_decoded[sample_idx] as i32 + pcm_b_decoded[sample_idx] as i32) / 2) as i16;
        let actual = pcm_mixed[sample_idx];
        let diff = (expected - actual).abs();
        assert!(
            diff < 200,
            "Mixed sample should be average of inputs, expected ~{}, got {}, diff={}",
            expected,
            actual,
            diff
        );
    }

    #[test]
    fn test_interleave_pcmu() {
        let mut recorder = create_test_recorder(CodecType::PCMU, 2);
        let data_a: Vec<u8> = (0..160).map(|i| (i % 256) as u8).collect();
        let data_b: Vec<u8> = (0..160).map(|i| ((i + 128) % 256) as u8).collect();
        let interleaved = recorder.interleave(&data_a, &data_b).unwrap();
        assert_eq!(interleaved.len(), data_a.len() + data_b.len());
        for i in 0..10 {
            assert_eq!(
                interleaved[i * 2],
                data_a[i],
                "Interleaved data should alternate: A at position {}",
                i * 2
            );
            assert_eq!(
                interleaved[i * 2 + 1],
                data_b[i],
                "Interleaved data should alternate: B at position {}",
                i * 2 + 1
            );
        }
    }

    #[test]
    fn test_channel_selection_mono() {
        let recorder = create_test_recorder(CodecType::PCMU, 1);
        assert_eq!(recorder.channels, 1, "Mono recorder should have 1 channel");
    }

    #[test]
    fn test_channel_selection_stereo() {
        let recorder = create_test_recorder(CodecType::Opus, 2);
        assert_eq!(recorder.channels, 2, "Opus recorder should have 2 channels");
        let recorder_g722 = create_test_recorder(CodecType::G722, 2);
        assert_eq!(
            recorder_g722.channels, 2,
            "G722 recorder should have 2 channels"
        );
    }

    #[test]
    fn test_mix_prevents_clipping() {
        let mut recorder = create_test_recorder(CodecType::PCMU, 1);
        let pcm_high = vec![20000i16; 160];
        let mut encoder_a = create_encoder(CodecType::PCMU);
        let mut encoder_b = create_encoder(CodecType::PCMU);
        let data_a = encoder_a.encode(&pcm_high);
        let data_b = encoder_b.encode(&pcm_high);
        let mixed = recorder.mix(&data_a, &data_b).unwrap();
        let mut decoder = create_decoder(CodecType::PCMU);
        let pcm_result = decoder.decode(&mixed);
        let avg_result: i32 =
            pcm_result.iter().map(|&s| s as i32).sum::<i32>() / pcm_result.len() as i32;
        assert!(
            avg_result > 15000 && avg_result < 25000,
            "Mixed high-amplitude audio should not clip, avg={}",
            avg_result
        );
    }

    /// Builds a recorder that writes into an in-memory `TestWriter` instead of a file.
    fn create_test_recorder(codec: CodecType, channels: u16) -> Recorder {
        let sample_rate = codec.samplerate();
        let encoder = Some(create_encoder(codec));
        Recorder {
            path: "test.wav".to_string(),
            codec,
            written_bytes: 0,
            sample_rate,
            channels,
            dtmf_gen: DtmfGenerator::new(sample_rate),
            encoder,
            decoders: HashMap::new(),
            resamplers: HashMap::new(),
            buffer_a: BTreeMap::new(),
            buffer_b: BTreeMap::new(),
            start_instant: Instant::now(),
            last_flush: Instant::now(),
            base_timestamp_a: None,
            base_timestamp_b: None,
            start_offset_a: 0,
            start_offset_b: 0,
            last_ssrc_a: None,
            last_ssrc_b: None,
            profile_a: NegotiatedLegProfile::default(),
            profile_b: NegotiatedLegProfile::default(),
            dtmf_state_a: None,
            dtmf_state_b: None,
            next_flush_ts: 0,
            written_samples: 0,
            writer: Box::new(TestWriter::new()),
            ptime: Duration::from_millis(20),
        }
    }

    /// In-memory `StreamWriter` that collects everything written to it.
    struct TestWriter {
        data: Vec<u8>,
    }

    impl TestWriter {
        fn new() -> Self {
            Self { data: Vec::new() }
        }
    }

    impl StreamWriter for TestWriter {
        fn write_header(&mut self) -> Result<()> {
            Ok(())
        }
        fn write_packet(&mut self, data: &[u8], _samples: usize) -> Result<()> {
            self.data.extend_from_slice(data);
            Ok(())
        }
        fn finalize(&mut self) -> Result<()> {
            Ok(())
        }
    }
}