use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::time::{sleep, Duration};
use tracing::debug;
pub const DEFAULT_VOLUME_DBM0: u8 = 10;
const TICKS_PER_PACKET: u16 = 160;
const PACKET_INTERVAL: Duration = Duration::from_millis(20);
const REDUNDANCY: u8 = 3;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DtmfDigit {
D0,
D1,
D2,
D3,
D4,
D5,
D6,
D7,
D8,
D9,
Star,
Pound,
A,
B,
C,
D,
}
impl DtmfDigit {
pub fn from_char(c: char) -> Option<Self> {
Some(match c {
'0' => Self::D0,
'1' => Self::D1,
'2' => Self::D2,
'3' => Self::D3,
'4' => Self::D4,
'5' => Self::D5,
'6' => Self::D6,
'7' => Self::D7,
'8' => Self::D8,
'9' => Self::D9,
'*' => Self::Star,
'#' => Self::Pound,
'a' | 'A' => Self::A,
'b' | 'B' => Self::B,
'c' | 'C' => Self::C,
'd' | 'D' => Self::D,
_ => return None,
})
}
pub fn as_char(self) -> char {
match self {
Self::D0 => '0',
Self::D1 => '1',
Self::D2 => '2',
Self::D3 => '3',
Self::D4 => '4',
Self::D5 => '5',
Self::D6 => '6',
Self::D7 => '7',
Self::D8 => '8',
Self::D9 => '9',
Self::Star => '*',
Self::Pound => '#',
Self::A => 'A',
Self::B => 'B',
Self::C => 'C',
Self::D => 'D',
}
}
pub fn from_event_code(code: u8) -> Option<Self> {
Some(match code {
0 => Self::D0,
1 => Self::D1,
2 => Self::D2,
3 => Self::D3,
4 => Self::D4,
5 => Self::D5,
6 => Self::D6,
7 => Self::D7,
8 => Self::D8,
9 => Self::D9,
10 => Self::Star,
11 => Self::Pound,
12 => Self::A,
13 => Self::B,
14 => Self::C,
15 => Self::D,
_ => return None,
})
}
pub fn event_code(self) -> u8 {
match self {
Self::D0 => 0,
Self::D1 => 1,
Self::D2 => 2,
Self::D3 => 3,
Self::D4 => 4,
Self::D5 => 5,
Self::D6 => 6,
Self::D7 => 7,
Self::D8 => 8,
Self::D9 => 9,
Self::Star => 10,
Self::Pound => 11,
Self::A => 12,
Self::B => 13,
Self::C => 14,
Self::D => 15,
}
}
}
pub fn build_event_payload(event: u8, end: bool, volume: u8, duration_ticks: u16) -> [u8; 4] {
let mut out = [0u8; 4];
out[0] = event;
let end_bit = if end { 0x80 } else { 0x00 };
out[1] = end_bit | (volume & 0x3F);
let dur = duration_ticks.to_be_bytes();
out[2] = dur[0];
out[3] = dur[1];
out
}
#[derive(Debug, Clone, Copy)]
pub struct DtmfBurstConfig {
pub payload_type: u8,
pub ssrc: u32,
pub initial_seq: u16,
pub initial_timestamp: u32,
pub hold_duration_ms: u32,
pub volume_dbm0: u8,
}
pub fn build_rtp_dtmf_packet(
payload_type: u8,
marker: bool,
seq: u16,
timestamp: u32,
ssrc: u32,
event_payload: [u8; 4],
) -> [u8; 16] {
let mut pkt = [0u8; 16];
pkt[0] = 0x80;
let marker_bit = if marker { 0x80 } else { 0x00 };
pkt[1] = marker_bit | (payload_type & 0x7F);
pkt[2..4].copy_from_slice(&seq.to_be_bytes());
pkt[4..8].copy_from_slice(×tamp.to_be_bytes());
pkt[8..12].copy_from_slice(&ssrc.to_be_bytes());
pkt[12..16].copy_from_slice(&event_payload);
pkt
}
pub async fn send_dtmf_burst(
socket: Arc<UdpSocket>,
remote: SocketAddr,
config: DtmfBurstConfig,
digit: DtmfDigit,
) -> Result<(u16, u32), std::io::Error> {
let event = digit.event_code();
let volume = config.volume_dbm0.min(0x3F);
debug!(
"DTMF burst '{}' → {remote} (PT={}, SSRC=0x{:08X}, hold={}ms)",
digit.as_char(),
config.payload_type,
config.ssrc,
config.hold_duration_ms,
);
let total_packets = config.hold_duration_ms.div_ceil(20).max(1) as u16;
let total_duration_ticks = total_packets.saturating_mul(TICKS_PER_PACKET);
let mut seq = config.initial_seq;
let mut current_duration = TICKS_PER_PACKET;
for i in 0..REDUNDANCY {
let payload = build_event_payload(event, false, volume, current_duration);
let marker = i == 0;
let pkt = build_rtp_dtmf_packet(
config.payload_type,
marker,
seq,
config.initial_timestamp,
config.ssrc,
payload,
);
socket.send_to(&pkt, remote).await?;
seq = seq.wrapping_add(1);
}
for _ in 1..total_packets {
sleep(PACKET_INTERVAL).await;
current_duration = current_duration.saturating_add(TICKS_PER_PACKET);
let payload = build_event_payload(event, false, volume, current_duration);
let pkt = build_rtp_dtmf_packet(
config.payload_type,
false,
seq,
config.initial_timestamp,
config.ssrc,
payload,
);
socket.send_to(&pkt, remote).await?;
seq = seq.wrapping_add(1);
}
for _ in 0..REDUNDANCY {
let payload = build_event_payload(event, true, volume, current_duration);
let pkt = build_rtp_dtmf_packet(
config.payload_type,
false,
seq,
config.initial_timestamp,
config.ssrc,
payload,
);
socket.send_to(&pkt, remote).await?;
seq = seq.wrapping_add(1);
}
let next_timestamp = config
.initial_timestamp
.wrapping_add(total_duration_ticks as u32);
Ok((seq, next_timestamp))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn digit_from_char_round_trips() {
for d in [
DtmfDigit::D0,
DtmfDigit::D1,
DtmfDigit::D2,
DtmfDigit::D3,
DtmfDigit::D4,
DtmfDigit::D5,
DtmfDigit::D6,
DtmfDigit::D7,
DtmfDigit::D8,
DtmfDigit::D9,
DtmfDigit::Star,
DtmfDigit::Pound,
DtmfDigit::A,
DtmfDigit::B,
DtmfDigit::C,
DtmfDigit::D,
] {
assert_eq!(DtmfDigit::from_char(d.as_char()), Some(d), "digit {d:?}");
}
}
#[test]
fn digit_from_event_code_round_trips() {
for code in 0u8..16 {
let d = DtmfDigit::from_event_code(code).expect("codes 0-15 are digits");
assert_eq!(d.event_code(), code, "code {code}");
}
}
#[test]
fn digit_from_event_code_rejects_non_dtmf_events() {
for code in [16u8, 17, 63, 255] {
assert_eq!(DtmfDigit::from_event_code(code), None, "code {code}");
}
}
#[test]
fn digit_event_codes_match_rfc_4733() {
assert_eq!(DtmfDigit::D0.event_code(), 0);
assert_eq!(DtmfDigit::D9.event_code(), 9);
assert_eq!(DtmfDigit::Star.event_code(), 10);
assert_eq!(DtmfDigit::Pound.event_code(), 11);
assert_eq!(DtmfDigit::A.event_code(), 12);
assert_eq!(DtmfDigit::D.event_code(), 15);
}
#[test]
fn digit_from_char_rejects_non_dtmf() {
for c in [' ', 'e', 'E', '+', '-', '\n'] {
assert_eq!(DtmfDigit::from_char(c), None, "should reject {c:?}");
}
}
#[test]
fn digit_from_char_accepts_letters_case_insensitive() {
assert_eq!(DtmfDigit::from_char('a'), Some(DtmfDigit::A));
assert_eq!(DtmfDigit::from_char('A'), Some(DtmfDigit::A));
assert_eq!(DtmfDigit::from_char('d'), Some(DtmfDigit::D));
assert_eq!(DtmfDigit::from_char('D'), Some(DtmfDigit::D));
}
#[test]
fn event_payload_byte_layout() {
let p = build_event_payload(5, false, 10, 160);
assert_eq!(p[0], 5);
assert_eq!(p[1], 0x0A);
assert_eq!(p[2..4], 160u16.to_be_bytes());
let end = build_event_payload(5, true, 10, 1280);
assert_eq!(end[1], 0x8A);
assert_eq!(end[2..4], 1280u16.to_be_bytes());
}
#[test]
fn event_payload_clamps_volume_to_6_bits() {
let p = build_event_payload(5, false, 0xFF, 160);
assert_eq!(p[1] & 0xC0, 0x00);
assert_eq!(p[1] & 0x3F, 0x3F);
}
#[test]
fn rtp_dtmf_packet_header_shape() {
let event = build_event_payload(5, false, 10, 160);
let pkt = build_rtp_dtmf_packet(101, true, 1000, 12345, 0xCAFE_BABE, event);
assert_eq!(pkt[0], 0x80); assert_eq!(pkt[1], 0x80 | 101); assert_eq!(&pkt[2..4], &1000u16.to_be_bytes());
assert_eq!(&pkt[4..8], &12345u32.to_be_bytes());
assert_eq!(&pkt[8..12], &0xCAFE_BABEu32.to_be_bytes());
assert_eq!(&pkt[12..16], &event);
}
async fn loopback_pair() -> (Arc<UdpSocket>, UdpSocket) {
let a = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let b = UdpSocket::bind("127.0.0.1:0").await.unwrap();
(Arc::new(a), b)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn burst_packet_count_and_durations() {
let (sender, receiver) = loopback_pair().await;
let remote = receiver.local_addr().unwrap();
let cfg = DtmfBurstConfig {
payload_type: 101,
ssrc: 0xDEAD_BEEF,
initial_seq: 100,
initial_timestamp: 5000,
hold_duration_ms: 100,
volume_dbm0: 10,
};
let handle = tokio::spawn(send_dtmf_burst(sender, remote, cfg, DtmfDigit::D5));
let mut buf = [0u8; 64];
let mut packets: Vec<[u8; 4]> = Vec::new();
let mut markers: Vec<bool> = Vec::new();
let mut seqs: Vec<u16> = Vec::new();
for _ in 0..10 {
let (n, _) =
tokio::time::timeout(Duration::from_millis(500), receiver.recv_from(&mut buf))
.await
.expect("packet arrived in time")
.expect("recv ok");
assert_eq!(n, 16, "DTMF packets are 12 + 4 bytes");
markers.push(buf[1] & 0x80 != 0);
seqs.push(u16::from_be_bytes([buf[2], buf[3]]));
let mut payload = [0u8; 4];
payload.copy_from_slice(&buf[12..16]);
packets.push(payload);
}
let (next_seq, next_ts) = handle.await.unwrap().unwrap();
assert_eq!(next_seq, 100 + 10);
assert_eq!(next_ts, 5000 + 800);
for (i, s) in seqs.iter().enumerate() {
assert_eq!(*s, 100 + i as u16, "packet {i}");
}
assert!(markers[0], "marker on first packet");
for (i, m) in markers.iter().enumerate().skip(1) {
assert!(!m, "no marker on packet {i}");
}
for p in &packets {
assert_eq!(p[0], 5);
}
for p in &packets[0..3] {
assert_eq!(p[1] & 0x80, 0, "E bit clear on initial");
assert_eq!(u16::from_be_bytes([p[2], p[3]]), 160);
}
let expected_durations = [320u16, 480, 640, 800];
for (p, &dur) in packets[3..7].iter().zip(expected_durations.iter()) {
assert_eq!(p[1] & 0x80, 0, "E bit clear on continuation");
assert_eq!(u16::from_be_bytes([p[2], p[3]]), dur);
}
for p in &packets[7..10] {
assert_eq!(p[1] & 0x80, 0x80, "E bit set on end packet");
assert_eq!(u16::from_be_bytes([p[2], p[3]]), 800);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn chained_bursts_keep_timestamp_monotonic() {
let (sender, receiver) = loopback_pair().await;
let remote = receiver.local_addr().unwrap();
let cfg1 = DtmfBurstConfig {
payload_type: 101,
ssrc: 0xCAFE_F00D,
initial_seq: 0,
initial_timestamp: 0,
hold_duration_ms: 40, volume_dbm0: 10,
};
let receiver_task = tokio::spawn(async move {
let mut buf = [0u8; 64];
let mut count = 0;
while count < 14 {
let (_n, _) =
tokio::time::timeout(Duration::from_millis(500), receiver.recv_from(&mut buf))
.await
.unwrap()
.unwrap();
count += 1;
}
});
let (s1, t1) = send_dtmf_burst(sender.clone(), remote, cfg1, DtmfDigit::D5)
.await
.unwrap();
assert_eq!(s1, 7);
assert_eq!(t1, 320);
let cfg2 = DtmfBurstConfig {
initial_seq: s1,
initial_timestamp: t1,
..cfg1
};
let (s2, t2) = send_dtmf_burst(sender, remote, cfg2, DtmfDigit::D7)
.await
.unwrap();
assert_eq!(s2, 14);
assert_eq!(t2, 640);
receiver_task.await.unwrap();
}
}