use crate::rtp::dtmf::DtmfDigit;
use crate::rtp::RtpHeader;
/// Decoded form of a 4-byte telephone-event payload.
///
/// The field layout read by [`parse_event_payload`] matches the RFC 4733
/// telephone-event format: event code, flags/volume byte, 16-bit duration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DtmfEventPayload {
    /// Raw event code (first payload byte).
    pub event: u8,
    /// `true` when the top bit (`0x80`) of the second byte is set.
    pub end: bool,
    /// Low six bits (`0x3F`) of the second byte.
    pub volume: u8,
    /// Big-endian 16-bit value taken from the third and fourth bytes.
    pub duration_ticks: u16,
}

/// Parses the first four bytes of `buf` into a [`DtmfEventPayload`].
///
/// Returns `None` when `buf` holds fewer than four bytes. Any bytes past the
/// fourth are ignored, as is bit `0x40` of the second byte.
pub fn parse_event_payload(buf: &[u8]) -> Option<DtmfEventPayload> {
    match buf {
        // At least four bytes: event, flags+volume, duration high, duration low.
        &[event, flags, duration_hi, duration_lo, ..] => Some(DtmfEventPayload {
            event,
            end: (flags & 0x80) != 0,
            volume: flags & 0x3F,
            duration_ticks: u16::from_be_bytes([duration_hi, duration_lo]),
        }),
        _ => None,
    }
}
/// Key-level event reported by `DtmfReceiver::push`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DtmfEvent {
/// Emitted for the first accepted packet of a new event.
Pressed {
/// Digit decoded from the packet's event code.
digit: DtmfDigit,
},
/// Emitted for the first packet of the active event that has its end flag set.
Released {
/// Digit decoded from the packet's event code.
digit: DtmfDigit,
/// Duration value carried by the end packet that triggered the release.
duration_ticks: u16,
},
}
/// Identity and state of the most recently accepted event; used by
/// `DtmfReceiver::push` to recognise repeats and stale packets.
#[derive(Debug, Clone, Copy)]
struct ActiveEvent {
/// SSRC from the header of the packet that started the event.
ssrc: u32,
/// RTP timestamp from the header of the packet that started the event.
timestamp: u32,
/// Raw event code from that packet's payload.
event: u8,
/// Set once a `Released` has been emitted for this event, so further end
/// packets for it are ignored.
released: bool,
}
/// Stateful decoder that turns a stream of RTP packets into
/// [`DtmfEvent`]s, tracking at most one event at a time.
#[derive(Debug)]
pub struct DtmfReceiver {
/// Only packets whose RTP payload type equals this value are decoded.
payload_type: u8,
/// The most recently accepted event, or `None` before the first one.
active: Option<ActiveEvent>,
}
impl DtmfReceiver {
pub fn new(payload_type: u8) -> Self {
Self {
payload_type,
active: None,
}
}
pub fn push(&mut self, packet: &[u8]) -> Option<DtmfEvent> {
let header = RtpHeader::parse(packet)?;
if header.payload_type != self.payload_type {
return None;
}
let payload = parse_event_payload(packet.get(header.header_len()..)?)?;
let digit = DtmfDigit::from_event_code(payload.event)?;
if let Some(active) = &mut self.active {
let same_event = active.ssrc == header.ssrc
&& active.timestamp == header.timestamp
&& active.event == payload.event;
if same_event {
if payload.end && !active.released {
active.released = true;
return Some(DtmfEvent::Released {
digit,
duration_ticks: payload.duration_ticks,
});
}
return None;
}
if active.ssrc == header.ssrc && !timestamp_newer(header.timestamp, active.timestamp) {
return None;
}
}
self.active = Some(ActiveEvent {
ssrc: header.ssrc,
timestamp: header.timestamp,
event: payload.event,
released: false,
});
Some(DtmfEvent::Pressed { digit })
}
}
/// Reports whether timestamp `a` is strictly ahead of `b` in wrapping
/// 32-bit arithmetic.
///
/// `a` counts as newer when the wrapping distance `a - b` is between 1 and
/// `0x7FFF_FFFF` inclusive; equal values and distances of `0x8000_0000` or
/// more are not newer.
fn timestamp_newer(a: u32, b: u32) -> bool {
    matches!(a.wrapping_sub(b), 1..=0x7FFF_FFFF)
}
// Unit tests for `parse_event_payload`, `DtmfReceiver::push` and
// `timestamp_newer`. Packets are built with the sender-side helpers from
// `crate::rtp::dtmf`.
#[cfg(test)]
mod tests {
use super::*;
use crate::rtp::dtmf::{
build_event_payload, build_rtp_dtmf_packet, send_dtmf_burst, DtmfBurstConfig,
};
// Payload type the receiver under test is configured with.
const PT: u8 = 101;
// Default SSRC used for synthesized packets.
const SSRC: u32 = 0xDEAD_BEEF;
// Every digit code 0..16, with and without the end flag, must survive a
// build -> parse round trip.
#[test]
fn parse_round_trips_with_build() {
for code in 0u8..16 {
for end in [false, true] {
let built = build_event_payload(code, end, 10, 800);
let parsed = parse_event_payload(&built).expect("4 bytes parse");
assert_eq!(parsed.event, code);
assert_eq!(parsed.end, end);
assert_eq!(parsed.volume, 10);
assert_eq!(parsed.duration_ticks, 800);
}
}
}
// Boundary values: maximum 6-bit volume with maximum duration, and all-zero.
#[test]
fn parse_extracts_volume_and_extreme_durations() {
let p = parse_event_payload(&build_event_payload(11, true, 63, u16::MAX)).unwrap();
assert_eq!(p.event, 11);
assert!(p.end);
assert_eq!(p.volume, 63);
assert_eq!(p.duration_ticks, u16::MAX);
let p = parse_event_payload(&build_event_payload(0, false, 0, 0)).unwrap();
assert!(!p.end);
assert_eq!(p.volume, 0);
assert_eq!(p.duration_ticks, 0);
}
// Anything shorter than four bytes is rejected.
#[test]
fn parse_rejects_short_buffer() {
assert_eq!(parse_event_payload(&[]), None);
assert_eq!(parse_event_payload(&[5]), None);
assert_eq!(parse_event_payload(&[5, 0x0A, 0x00]), None);
}
// Builds the packets for one key press, all sharing `timestamp` and `ssrc`:
//   1. three copies with duration 160 (only the first passes `true` as the
//      second argument of `build_rtp_dtmf_packet` — presumably the marker bit),
//   2. `total_ticks - 1` updates, each raising the duration by 160,
//   3. three end-flagged copies carrying the final duration.
// Despite its name, `total_ticks` counts 160-tick steps: for
// `total_ticks >= 1` the final duration is `160 * total_ticks` and the burst
// holds `3 + (total_ticks - 1) + 3` packets. Sequence numbers start at
// `start_seq` and increase by one per packet (wrapping).
fn synth_burst(
digit: DtmfDigit,
ssrc: u32,
start_seq: u16,
timestamp: u32,
total_ticks: u16,
) -> Vec<[u8; 16]> {
let event = digit.event_code();
let mut out = Vec::new();
let mut seq = start_seq;
let mut dur = 160u16;
for i in 0..3 {
let payload = build_event_payload(event, false, 10, dur);
out.push(build_rtp_dtmf_packet(
PT,
i == 0,
seq,
timestamp,
ssrc,
payload,
));
seq = seq.wrapping_add(1);
}
for _ in 1..total_ticks {
dur += 160;
let payload = build_event_payload(event, false, 10, dur);
out.push(build_rtp_dtmf_packet(
PT, false, seq, timestamp, ssrc, payload,
));
seq = seq.wrapping_add(1);
}
for _ in 0..3 {
let payload = build_event_payload(event, true, 10, dur);
out.push(build_rtp_dtmf_packet(
PT, false, seq, timestamp, ssrc, payload,
));
seq = seq.wrapping_add(1);
}
out
}
// Pushes every packet in order and collects the events that come out.
fn feed(rx: &mut DtmfReceiver, packets: &[[u8; 16]]) -> Vec<DtmfEvent> {
packets.iter().filter_map(|p| rx.push(p)).collect()
}
// A complete burst collapses to exactly one press and one release.
#[test]
fn full_burst_emits_one_press_and_one_release() {
let mut rx = DtmfReceiver::new(PT);
let events = feed(&mut rx, &synth_burst(DtmfDigit::D5, SSRC, 100, 5000, 5));
assert_eq!(
events,
vec![
DtmfEvent::Pressed {
digit: DtmfDigit::D5
},
DtmfEvent::Released {
digit: DtmfDigit::D5,
duration_ticks: 800,
},
]
);
}
// Only the three initial copies are fed; they must yield a single press.
#[test]
fn redundant_start_copies_do_not_repeat_the_press() {
let mut rx = DtmfReceiver::new(PT);
let burst = synth_burst(DtmfDigit::Star, SSRC, 0, 0, 5);
let events = feed(&mut rx, &burst[..3]);
assert_eq!(
events,
vec![DtmfEvent::Pressed {
digit: DtmfDigit::Star
}]
);
}
// Dropping the first packet of the burst must not lose the press.
#[test]
fn lost_marker_packet_still_registers_the_press() {
let mut rx = DtmfReceiver::new(PT);
let burst = synth_burst(DtmfDigit::D7, SSRC, 100, 5000, 5);
let events = feed(&mut rx, &burst[1..]);
assert_eq!(events.len(), 2);
assert_eq!(
events[0],
DtmfEvent::Pressed {
digit: DtmfDigit::D7
}
);
}
// If every end packet of the first digit is lost, no release is reported
// for it; the next digit still produces its own press and release.
#[test]
fn lost_end_packets_then_next_digit_emits_both_presses() {
let mut rx = DtmfReceiver::new(PT);
let mut burst1 = synth_burst(DtmfDigit::D1, SSRC, 0, 0, 5);
// Drop the three trailing end packets of burst1 (7 packets remain, seq
// 0..=6), then start burst2 at seq 7 with a later timestamp.
burst1.truncate(burst1.len() - 3); let burst2 = synth_burst(DtmfDigit::D2, SSRC, 7, 800, 5);
let mut events = feed(&mut rx, &burst1);
events.extend(feed(&mut rx, &burst2));
assert_eq!(
events,
vec![
DtmfEvent::Pressed {
digit: DtmfDigit::D1
},
DtmfEvent::Pressed {
digit: DtmfDigit::D2
},
DtmfEvent::Released {
digit: DtmfDigit::D2,
duration_ticks: 800,
},
]
);
}
// Packets with a different payload type, interleaved with the burst, must
// neither produce events nor disturb the receiver's state.
#[test]
fn interleaved_audio_packets_are_ignored() {
let mut rx = DtmfReceiver::new(PT);
let burst = synth_burst(DtmfDigit::Pound, SSRC, 0, 0, 2);
// Hand-built 12-byte header plus 160 payload bytes.
let mut audio = vec![0u8; 12 + 160];
audio[0] = 0x80;
// Byte 1 = 0 and bytes 8..12 = 0xAAAAAAAA — presumably payload type 0 and a
// distinct SSRC, assuming `RtpHeader::parse` follows the standard RTP layout.
audio[1] = 0; audio[8..12].copy_from_slice(&0xAAAA_AAAAu32.to_be_bytes());
let mut events = Vec::new();
for (i, pkt) in burst.iter().enumerate() {
events.extend(rx.push(pkt));
if i % 2 == 0 {
assert_eq!(rx.push(&audio), None, "audio must not decode");
}
}
assert_eq!(events.len(), 2, "one press + one release despite audio");
}
// A telephone-event packet sent under another payload type (96) is skipped.
#[test]
fn wrong_payload_type_is_ignored() {
let mut rx = DtmfReceiver::new(PT);
let event = build_event_payload(5, false, 10, 160);
let pkt = build_rtp_dtmf_packet(96, true, 0, 0, SSRC, event);
assert_eq!(rx.push(&pkt), None);
}
// Event code 16 is expected to be rejected by `DtmfDigit::from_event_code`.
#[test]
fn non_digit_event_codes_are_ignored() {
let mut rx = DtmfReceiver::new(PT);
let event = build_event_payload(16, false, 10, 160);
let pkt = build_rtp_dtmf_packet(PT, true, 0, 0, SSRC, event);
assert_eq!(rx.push(&pkt), None);
}
// Empty input, a truncated header, and a truncated payload all yield `None`.
#[test]
fn malformed_packets_are_ignored() {
let mut rx = DtmfReceiver::new(PT);
assert_eq!(rx.push(&[]), None);
assert_eq!(rx.push(&[0u8; 11]), None, "short of an RTP header");
let full = build_rtp_dtmf_packet(PT, true, 0, 0, SSRC, [5, 0x0A, 0, 160]);
// Cutting the 16-byte packet to 14 bytes leaves only two payload bytes.
assert_eq!(rx.push(&full[..14]), None);
}
// Two complete bursts in a row produce press/release pairs for both digits.
#[test]
fn two_back_to_back_digits_emit_two_presses() {
let mut rx = DtmfReceiver::new(PT);
// burst1 holds 7 packets (seq 0..=6) and ends at duration 320, so burst2
// continues at seq 7 with timestamp 320.
let burst1 = synth_burst(DtmfDigit::D5, SSRC, 0, 0, 2); let burst2 = synth_burst(DtmfDigit::D7, SSRC, 7, 320, 2);
let mut events = feed(&mut rx, &burst1);
events.extend(feed(&mut rx, &burst2));
assert_eq!(
events,
vec![
DtmfEvent::Pressed {
digit: DtmfDigit::D5
},
DtmfEvent::Released {
digit: DtmfDigit::D5,
duration_ticks: 320,
},
DtmfEvent::Pressed {
digit: DtmfDigit::D7
},
DtmfEvent::Released {
digit: DtmfDigit::D7,
duration_ticks: 320,
},
]
);
}
// A late packet from the previous event (older timestamp, same SSRC) that
// arrives after the next event has started must be dropped silently.
#[test]
fn out_of_order_straggler_from_previous_event_is_dropped() {
let mut rx = DtmfReceiver::new(PT);
let burst1 = synth_burst(DtmfDigit::D5, SSRC, 0, 0, 2);
let burst2 = synth_burst(DtmfDigit::D7, SSRC, 7, 320, 2);
assert_eq!(feed(&mut rx, &burst1).len(), 2);
assert_eq!(
rx.push(&burst2[0]),
Some(DtmfEvent::Pressed {
digit: DtmfDigit::D7
})
);
// Replay the last end packet of burst1 after burst2 has begun.
let straggler = burst1.last().unwrap();
assert_eq!(rx.push(straggler), None, "straggler must not re-press 5");
let events = feed(&mut rx, &burst2[1..]);
assert_eq!(
events,
vec![DtmfEvent::Released {
digit: DtmfDigit::D7,
duration_ticks: 320,
}]
);
}
// A packet from a different SSRC is accepted as a new event even though its
// timestamp (100) is lower than the previous stream's (90_000).
#[test]
fn ssrc_change_starts_a_fresh_stream() {
let mut rx = DtmfReceiver::new(PT);
assert_eq!(
feed(&mut rx, &synth_burst(DtmfDigit::D9, SSRC, 0, 90_000, 2)).len(),
2
);
let events = feed(&mut rx, &synth_burst(DtmfDigit::D3, 0x1234_5678, 0, 100, 2));
assert_eq!(
events[0],
DtmfEvent::Pressed {
digit: DtmfDigit::D3
}
);
assert_eq!(events.len(), 2);
}
// When only the three end packets arrive, the first is reported as the
// press, the second as the release, and the third is ignored.
#[test]
fn first_seen_end_packet_still_reports_the_press() {
let mut rx = DtmfReceiver::new(PT);
let burst = synth_burst(DtmfDigit::D8, SSRC, 0, 0, 5);
let ends = &burst[burst.len() - 3..];
let events = feed(&mut rx, ends);
assert_eq!(
events,
vec![
DtmfEvent::Pressed {
digit: DtmfDigit::D8
},
DtmfEvent::Released {
digit: DtmfDigit::D8,
duration_ticks: 800,
},
]
);
}
// Ordering must hold across the u32 wrap point; equal values are not newer.
#[test]
fn timestamp_newer_handles_wraparound() {
assert!(timestamp_newer(1, 0));
assert!(!timestamp_newer(0, 1));
assert!(!timestamp_newer(5, 5));
assert!(timestamp_newer(100, u32::MAX - 100));
assert!(!timestamp_newer(u32::MAX - 100, 100));
}
// End-to-end check: `send_dtmf_burst` transmits over a loopback UDP socket
// and the receiver decodes what actually arrives.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn round_trips_with_send_dtmf_burst_over_loopback() {
use std::sync::Arc;
use tokio::net::UdpSocket;
let sender = Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap());
let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let remote = receiver.local_addr().unwrap();
let cfg = DtmfBurstConfig {
payload_type: PT,
ssrc: SSRC,
initial_seq: 100,
initial_timestamp: 5000,
// NOTE(review): 100 ms is expected to come back as 800 ticks below, which
// presumably reflects an 8 kHz clock inside `send_dtmf_burst` — confirm.
hold_duration_ms: 100, volume_dbm0: 10,
};
let send = tokio::spawn(send_dtmf_burst(sender, remote, cfg, DtmfDigit::D5));
let mut rx = DtmfReceiver::new(PT);
let mut events = Vec::new();
let mut buf = [0u8; 64];
// Reads exactly ten datagrams, each with a 500 ms timeout; assumes
// `send_dtmf_burst` emits at least ten packets for this configuration.
for _ in 0..10 {
let (n, _) = tokio::time::timeout(
std::time::Duration::from_millis(500),
receiver.recv_from(&mut buf),
)
.await
.expect("packet arrived in time")
.expect("recv ok");
events.extend(rx.push(&buf[..n]));
}
send.await.unwrap().unwrap();
assert_eq!(
events,
vec![
DtmfEvent::Pressed {
digit: DtmfDigit::D5
},
DtmfEvent::Released {
digit: DtmfDigit::D5,
duration_ticks: 800, },
]
);
}
}