use std::collections::HashMap;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use tracing::info;
use crate::callback_pool::spawn_callback;
use crate::config::DialOptions;
use crate::dialog::Dialog;
use crate::dtmf;
use crate::error::{Error, Result};
use crate::media::{self, MediaChannels, MediaConfig, MediaTransport};
use crate::sdp;
use crate::srtp::SrtpContext;
use crate::types::*;
pub struct VideoUpgradeRequest {
call: Arc<Call>,
reinvite_dlg: Arc<dyn Dialog>,
remote_sdp: String,
sess: sdp::Session,
video_socket: Option<(UdpSocket, u16)>,
responded: Arc<AtomicBool>,
}
unsafe impl Send for VideoUpgradeRequest {}
unsafe impl Sync for VideoUpgradeRequest {}
impl VideoUpgradeRequest {
pub fn accept(&self) {
if self.responded.swap(true, Ordering::SeqCst) {
return; }
self.call.accept_video_internal(
&self.reinvite_dlg,
&self.sess,
&self.remote_sdp,
&self.video_socket,
);
}
pub fn reject(&self) {
if self.responded.swap(true, Ordering::SeqCst) {
return; }
self.call
.reject_video_internal(&self.reinvite_dlg, &self.sess, &self.remote_sdp);
}
}
impl Drop for VideoUpgradeRequest {
fn drop(&mut self) {
if !self.responded.load(Ordering::SeqCst) {
self.responded.store(true, Ordering::SeqCst);
self.call
.reject_video_internal(&self.reinvite_dlg, &self.sess, &self.remote_sdp);
}
}
}
const DEFAULT_CODEC_PREFS: &[i32] = &[8, 0, 9, 101, 111];
fn new_call_id() -> String {
let mut buf = [0u8; 16];
for b in &mut buf {
*b = rand_byte();
}
format!("CA{}", hex::encode(&buf))
}
fn rand_byte() -> u8 {
use std::cell::Cell;
thread_local! {
static RNG: Cell<u64> = Cell::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
);
}
RNG.with(|rng| {
let mut s = rng.get();
s ^= s << 13;
s ^= s >> 7;
s ^= s << 17;
rng.set(s);
s as u8
})
}
mod hex {
pub fn encode(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
}
pub fn sip_header_uri(val: &str) -> &str {
if let (Some(start), Some(end)) = (val.find('<'), val.find('>')) {
if end > start {
return &val[start + 1..end];
}
}
val
}
fn uri_encode(val: &str) -> String {
let mut out = String::with_capacity(val.len() * 2);
for b in val.bytes() {
match b {
b'%' => out.push_str("%25"),
b'@' => out.push_str("%40"),
b' ' => out.push_str("%20"),
b';' => out.push_str("%3B"),
b'?' => out.push_str("%3F"),
b'&' => out.push_str("%26"),
b'=' => out.push_str("%3D"),
b'+' => out.push_str("%2B"),
b':' => out.push_str("%3A"),
_ => out.push(b as char),
}
}
out
}
pub fn sip_header_user(val: &str) -> &str {
let uri = sip_header_uri(val);
let uri = if let Some(i) = uri.find(':') {
&uri[i + 1..]
} else {
uri
};
if let Some(i) = uri.find('@') {
&uri[..i]
} else {
uri
}
}
pub fn sip_header_tag(val: &str) -> &str {
let lower = val.to_ascii_lowercase();
if let Some(idx) = lower.find(";tag=") {
let start = idx + 5;
let rest = &val[start..];
let end = rest.find([';', '>', ',']).unwrap_or(rest.len());
&rest[..end]
} else {
""
}
}
pub fn sip_header_display_name(val: &str) -> &str {
let lt = match val.find('<') {
Some(0) | None => return "",
Some(i) => i,
};
let name = val[..lt].trim();
if name.len() >= 2 && name.starts_with('"') && name.ends_with('"') {
&name[1..name.len() - 1]
} else {
name
}
}
struct CallInner {
id: String,
state: CallState,
direction: Direction,
opts: DialOptions,
start_time: Option<Instant>,
muted: bool,
codec_prefs: Vec<i32>,
local_ip: String,
rtp_port: i32,
remote_ip: String,
remote_port: i32,
local_sdp: String,
remote_sdp: String,
codec: Codec,
media_active: bool,
dtmf_mode: crate::config::DtmfMode,
srtp_enabled: bool,
srtp_local_key: String,
srtp_remote_key: String,
rtp_socket: Option<Arc<UdpSocket>>,
media_streams: Vec<media::MediaStream>,
media_shared: Option<Arc<media::MediaSharedState>>,
video_muted: bool,
video_codec: Option<VideoCodec>,
video_rtp_port: i32,
video_rtp_socket: Option<Arc<UdpSocket>>,
video_rtcp_socket: Option<Arc<UdpSocket>>,
video_remote_port: i32,
fir_seq_nr: u8,
on_ended_fn: Vec<Arc<dyn Fn(EndReason) + Send + Sync>>,
on_ended_internal: Option<Arc<dyn Fn(EndReason) + Send + Sync>>,
on_media_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_state_fn: Vec<Arc<dyn Fn(CallState) + Send + Sync>>,
on_state_internal: Option<Arc<dyn Fn(CallState) + Send + Sync>>,
on_dtmf_fn: Vec<Arc<dyn Fn(String) + Send + Sync>>,
on_dtmf_internal: Option<Arc<dyn Fn(String) + Send + Sync>>,
on_hold_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_resume_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_mute_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_unmute_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_video_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_video_request_fn: Option<Arc<dyn Fn(VideoUpgradeRequest) + Send + Sync>>,
session_timer: Option<std::thread::JoinHandle<()>>,
session_timer_cancel: Option<Arc<std::sync::atomic::AtomicBool>>,
}
pub struct Call {
inner: Mutex<CallInner>,
pub(crate) dlg: Arc<dyn Dialog>,
}
impl Call {
pub fn new_inbound(dlg: Arc<dyn Dialog>) -> Arc<Self> {
Arc::new(Call {
inner: Mutex::new(CallInner {
id: new_call_id(),
state: CallState::Ringing,
direction: Direction::Inbound,
opts: DialOptions::default(),
start_time: None,
muted: false,
codec_prefs: Vec::new(),
local_ip: String::new(),
rtp_port: 0,
remote_ip: String::new(),
remote_port: 0,
local_sdp: String::new(),
remote_sdp: String::new(),
codec: Codec::PCMU,
media_active: false,
dtmf_mode: crate::config::DtmfMode::Rfc4733,
srtp_enabled: false,
srtp_local_key: String::new(),
srtp_remote_key: String::new(),
rtp_socket: None,
media_streams: Vec::new(),
media_shared: None,
video_muted: false,
video_codec: None,
video_rtp_port: 0,
video_rtp_socket: None,
video_rtcp_socket: None,
video_remote_port: 0,
fir_seq_nr: 0,
on_ended_fn: Vec::new(),
on_ended_internal: None,
on_media_fn: Vec::new(),
on_state_fn: Vec::new(),
on_state_internal: None,
on_dtmf_fn: Vec::new(),
on_dtmf_internal: None,
on_hold_fn: Vec::new(),
on_resume_fn: Vec::new(),
on_mute_fn: Vec::new(),
on_unmute_fn: Vec::new(),
on_video_fn: Vec::new(),
on_video_request_fn: None,
session_timer: None,
session_timer_cancel: None,
}),
dlg,
})
}
pub fn new_outbound(dlg: Arc<dyn Dialog>, opts: DialOptions) -> Arc<Self> {
Arc::new(Call {
inner: Mutex::new(CallInner {
id: new_call_id(),
state: CallState::Dialing,
direction: Direction::Outbound,
opts,
start_time: None,
muted: false,
codec_prefs: Vec::new(),
local_ip: String::new(),
rtp_port: 0,
remote_ip: String::new(),
remote_port: 0,
local_sdp: String::new(),
remote_sdp: String::new(),
codec: Codec::PCMU,
media_active: false,
dtmf_mode: crate::config::DtmfMode::Rfc4733,
srtp_enabled: false,
srtp_local_key: String::new(),
srtp_remote_key: String::new(),
rtp_socket: None,
media_streams: Vec::new(),
media_shared: None,
video_muted: false,
video_codec: None,
video_rtp_port: 0,
video_rtp_socket: None,
video_rtcp_socket: None,
video_remote_port: 0,
fir_seq_nr: 0,
on_ended_fn: Vec::new(),
on_ended_internal: None,
on_media_fn: Vec::new(),
on_state_fn: Vec::new(),
on_state_internal: None,
on_dtmf_fn: Vec::new(),
on_dtmf_internal: None,
on_hold_fn: Vec::new(),
on_resume_fn: Vec::new(),
on_mute_fn: Vec::new(),
on_unmute_fn: Vec::new(),
on_video_fn: Vec::new(),
on_video_request_fn: None,
session_timer: None,
session_timer_cancel: None,
}),
dlg,
})
}
pub fn id(&self) -> String {
self.inner.lock().id.clone()
}
pub fn call_id(&self) -> String {
self.dlg.call_id()
}
pub fn direction(&self) -> Direction {
self.inner.lock().direction
}
pub fn state(&self) -> CallState {
self.inner.lock().state
}
pub fn codec(&self) -> Codec {
self.inner.lock().codec
}
pub fn local_sdp(&self) -> String {
self.inner.lock().local_sdp.clone()
}
pub fn remote_sdp(&self) -> String {
self.inner.lock().remote_sdp.clone()
}
pub fn start_time(&self) -> Option<Instant> {
self.inner.lock().start_time
}
pub fn duration(&self) -> Duration {
let inner = self.inner.lock();
match inner.start_time {
Some(t) => t.elapsed(),
None => Duration::ZERO,
}
}
pub fn remote_uri(&self) -> String {
let vals = self.dlg.header("From");
vals.first()
.map(|v| sip_header_uri(v).to_string())
.unwrap_or_default()
}
pub fn dialog_id(&self) -> (String, String, String) {
let call_id = self.dlg.call_id();
let direction = self.inner.lock().direction;
let from_tag = self
.dlg
.header("From")
.first()
.map(|v| sip_header_tag(v).to_string())
.unwrap_or_default();
let to_tag = self
.dlg
.header("To")
.first()
.map(|v| sip_header_tag(v).to_string())
.unwrap_or_default();
match direction {
Direction::Outbound => (call_id, from_tag, to_tag),
Direction::Inbound => (call_id, to_tag, from_tag),
}
}
pub fn from(&self) -> String {
let vals = self.dlg.header("From");
vals.first()
.map(|v| sip_header_user(v).to_string())
.unwrap_or_default()
}
pub fn remote_did(&self) -> String {
match self.direction() {
Direction::Inbound => self.from(),
Direction::Outbound => self.to(),
}
}
pub fn to(&self) -> String {
let vals = self.dlg.header("To");
vals.first()
.map(|v| sip_header_user(v).to_string())
.unwrap_or_default()
}
pub fn from_name(&self) -> String {
let vals = self.dlg.header("From");
vals.first()
.map(|v| sip_header_display_name(v).to_string())
.unwrap_or_default()
}
pub fn remote_ip(&self) -> String {
let inner = self.inner.lock();
if !inner.remote_ip.is_empty() {
return inner.remote_ip.clone();
}
if inner.remote_sdp.is_empty() {
return String::new();
}
sdp::parse(&inner.remote_sdp)
.map(|s| s.connection.clone())
.unwrap_or_default()
}
pub fn remote_port(&self) -> i32 {
let inner = self.inner.lock();
if inner.remote_port != 0 {
return inner.remote_port;
}
if inner.remote_sdp.is_empty() {
return 0;
}
sdp::parse(&inner.remote_sdp)
.map(|s| s.media.first().map(|m| m.port).unwrap_or(0))
.unwrap_or(0)
}
pub fn header(&self, name: &str) -> Vec<String> {
self.dlg.header(name)
}
pub fn headers(&self) -> HashMap<String, Vec<String>> {
self.dlg.headers()
}
pub fn media_session_active(&self) -> bool {
self.inner.lock().media_active
}
fn resolve_codec_prefs(inner: &CallInner) -> &[i32] {
if !inner.codec_prefs.is_empty() {
&inner.codec_prefs
} else {
DEFAULT_CODEC_PREFS
}
}
fn build_local_sdp(inner: &mut CallInner, direction: &str) -> String {
if inner.local_ip.is_empty() {
inner.local_ip = "127.0.0.1".into();
}
let prefs = Self::resolve_codec_prefs(inner);
if inner.srtp_enabled && !inner.srtp_local_key.is_empty() {
sdp::build_offer_srtp(
&inner.local_ip,
inner.rtp_port,
prefs,
direction,
&inner.srtp_local_key,
)
} else {
sdp::build_offer(&inner.local_ip, inner.rtp_port, prefs, direction)
}
}
fn build_answer_sdp(inner: &mut CallInner, remote: &sdp::Session, direction: &str) -> String {
if inner.local_ip.is_empty() {
inner.local_ip = "127.0.0.1".into();
}
let remote_codecs: &[i32] = remote
.media
.first()
.map(|m| m.codecs.as_slice())
.unwrap_or(&[]);
let prefs = Self::resolve_codec_prefs(inner);
if remote.has_video() && inner.video_rtp_socket.is_some() {
let empty_rtpmap: Vec<(i32, String)> = Vec::new();
let remote_video_rtpmap = remote
.video_media()
.map(|vm| vm.rtpmap.as_slice())
.unwrap_or(&empty_rtpmap);
let local_video = &[VideoCodec::H264, VideoCodec::VP8];
return sdp::build_answer_video(
&inner.local_ip,
inner.rtp_port,
prefs,
remote_codecs,
inner.video_rtp_port,
local_video,
remote_video_rtpmap,
direction,
);
}
if inner.srtp_enabled && !inner.srtp_local_key.is_empty() {
sdp::build_answer_srtp(
&inner.local_ip,
inner.rtp_port,
prefs,
remote_codecs,
direction,
&inner.srtp_local_key,
)
} else {
sdp::build_answer(
&inner.local_ip,
inner.rtp_port,
prefs,
remote_codecs,
direction,
)
}
}
fn negotiate_codec(inner: &mut CallInner, sess: &sdp::Session) {
let remote_codecs: &[i32] = sess
.audio_media()
.map(|m| m.codecs.as_slice())
.unwrap_or(&[]);
let prefs = Self::resolve_codec_prefs(inner);
let pt = sdp::negotiate_codec(prefs, remote_codecs);
if pt >= 0 {
if let Some(c) = Codec::from_payload_type(pt) {
inner.codec = c;
}
}
if let Some(vm) = sess.video_media() {
if vm.port > 0 {
if let Some(vc) = sess.video_codec() {
inner.video_codec = Some(vc);
}
}
}
}
fn set_remote_endpoint(inner: &mut CallInner, sess: &sdp::Session) {
inner.remote_ip = sess.connection.clone();
if let Some(m) = sess.audio_media() {
inner.remote_port = m.port;
}
if let Some(vm) = sess.video_media() {
if vm.port > 0 {
inner.video_remote_port = vm.port;
}
}
}
fn fire_on_state(inner: &CallInner, state: CallState) {
if let Some(ref f) = inner.on_state_internal {
let f = Arc::clone(f);
spawn_callback(move || f(state));
}
for f in &inner.on_state_fn {
let f = Arc::clone(f);
spawn_callback(move || f(state));
}
}
fn fire_on_ended(inner: &mut CallInner, reason: EndReason) {
if let Some(ref cancel) = inner.session_timer_cancel {
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
}
if let Some(ref f) = inner.on_ended_internal {
let f = Arc::clone(f);
spawn_callback(move || f(reason));
}
for f in &inner.on_ended_fn {
let f = Arc::clone(f);
spawn_callback(move || f(reason));
}
inner.on_ended_internal = None;
inner.on_ended_fn.clear();
inner.on_state_internal = None;
inner.on_state_fn.clear();
inner.on_dtmf_internal = None;
inner.on_dtmf_fn.clear();
inner.on_media_fn.clear();
inner.on_hold_fn.clear();
inner.on_resume_fn.clear();
inner.on_mute_fn.clear();
inner.on_unmute_fn.clear();
inner.on_video_fn.clear();
inner.on_video_request_fn = None;
}
fn start_session_timer(self: &Arc<Self>) {
let vals = self.dlg.header("Session-Expires");
if vals.is_empty() {
return;
}
let raw = vals[0].split(';').next().unwrap_or("").trim();
let seconds: u64 = match raw.parse() {
Ok(s) if s > 0 => s,
_ => return,
};
let interval = Duration::from_secs(seconds) / 2;
let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false));
let cancel_clone = Arc::clone(&cancel);
let call = Arc::clone(self);
let handle = std::thread::spawn(move || loop {
std::thread::sleep(interval);
if cancel_clone.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
let refresh_sdp = {
let mut inner = call.inner.lock();
if inner.state == CallState::Ended {
return;
}
Self::build_local_sdp(&mut inner, sdp::DIR_SEND_RECV)
};
let _ = call.dlg.send_reinvite(refresh_sdp.as_bytes());
});
let mut inner = self.inner.lock();
inner.session_timer = Some(handle);
inner.session_timer_cancel = Some(cancel);
}
pub fn accept(self: &Arc<Self>) -> Result<()> {
info!(call_id = %self.call_id(), "Call accepting");
let on_media_fn;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Ringing {
info!(state = ?inner.state, "Call accept rejected — not in Ringing state");
return Err(Error::InvalidState);
}
if !inner.remote_sdp.is_empty() {
if let Ok(sess) = sdp::parse(&inner.remote_sdp) {
Self::negotiate_codec(&mut inner, &sess);
inner.local_sdp = Self::build_answer_sdp(&mut inner, &sess, sdp::DIR_SEND_RECV);
Self::set_remote_endpoint(&mut inner, &sess);
} else {
inner.local_sdp = Self::build_local_sdp(&mut inner, sdp::DIR_SEND_RECV);
}
} else {
inner.local_sdp = Self::build_local_sdp(&mut inner, sdp::DIR_SEND_RECV);
}
let _ = self.dlg.respond(200, "OK", inner.local_sdp.as_bytes());
inner.state = CallState::Active;
inner.start_time = Some(Instant::now());
inner.media_active = true;
Self::start_media_pipeline(&mut inner);
Self::fire_on_state(&inner, CallState::Active);
on_media_fn = inner.on_media_fn.clone();
}
self.start_session_timer();
for f in on_media_fn {
spawn_callback(move || f());
}
Ok(())
}
pub fn reject(&self, code: u16, reason: &str) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Ringing {
return Err(Error::InvalidState);
}
let _ = self.dlg.respond(code, reason, &[]);
inner.state = CallState::Ended;
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, EndReason::Rejected);
Ok(())
}
pub fn end(&self) -> Result<()> {
let mut inner = self.inner.lock();
if let Some(ref cancel) = inner.session_timer_cancel {
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
}
match inner.state {
CallState::Dialing | CallState::RemoteRinging | CallState::EarlyMedia => {
let _ = self.dlg.send_cancel();
inner.state = CallState::Ended;
for s in &mut inner.media_streams {
s.stop();
}
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, EndReason::Cancelled);
Ok(())
}
CallState::Active | CallState::OnHold => {
let _ = self.dlg.send_bye();
inner.state = CallState::Ended;
for s in &mut inner.media_streams {
s.stop();
}
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, EndReason::Local);
Ok(())
}
_ => Err(Error::InvalidState),
}
}
pub fn hold(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
inner.local_sdp = Self::build_local_sdp(&mut inner, sdp::DIR_SEND_ONLY);
let _ = self.dlg.send_reinvite(inner.local_sdp.as_bytes());
inner.state = CallState::OnHold;
Self::fire_on_state(&inner, CallState::OnHold);
Ok(())
}
pub fn resume(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::OnHold {
return Err(Error::InvalidState);
}
inner.local_sdp = Self::build_local_sdp(&mut inner, sdp::DIR_SEND_RECV);
let _ = self.dlg.send_reinvite(inner.local_sdp.as_bytes());
inner.state = CallState::Active;
Self::fire_on_state(&inner, CallState::Active);
Ok(())
}
pub fn mute(&self) -> Result<()> {
let on_mute;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.muted {
return Err(Error::AlreadyMuted);
}
inner.muted = true;
for s in &inner.media_streams {
s.muted.store(true, std::sync::atomic::Ordering::Relaxed);
}
on_mute = inner.on_mute_fn.clone();
}
for f in on_mute {
spawn_callback(move || f());
}
Ok(())
}
pub fn unmute(&self) -> Result<()> {
let on_unmute;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if !inner.muted {
return Err(Error::NotMuted);
}
inner.muted = false;
for s in &inner.media_streams {
s.muted.store(false, std::sync::atomic::Ordering::Relaxed);
}
on_unmute = inner.on_unmute_fn.clone();
}
for f in on_unmute {
spawn_callback(move || f());
}
Ok(())
}
pub fn mute_audio(&self) -> Result<()> {
let on_mute;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.muted {
return Err(Error::AlreadyMuted);
}
inner.muted = true;
if let Some(s) = inner.media_streams.first() {
s.muted.store(true, std::sync::atomic::Ordering::Relaxed);
}
on_mute = inner.on_mute_fn.clone();
}
for f in on_mute {
spawn_callback(move || f());
}
Ok(())
}
pub fn unmute_audio(&self) -> Result<()> {
let on_unmute;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if !inner.muted {
return Err(Error::NotMuted);
}
inner.muted = false;
if let Some(s) = inner.media_streams.first() {
s.muted.store(false, std::sync::atomic::Ordering::Relaxed);
}
on_unmute = inner.on_unmute_fn.clone();
}
for f in on_unmute {
spawn_callback(move || f());
}
Ok(())
}
pub fn add_video(
&self,
video_codecs: &[VideoCodec],
rtp_port_min: u16,
rtp_port_max: u16,
) -> Result<()> {
if video_codecs.is_empty() {
return Err(Error::Other("no video codecs specified".into()));
}
let (vsock, vport) = crate::media::listen_rtp_port(rtp_port_min, rtp_port_max)
.map_err(|e| Error::Other(format!("failed to allocate video port: {e}")))?;
let local_sdp;
{
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_some() {
return Err(Error::Other("video already active".into()));
}
inner.video_rtp_port = vport as i32;
inner.video_rtp_socket = Some(Arc::new(vsock));
if inner.local_ip.is_empty() {
inner.local_ip = "127.0.0.1".into();
}
let prefs = Self::resolve_codec_prefs(&inner);
inner.local_sdp = sdp::build_offer_video(
&inner.local_ip,
inner.rtp_port,
prefs,
inner.video_rtp_port,
video_codecs,
sdp::DIR_SEND_RECV,
);
local_sdp = inner.local_sdp.clone();
}
self.dlg.send_reinvite(local_sdp.as_bytes())
}
pub fn has_video(&self) -> bool {
self.inner.lock().video_codec.is_some()
}
pub fn video_codec(&self) -> Option<VideoCodec> {
self.inner.lock().video_codec
}
pub fn mute_video(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
if inner.video_muted {
return Err(Error::VideoAlreadyMuted);
}
inner.video_muted = true;
if let Some(s) = inner.media_streams.get(1) {
s.muted.store(true, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
}
pub fn unmute_video(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
if !inner.video_muted {
return Err(Error::VideoNotMuted);
}
inner.video_muted = false;
if let Some(s) = inner.media_streams.get(1) {
s.muted.store(false, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
}
pub fn request_keyframe(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
let rtcp_addr: Option<std::net::SocketAddr> = if inner.video_remote_port > 0 {
format!("{}:{}", inner.remote_ip, inner.video_remote_port + 1)
.parse()
.ok()
} else {
None
};
if let (Some(ref sock), Some(addr)) = (&inner.video_rtcp_socket, rtcp_addr) {
let local_ssrc = 0; let remote_ssrc = 0; let pli = crate::rtcp::build_pli(local_ssrc, remote_ssrc);
let _ = sock.send_to(&pli, addr);
let fir = crate::rtcp::build_fir(local_ssrc, remote_ssrc, inner.fir_seq_nr);
let _ = sock.send_to(&fir, addr);
inner.fir_seq_nr = inner.fir_seq_nr.wrapping_add(1);
}
Ok(())
}
pub fn video_rtp_reader(&self) -> Option<crossbeam_channel::Receiver<RtpPacket>> {
self.inner
.lock()
.media_streams
.get(1)
.map(|s| s.channels.rtp_reader.rx.clone())
}
pub fn video_rtp_writer(&self) -> Option<crossbeam_channel::Sender<RtpPacket>> {
self.inner
.lock()
.media_streams
.get(1)
.map(|s| s.channels.rtp_writer.tx.clone())
}
pub fn video_reader(&self) -> Option<crossbeam_channel::Receiver<VideoFrame>> {
self.inner
.lock()
.media_streams
.get(1)
.map(|s| s.channels.video_frame_reader.rx.clone())
}
pub fn video_writer(&self) -> Option<crossbeam_channel::Sender<VideoFrame>> {
self.inner
.lock()
.media_streams
.get(1)
.map(|s| s.channels.video_frame_writer.tx.clone())
}
pub(crate) fn set_video_rtp_port(&self, port: i32) {
self.inner.lock().video_rtp_port = port;
}
pub(crate) fn set_video_rtp_socket(&self, socket: UdpSocket) {
self.inner.lock().video_rtp_socket = Some(Arc::new(socket));
}
#[cfg(test)]
pub(crate) fn set_video_codec(&self, codec: VideoCodec) {
self.inner.lock().video_codec = Some(codec);
}
pub fn send_dtmf(&self, digit: &str) -> Result<()> {
let (rtp_socket, remote_ip, remote_port, dtmf_mode) = {
let inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
(
inner.rtp_socket.clone(),
inner.remote_ip.clone(),
inner.remote_port,
inner.dtmf_mode,
)
};
if dtmf::digit_to_code(digit).is_none() {
return Err(Error::InvalidDtmfDigit);
}
match dtmf_mode {
crate::config::DtmfMode::SipInfo => {
self.dlg.send_info_dtmf(digit, 160)?;
}
crate::config::DtmfMode::Rfc4733 | crate::config::DtmfMode::Both => {
let pkts = dtmf::encode_dtmf(digit, 0, 0, 0)?;
if let Some(sock) = rtp_socket {
if !remote_ip.is_empty() && remote_port > 0 {
if let Ok(addr) =
format!("{}:{}", remote_ip, remote_port).parse::<std::net::SocketAddr>()
{
for pkt in &pkts {
let _ = sock.send_to(&pkt.to_bytes(), addr);
}
}
}
}
}
}
Ok(())
}
pub fn blind_transfer(self: &Arc<Self>, target: &str) -> Result<()> {
{
let inner = self.inner.lock();
if inner.state != CallState::Active && inner.state != CallState::OnHold {
return Err(Error::InvalidState);
}
}
let weak = Arc::downgrade(self);
self.dlg.on_notify(Box::new(move |code| {
let reason = if (200..300).contains(&code) {
EndReason::Transfer
} else if code >= 300 {
EndReason::TransferFailed
} else {
return;
};
let Some(call) = weak.upgrade() else {
return;
};
let mut inner = call.inner.lock();
if inner.state == CallState::Ended {
return;
}
inner.state = CallState::Ended;
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, reason);
}));
self.dlg.send_refer(target)?;
Ok(())
}
pub fn attended_transfer(self: &Arc<Self>, other: &Arc<Call>) -> Result<()> {
{
let state_self = self.state();
if state_self != CallState::Active && state_self != CallState::OnHold {
return Err(Error::InvalidState);
}
let state_other = other.state();
if state_other != CallState::Active && state_other != CallState::OnHold {
return Err(Error::InvalidState);
}
}
let (b_call_id, b_local_tag, b_remote_tag) = other.dialog_id();
if b_call_id.is_empty() || b_local_tag.is_empty() || b_remote_tag.is_empty() {
return Err(Error::Other(
"attended transfer: other call dialog missing call-id or tags".into(),
));
}
let remote_uri = match other.direction() {
Direction::Outbound => other
.header("To")
.first()
.map(|v| sip_header_uri(v).to_string())
.unwrap_or_default(),
Direction::Inbound => other
.header("From")
.first()
.map(|v| sip_header_uri(v).to_string())
.unwrap_or_default(),
};
if remote_uri.is_empty() {
return Err(Error::Other(
"attended transfer: cannot determine other call remote URI".into(),
));
}
let refer_to = format!(
"{}?Replaces={}%3Bto-tag%3D{}%3Bfrom-tag%3D{}",
remote_uri,
uri_encode(&b_call_id),
uri_encode(&b_remote_tag),
uri_encode(&b_local_tag),
);
let weak_self = Arc::downgrade(self);
let weak_other = Arc::downgrade(other);
self.dlg.on_notify(Box::new(move |code| {
let reason = if (200..300).contains(&code) {
EndReason::Transfer
} else if code >= 300 {
EndReason::TransferFailed
} else {
return;
};
if let Some(a) = weak_self.upgrade() {
a.end_with_reason(reason);
}
if let Some(b) = weak_other.upgrade() {
b.end_with_reason(reason);
}
}));
self.dlg.send_refer(&refer_to)?;
Ok(())
}
pub(crate) fn end_with_reason(&self, reason: EndReason) {
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
if let Some(ref cancel) = inner.session_timer_cancel {
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
}
if inner.state == CallState::Active || inner.state == CallState::OnHold {
let _ = self.dlg.send_bye();
}
inner.state = CallState::Ended;
for s in &mut inner.media_streams {
s.stop();
}
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, reason);
}
pub fn simulate_response(self: &Arc<Self>, code: u16, _reason: &str) {
let start_timer;
{
let mut inner = self.inner.lock();
start_timer = match code {
180 => {
if inner.state == CallState::Dialing {
inner.state = CallState::RemoteRinging;
Self::fire_on_state(&inner, CallState::RemoteRinging);
}
false
}
183 => {
if inner.opts.early_media
&& (inner.state == CallState::Dialing
|| inner.state == CallState::RemoteRinging)
{
inner.state = CallState::EarlyMedia;
inner.media_active = true;
Self::start_media_pipeline(&mut inner);
Self::fire_on_state(&inner, CallState::EarlyMedia);
for f in &inner.on_media_fn {
let f = Arc::clone(f);
spawn_callback(move || f());
}
}
false
}
200 => {
if inner.state == CallState::Dialing
|| inner.state == CallState::RemoteRinging
|| inner.state == CallState::EarlyMedia
{
inner.state = CallState::Active;
inner.start_time = Some(Instant::now());
inner.media_active = true;
Self::start_media_pipeline(&mut inner);
Self::fire_on_state(&inner, CallState::Active);
for f in &inner.on_media_fn {
let f = Arc::clone(f);
spawn_callback(move || f());
}
true
} else {
false
}
}
_ => false,
};
}
if start_timer {
self.start_session_timer();
}
}
pub fn simulate_bye(&self) {
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
let reason = if inner.state == CallState::Ringing {
info!(call_id = %self.dlg.call_id(), state = ?inner.state, "Call cancelled by remote (BYE/CANCEL while ringing)");
EndReason::Cancelled
} else {
info!(call_id = %self.dlg.call_id(), state = ?inner.state, "Call ended by remote BYE");
EndReason::Remote
};
inner.state = CallState::Ended;
for s in &mut inner.media_streams {
s.stop();
}
Self::fire_on_state(&inner, CallState::Ended);
Self::fire_on_ended(&mut inner, reason);
}
pub fn fire_notify(&self, code: u16) {
self.dlg.fire_notify(code);
}
pub fn fire_dtmf(&self, digit: &str) {
let (f_int, f_usr) = {
let inner = self.inner.lock();
(inner.on_dtmf_internal.clone(), inner.on_dtmf_fn.clone())
};
let d = digit.to_string();
if let Some(f) = f_int {
f(d.clone());
}
for f in f_usr {
f(d.clone());
}
}
pub fn simulate_reinvite(&self, raw_sdp: &str) {
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
let sess = match sdp::parse(raw_sdp) {
Ok(s) => s,
Err(_) => return,
};
inner.remote_sdp = raw_sdp.to_string();
Self::set_remote_endpoint(&mut inner, &sess);
let dir = sess.dir();
let mut hold_fn = Vec::new();
let mut resume_fn = Vec::new();
let mut new_state = None;
let is_hold_dir =
dir == sdp::DIR_SEND_ONLY || dir == sdp::DIR_RECV_ONLY || dir == sdp::DIR_INACTIVE;
match (is_hold_dir, inner.state) {
(true, CallState::Active) => {
inner.state = CallState::OnHold;
hold_fn = inner.on_hold_fn.clone();
new_state = Some(CallState::OnHold);
}
(false, CallState::OnHold) if dir == sdp::DIR_SEND_RECV => {
inner.state = CallState::Active;
resume_fn = inner.on_resume_fn.clone();
new_state = Some(CallState::Active);
}
_ => {}
}
Self::negotiate_codec(&mut inner, &sess);
if let Some(s) = new_state {
Self::fire_on_state(&inner, s);
}
drop(inner);
for f in hold_fn {
spawn_callback(move || f());
}
for f in resume_fn {
spawn_callback(move || f());
}
}
pub(crate) fn handle_reinvite(
self: &Arc<Self>,
reinvite_dlg: &Arc<dyn Dialog>,
remote_sdp: &str,
rtp_port_min: u16,
rtp_port_max: u16,
) {
let sess = match sdp::parse(remote_sdp) {
Ok(s) => s,
Err(e) => {
tracing::warn!("failed to parse re-INVITE SDP: {}", e);
return;
}
};
if let Some(vm) = sess.video_media() {
if vm.port == 0 && self.has_video() {
info!(call_id = %self.call_id(), "Video downgrade via re-INVITE");
self.stop_video_pipeline();
self.reject_video_internal(reinvite_dlg, &sess, remote_sdp);
return;
}
}
if !sess.has_video() {
self.simulate_reinvite_parsed(&sess, remote_sdp);
return;
}
let (request_fn, need_socket) = {
let inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
(
inner.on_video_request_fn.clone(),
inner.video_rtp_socket.is_none(),
)
};
let video_socket = if need_socket {
match crate::media::listen_rtp_port(rtp_port_min, rtp_port_max) {
Ok((vsock, vport)) => Some((vsock, vport)),
Err(e) => {
tracing::error!("failed to allocate video RTP socket: {}", e);
return;
}
}
} else {
None
};
let request = VideoUpgradeRequest {
call: Arc::clone(self),
reinvite_dlg: Arc::clone(reinvite_dlg),
remote_sdp: remote_sdp.to_string(),
sess,
video_socket,
responded: Arc::new(AtomicBool::new(false)),
};
if let Some(f) = request_fn {
spawn_callback(move || f(request));
} else {
info!(call_id = %self.call_id(), "No on_video_request handler — rejecting video upgrade");
request.reject();
}
}
fn accept_video_internal(
&self,
reinvite_dlg: &Arc<dyn Dialog>,
sess: &sdp::Session,
remote_sdp: &str,
video_socket: &Option<(UdpSocket, u16)>,
) {
let local_sdp;
{
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
if let Some((ref vsock, vport)) = *video_socket {
match vsock.try_clone() {
Ok(cloned) => {
inner.video_rtp_port = vport as i32;
inner.video_rtp_socket = Some(Arc::new(cloned));
}
Err(e) => {
tracing::error!("failed to clone video socket: {}", e);
return;
}
}
}
inner.remote_sdp = remote_sdp.to_string();
Self::set_remote_endpoint(&mut inner, sess);
Self::negotiate_codec(&mut inner, sess);
inner.local_sdp = Self::build_answer_sdp(&mut inner, sess, sdp::DIR_SEND_RECV);
local_sdp = inner.local_sdp.clone();
Self::start_video_pipeline(&mut inner);
}
if let Err(e) = reinvite_dlg.respond(200, "OK", local_sdp.as_bytes()) {
tracing::error!("failed to send 200 OK for video re-INVITE: {}", e);
}
}
fn reject_video_internal(
&self,
reinvite_dlg: &Arc<dyn Dialog>,
sess: &sdp::Session,
remote_sdp: &str,
) {
let local_sdp;
{
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
inner.remote_sdp = remote_sdp.to_string();
inner.remote_ip = sess.connection.clone();
if let Some(m) = sess.audio_media() {
inner.remote_port = m.port;
}
let remote_audio: &[i32] = sess
.audio_media()
.map(|m| m.codecs.as_slice())
.unwrap_or(&[]);
let prefs_for_negotiate = Self::resolve_codec_prefs(&inner);
let pt = sdp::negotiate_codec(prefs_for_negotiate, remote_audio);
if pt >= 0 {
if let Some(c) = Codec::from_payload_type(pt) {
inner.codec = c;
}
}
let prefs = Self::resolve_codec_prefs(&inner);
let remote_codecs: &[i32] = sess
.audio_media()
.map(|m| m.codecs.as_slice())
.unwrap_or(&[]);
let mut answer = sdp::build_answer(
&inner.local_ip,
inner.rtp_port,
prefs,
remote_codecs,
sdp::DIR_SEND_RECV,
);
if sess.video_media().is_some() {
answer.push_str("m=video 0 RTP/AVP 0\r\n");
}
inner.local_sdp = answer.clone();
local_sdp = answer;
}
if let Err(e) = reinvite_dlg.respond(200, "OK", local_sdp.as_bytes()) {
tracing::error!("failed to send 200 OK rejecting video: {}", e);
}
}
fn stop_video_pipeline(&self) {
let mut inner = self.inner.lock();
if inner.media_streams.len() >= 2 {
let mut video_stream = inner.media_streams.remove(1);
video_stream.stop();
}
inner.video_codec = None;
inner.video_rtp_socket = None;
inner.video_rtcp_socket = None;
inner.video_rtp_port = 0;
inner.video_remote_port = 0;
}
fn simulate_reinvite_parsed(&self, sess: &sdp::Session, raw_sdp: &str) {
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
inner.remote_sdp = raw_sdp.to_string();
Self::set_remote_endpoint(&mut inner, sess);
let dir = sess.dir();
let mut hold_fn = Vec::new();
let mut resume_fn = Vec::new();
let mut new_state = None;
let is_hold_dir =
dir == sdp::DIR_SEND_ONLY || dir == sdp::DIR_RECV_ONLY || dir == sdp::DIR_INACTIVE;
match (is_hold_dir, inner.state) {
(true, CallState::Active) => {
inner.state = CallState::OnHold;
hold_fn = inner.on_hold_fn.clone();
new_state = Some(CallState::OnHold);
}
(false, CallState::OnHold) if dir == sdp::DIR_SEND_RECV => {
inner.state = CallState::Active;
resume_fn = inner.on_resume_fn.clone();
new_state = Some(CallState::Active);
}
_ => {}
}
Self::negotiate_codec(&mut inner, sess);
if let Some(s) = new_state {
Self::fire_on_state(&inner, s);
}
drop(inner);
for f in hold_fn {
spawn_callback(move || f());
}
for f in resume_fn {
spawn_callback(move || f());
}
}
pub fn set_remote_sdp(&self, raw_sdp: &str) {
let mut inner = self.inner.lock();
inner.remote_sdp = raw_sdp.to_string();
if let Ok(sess) = sdp::parse(raw_sdp) {
Self::set_remote_endpoint(&mut inner, &sess);
Self::negotiate_codec(&mut inner, &sess);
if sess.is_srtp() {
if let Some(crypto) = sess.first_crypto() {
if crypto.suite == crate::srtp::SUPPORTED_SUITE {
inner.srtp_remote_key = crypto.key_params.clone();
inner.srtp_enabled = true;
} else {
tracing::warn!("remote offered unsupported SRTP suite: {}", crypto.suite);
}
}
}
}
}
pub fn set_local_media(&self, ip: &str, port: i32) {
let mut inner = self.inner.lock();
inner.local_ip = ip.to_string();
inner.rtp_port = port;
}
pub(crate) fn set_local_sdp(&self, sdp: &str) {
self.inner.lock().local_sdp = sdp.to_string();
}
pub fn set_rtp_socket(&self, socket: UdpSocket) {
self.inner.lock().rtp_socket = Some(Arc::new(socket));
}
pub(crate) fn set_dtmf_mode(&self, mode: crate::config::DtmfMode) {
self.inner.lock().dtmf_mode = mode;
}
pub(crate) fn set_srtp(&self, local_inline_key: &str) {
let mut inner = self.inner.lock();
inner.srtp_enabled = true;
inner.srtp_local_key = local_inline_key.to_string();
}
fn start_media_pipeline(inner: &mut CallInner) {
if !inner.media_streams.is_empty() {
return; }
let socket = match inner.rtp_socket.as_ref() {
Some(s) => Arc::clone(s),
None => return, };
if inner.remote_ip.is_empty() || inner.remote_port <= 0 {
return; }
let remote_addr: std::net::SocketAddr =
match format!("{}:{}", inner.remote_ip, inner.remote_port).parse() {
Ok(a) => a,
Err(_) => return,
};
let transport = Arc::new(MediaTransport::new(
socket.try_clone().expect("failed to clone RTP socket"),
remote_addr,
));
let channels = Arc::new(MediaChannels::new());
let shared = Arc::new(media::MediaSharedState::new(inner.state));
Self::sync_dtmf_to_media(inner, &shared);
let (srtp_in, srtp_out) = if inner.srtp_enabled
&& !inner.srtp_local_key.is_empty()
&& !inner.srtp_remote_key.is_empty()
{
let inbound = match SrtpContext::from_sdes_inline(&inner.srtp_remote_key) {
Ok(ctx) => Some(ctx),
Err(e) => {
tracing::error!("SRTP inbound context creation failed: {}", e);
return;
}
};
let outbound =
match SrtpContext::from_sdes_inline(&format!("inline:{}", inner.srtp_local_key)) {
Ok(ctx) => Some(ctx),
Err(e) => {
tracing::error!("SRTP outbound context creation failed: {}", e);
return;
}
};
(inbound, outbound)
} else {
(None, None)
};
let (rtcp_socket, rtcp_remote_addr) = if let Some(ref s) = inner.rtp_socket {
let rtp_port = s.local_addr().map(|a| a.port()).unwrap_or(0);
let rsock = match media::listen_rtcp_port(rtp_port) {
Ok(s) => Some(Arc::new(s)),
Err(e) => {
tracing::warn!(rtp_port, error = %e, "RTCP port bind failed, RTCP disabled");
None
}
};
let raddr = if inner.remote_port > 0 {
format!("{}:{}", inner.remote_ip, inner.remote_port + 1)
.parse()
.ok()
} else {
None
};
(rsock, raddr)
} else {
(None, None)
};
let config = MediaConfig {
codec: inner.codec,
srtp_inbound: srtp_in,
srtp_outbound: srtp_out,
rtcp_socket,
rtcp_remote_addr,
..MediaConfig::default()
};
let muted = Arc::new(std::sync::atomic::AtomicBool::new(inner.muted));
let stream = media::start_media(
config,
Arc::clone(&channels),
Arc::clone(&shared),
Some(transport),
muted,
);
inner.media_streams.push(stream);
inner.media_shared = Some(shared);
Self::start_video_pipeline(inner);
}
fn start_video_pipeline(inner: &mut CallInner) {
if inner.video_codec.is_none() || inner.video_rtp_socket.is_none() {
return;
}
if inner.media_streams.len() >= 2 {
return; }
let video_socket = inner.video_rtp_socket.as_ref().unwrap();
if inner.remote_ip.is_empty() || inner.video_remote_port <= 0 {
return;
}
let video_remote_addr: std::net::SocketAddr =
match format!("{}:{}", inner.remote_ip, inner.video_remote_port).parse() {
Ok(a) => a,
Err(_) => return,
};
let video_transport = Arc::new(MediaTransport::new(
video_socket
.try_clone()
.expect("failed to clone video RTP socket"),
video_remote_addr,
));
let video_channels = Arc::new(MediaChannels::new());
let video_rtp_port = video_socket.local_addr().map(|a| a.port()).unwrap_or(0);
let video_rtcp_socket = media::listen_rtcp_port(video_rtp_port).ok().map(Arc::new);
inner.video_rtcp_socket = video_rtcp_socket.clone();
let video_rtcp_addr: Option<std::net::SocketAddr> = if inner.video_remote_port > 0 {
format!("{}:{}", inner.remote_ip, inner.video_remote_port + 1)
.parse()
.ok()
} else {
None
};
let (video_srtp_in, video_srtp_out) = if inner.srtp_enabled
&& !inner.srtp_local_key.is_empty()
&& !inner.srtp_remote_key.is_empty()
{
let inbound = SrtpContext::from_sdes_inline(&inner.srtp_remote_key)
.map_err(|e| tracing::error!("Video SRTP inbound context failed: {}", e))
.ok();
let outbound =
SrtpContext::from_sdes_inline(&format!("inline:{}", inner.srtp_local_key))
.map_err(|e| tracing::error!("Video SRTP outbound context failed: {}", e))
.ok();
(inbound, outbound)
} else {
(None, None)
};
let video_config = media::VideoMediaConfig {
srtp_inbound: video_srtp_in,
srtp_outbound: video_srtp_out,
rtcp_socket: video_rtcp_socket,
rtcp_remote_addr: video_rtcp_addr,
video_codec: inner.video_codec,
video_payload_type: inner
.video_codec
.map(|c| c.default_payload_type())
.unwrap_or(96),
};
let video_muted = Arc::new(std::sync::atomic::AtomicBool::new(inner.video_muted));
let video_stream = media::start_video_media(
video_config,
video_channels,
Some(video_transport),
video_muted,
);
inner.media_streams.push(video_stream);
for f in &inner.on_video_fn {
let f = Arc::clone(f);
spawn_callback(move || f());
}
}
pub(crate) fn dlg_respond(&self, code: u16, reason: &str) -> Result<()> {
self.dlg.respond(code, reason, &[])
}
pub fn on_state(&self, f: impl Fn(CallState) + Send + Sync + 'static) {
self.inner.lock().on_state_fn.push(Arc::new(f));
}
pub fn on_ended(&self, f: impl Fn(EndReason) + Send + Sync + 'static) {
self.inner.lock().on_ended_fn.push(Arc::new(f));
}
pub fn on_media(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_media_fn.push(Arc::new(f));
}
pub fn on_dtmf(&self, f: impl Fn(String) + Send + Sync + 'static) {
let mut inner = self.inner.lock();
inner.on_dtmf_fn.push(Arc::new(f));
if let Some(ref shared) = inner.media_shared {
Self::sync_dtmf_to_media(&inner, shared);
}
}
pub fn on_hold(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_hold_fn.push(Arc::new(f));
}
pub fn on_resume(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_resume_fn.push(Arc::new(f));
}
pub fn on_mute(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_mute_fn.push(Arc::new(f));
}
pub fn on_unmute(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_unmute_fn.push(Arc::new(f));
}
pub fn on_video(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_video_fn.push(Arc::new(f));
}
pub fn on_video_request(&self, f: impl Fn(VideoUpgradeRequest) + Send + Sync + 'static) {
self.inner.lock().on_video_request_fn = Some(Arc::new(f));
}
pub(crate) fn on_ended_internal(&self, f: impl Fn(EndReason) + Send + Sync + 'static) {
self.inner.lock().on_ended_internal = Some(Arc::new(f));
}
pub(crate) fn on_state_internal(&self, f: impl Fn(CallState) + Send + Sync + 'static) {
self.inner.lock().on_state_internal = Some(Arc::new(f));
}
pub(crate) fn on_dtmf_internal(&self, f: impl Fn(String) + Send + Sync + 'static) {
let mut inner = self.inner.lock();
inner.on_dtmf_internal = Some(Arc::new(f));
if let Some(ref shared) = inner.media_shared {
Self::sync_dtmf_to_media(&inner, shared);
}
}
fn sync_dtmf_to_media(inner: &CallInner, shared: &Arc<media::MediaSharedState>) {
let f_int = inner.on_dtmf_internal.clone();
let f_usr = inner.on_dtmf_fn.clone();
if f_int.is_none() && f_usr.is_empty() {
return;
}
*shared.on_dtmf_fn.lock() = Some(Arc::new(move |d: String| {
if let Some(ref a) = f_int {
a(d.clone());
}
for f in &f_usr {
f(d.clone());
}
}));
}
pub fn rtp_writer(&self) -> Option<crossbeam_channel::Sender<RtpPacket>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.rtp_writer.tx.clone())
}
pub fn rtp_reader(&self) -> Option<crossbeam_channel::Receiver<RtpPacket>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.rtp_reader.rx.clone())
}
pub fn rtp_raw_reader(&self) -> Option<crossbeam_channel::Receiver<RtpPacket>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.rtp_raw_reader.rx.clone())
}
pub fn pcm_writer(&self) -> Option<crossbeam_channel::Sender<Vec<i16>>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.pcm_writer.tx.clone())
}
pub fn paced_pcm_writer(&self) -> Option<crossbeam_channel::Sender<Vec<i16>>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.paced_pcm_writer.tx.clone())
}
pub fn pcm_reader(&self) -> Option<crossbeam_channel::Receiver<Vec<i16>>> {
self.inner
.lock()
.media_streams
.first()
.map(|s| s.channels.pcm_reader.rx.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::dialog::MockDialog;
use std::sync::mpsc;
use std::time::Duration;
fn mock_dlg() -> Arc<MockDialog> {
Arc::new(MockDialog::new())
}
fn mock_dlg_with_headers(headers: HashMap<String, Vec<String>>) -> Arc<MockDialog> {
Arc::new(MockDialog::with_headers(headers))
}
fn test_sdp(ip: &str, port: i32, dir: &str, codecs: &[i32]) -> String {
sdp::build_offer(ip, port, codecs, dir)
}
#[test]
fn inbound_initial_state_is_ringing() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.state(), CallState::Ringing);
}
#[test]
fn accept_transitions_to_active() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert_eq!(call.state(), CallState::Active);
}
#[test]
fn accept_sends_sdp_answer() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
assert_eq!(dlg.last_response_code(), 200);
assert!(!dlg.last_response_body().is_empty());
}
#[test]
fn accept_sets_media_active() {
let call = Call::new_inbound(mock_dlg());
assert!(!call.media_session_active());
call.accept().unwrap();
assert!(call.media_session_active());
}
#[test]
fn reject_sends_correct_sip_code() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.reject(486, "Busy Here").unwrap();
assert_eq!(dlg.last_response_code(), 486);
assert_eq!(dlg.last_response_reason(), "Busy Here");
}
#[test]
fn reject_transitions_to_ended() {
let call = Call::new_inbound(mock_dlg());
call.reject(486, "Busy Here").unwrap();
assert_eq!(call.state(), CallState::Ended);
}
#[test]
fn reject_fires_ended_by_rejected() {
let call = Call::new_inbound(mock_dlg());
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.reject(486, "Busy Here").unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Rejected
);
}
#[test]
fn cannot_accept_after_rejected() {
let call = Call::new_inbound(mock_dlg());
call.reject(486, "Busy Here").unwrap();
assert!(call.accept().is_err());
}
#[test]
fn cannot_reject_after_accepted() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(call.reject(486, "Busy Here").is_err());
}
#[test]
fn outbound_initial_state_is_dialing() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert_eq!(call.state(), CallState::Dialing);
}
#[test]
fn outbound_transitions_on_remote_ringing() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(180, "Ringing");
assert_eq!(call.state(), CallState::RemoteRinging);
}
#[test]
fn outbound_transitions_to_active_on_200() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(180, "Ringing");
call.simulate_response(200, "OK");
assert_eq!(call.state(), CallState::Active);
}
#[test]
fn early_media_183_transitions() {
let opts = DialOptions {
early_media: true,
..Default::default()
};
let call = Call::new_outbound(mock_dlg(), opts);
call.simulate_response(183, "Session Progress");
assert_eq!(call.state(), CallState::EarlyMedia);
}
#[test]
fn no_early_media_183_stays_remote_ringing() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(180, "Ringing");
call.simulate_response(183, "Session Progress");
assert_eq!(call.state(), CallState::RemoteRinging);
}
#[test]
fn no_early_media_183_media_not_active() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(183, "Session Progress");
assert!(!call.media_session_active());
}
#[test]
fn on_media_fires_after_200() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
let (tx, rx) = mpsc::channel();
call.on_media(move || {
let _ = tx.send(());
});
call.simulate_response(200, "OK");
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn on_media_fires_on_183_with_early_media() {
let opts = DialOptions {
early_media: true,
..Default::default()
};
let call = Call::new_outbound(mock_dlg(), opts);
let (tx, rx) = mpsc::channel();
call.on_media(move || {
let _ = tx.send(());
});
call.simulate_response(183, "Session Progress");
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn on_media_does_not_fire_on_183_without_early_media() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
let (tx, _rx) = mpsc::channel::<()>();
call.on_media(move || {
let _ = tx.send(());
});
call.simulate_response(183, "Session Progress");
std::thread::sleep(Duration::from_millis(50));
assert!(_rx.try_recv().is_err());
}
#[test]
fn end_before_answer_sends_cancel() {
let dlg = mock_dlg();
let call = Call::new_outbound(dlg.clone(), DialOptions::default());
call.simulate_response(180, "Ringing");
call.end().unwrap();
assert!(dlg.cancel_sent());
assert!(!dlg.bye_sent());
}
#[test]
fn end_before_answer_fires_ended_by_cancelled() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(180, "Ringing");
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.end().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Cancelled
);
}
#[test]
fn end_while_active_sends_bye() {
let dlg = mock_dlg();
let call = Call::new_outbound(dlg.clone(), DialOptions::default());
call.simulate_response(200, "OK");
call.end().unwrap();
assert!(dlg.bye_sent());
assert!(!dlg.cancel_sent());
}
#[test]
fn end_while_active_fires_ended_by_local() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.simulate_response(200, "OK");
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.end().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Local
);
}
#[test]
fn end_while_on_hold_sends_bye() {
let dlg = mock_dlg();
let call = Call::new_outbound(dlg.clone(), DialOptions::default());
call.simulate_response(200, "OK");
call.hold().unwrap();
call.end().unwrap();
assert!(dlg.bye_sent());
}
#[test]
fn remote_bye_fires_ended_by_remote() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.simulate_bye();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Remote
);
}
#[test]
fn end_on_already_ended_returns_invalid_state() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
assert!(call.end().is_err());
}
#[test]
fn simulate_bye_on_ended_is_noop() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
call.simulate_bye();
assert_eq!(call.state(), CallState::Ended);
}
#[test]
fn hold_sends_reinvite_with_sendonly() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.hold().unwrap();
assert!(dlg.last_reinvite_sdp().contains("a=sendonly"));
}
#[test]
fn hold_transitions_to_on_hold() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.hold().unwrap();
assert_eq!(call.state(), CallState::OnHold);
}
#[test]
fn resume_sends_reinvite_with_sendrecv() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.hold().unwrap();
call.resume().unwrap();
assert!(dlg.last_reinvite_sdp().contains("a=sendrecv"));
}
#[test]
fn resume_transitions_to_active() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.hold().unwrap();
call.resume().unwrap();
assert_eq!(call.state(), CallState::Active);
}
#[test]
fn hold_when_not_active_returns_invalid_state() {
let call = Call::new_inbound(mock_dlg());
assert!(call.hold().is_err());
}
#[test]
fn id_is_unique_per_call() {
let c1 = Call::new_inbound(mock_dlg());
let c2 = Call::new_inbound(mock_dlg());
assert_ne!(c1.id(), c2.id());
}
#[test]
fn call_id_matches_sip_header() {
let dlg = Arc::new(MockDialog::with_call_id("test-call-id-xyz"));
let call = Call::new_inbound(dlg);
assert_eq!(call.call_id(), "test-call-id-xyz");
}
#[test]
fn headers_returns_copy() {
let mut h = HashMap::new();
h.insert("X-Custom".into(), vec!["value1".into()]);
let dlg = mock_dlg_with_headers(h);
let call = Call::new_inbound(dlg);
let mut headers = call.headers();
headers.insert("X-Custom".into(), vec!["mutated".into()]);
assert_eq!(call.header("X-Custom"), vec!["value1"]);
}
#[test]
fn header_case_insensitive() {
let mut h = HashMap::new();
h.insert("P-Asserted-Identity".into(), vec!["sip:1001@pbx".into()]);
let dlg = mock_dlg_with_headers(h);
let call = Call::new_inbound(dlg);
assert_eq!(call.header("p-asserted-identity"), vec!["sip:1001@pbx"]);
}
#[test]
fn direction_inbound() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.direction(), Direction::Inbound);
}
#[test]
fn direction_outbound() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert_eq!(call.direction(), Direction::Outbound);
}
#[test]
fn start_time_none_before_active() {
let call = Call::new_inbound(mock_dlg());
assert!(call.start_time().is_none());
}
#[test]
fn start_time_set_on_active() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(call.start_time().is_some());
}
#[test]
fn duration_zero_before_active() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.duration(), Duration::ZERO);
}
#[test]
fn duration_grows_while_active() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
std::thread::sleep(Duration::from_millis(30));
assert!(call.duration() > Duration::from_millis(20));
}
#[test]
fn blind_transfer_sends_refer() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.blind_transfer("sip:1003@pbx").unwrap();
assert!(dlg.refer_sent());
assert_eq!(dlg.last_refer_target(), "sip:1003@pbx");
}
#[test]
fn blind_transfer_fires_ended_by_transfer() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.blind_transfer("sip:1003@pbx").unwrap();
dlg.simulate_notify(200);
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Transfer
);
}
#[test]
fn blind_transfer_failure_notify_ends_call() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.blind_transfer("sip:1003@pbx").unwrap();
dlg.simulate_notify(503); assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::TransferFailed
);
}
#[test]
fn blind_transfer_1xx_notify_does_not_end_call() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.blind_transfer("sip:1003@pbx").unwrap();
dlg.simulate_notify(100); assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());
assert_eq!(call.state(), CallState::Active);
}
#[test]
fn blind_transfer_when_not_active_returns_invalid_state() {
let call = Call::new_inbound(mock_dlg());
assert!(call.blind_transfer("sip:1003@pbx").is_err());
}
fn mock_dlg_with_tags(call_id: &str, from: &str, to: &str) -> Arc<MockDialog> {
let mut h = std::collections::HashMap::new();
h.insert("From".into(), vec![from.into()]);
h.insert("To".into(), vec![to.into()]);
let dlg = MockDialog::with_headers(h);
dlg.set_call_id(call_id);
Arc::new(dlg)
}
#[test]
fn attended_transfer_sends_refer_with_replaces() {
let dlg_a = mock_dlg_with_tags("call-a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
let dlg_b = mock_dlg_with_tags(
"call-b@pbx.local",
"<sip:1001@pbx>;tag=a2",
"<sip:charlie@pbx>;tag=c2",
);
let call_a = Call::new_inbound(dlg_a.clone());
call_a.accept().unwrap();
let call_b = Call::new_outbound(dlg_b.clone(), DialOptions::default());
call_b.simulate_response(200, "OK");
call_a.attended_transfer(&call_b).unwrap();
assert!(dlg_a.refer_sent());
let target = dlg_a.last_refer_target();
assert!(target.starts_with("sip:charlie@pbx?Replaces="));
assert!(target.contains("call-b%40pbx.local"));
assert!(target.contains("to-tag%3Dc2"));
assert!(target.contains("from-tag%3Da2"));
}
#[test]
fn attended_transfer_success_ends_both() {
let dlg_a = mock_dlg_with_tags("a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
let dlg_b = mock_dlg_with_tags("b", "<sip:1001@pbx>;tag=a2", "<sip:charlie@pbx>;tag=c2");
let call_a = Call::new_inbound(dlg_a.clone());
call_a.accept().unwrap();
let call_b = Call::new_outbound(dlg_b.clone(), DialOptions::default());
call_b.simulate_response(200, "OK");
let (tx_a, rx_a) = mpsc::channel();
let (tx_b, rx_b) = mpsc::channel();
call_a.on_ended(move |r| {
let _ = tx_a.send(r);
});
call_b.on_ended(move |r| {
let _ = tx_b.send(r);
});
call_a.attended_transfer(&call_b).unwrap();
dlg_a.simulate_notify(200);
assert_eq!(
rx_a.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Transfer
);
assert_eq!(
rx_b.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Transfer
);
}
#[test]
fn attended_transfer_failure_ends_both() {
let dlg_a = mock_dlg_with_tags("a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
let dlg_b = mock_dlg_with_tags("b", "<sip:1001@pbx>;tag=a2", "<sip:charlie@pbx>;tag=c2");
let call_a = Call::new_inbound(dlg_a.clone());
call_a.accept().unwrap();
let call_b = Call::new_outbound(dlg_b.clone(), DialOptions::default());
call_b.simulate_response(200, "OK");
let (tx_a, rx_a) = mpsc::channel();
let (tx_b, rx_b) = mpsc::channel();
call_a.on_ended(move |r| {
let _ = tx_a.send(r);
});
call_b.on_ended(move |r| {
let _ = tx_b.send(r);
});
call_a.attended_transfer(&call_b).unwrap();
dlg_a.simulate_notify(503);
assert_eq!(
rx_a.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::TransferFailed
);
assert_eq!(
rx_b.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::TransferFailed
);
}
#[test]
fn attended_transfer_1xx_keeps_both_alive() {
let dlg_a = mock_dlg_with_tags("a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
let dlg_b = mock_dlg_with_tags("b", "<sip:1001@pbx>;tag=a2", "<sip:charlie@pbx>;tag=c2");
let call_a = Call::new_inbound(dlg_a.clone());
call_a.accept().unwrap();
let call_b = Call::new_outbound(dlg_b.clone(), DialOptions::default());
call_b.simulate_response(200, "OK");
call_a.attended_transfer(&call_b).unwrap();
dlg_a.simulate_notify(100);
assert_eq!(call_a.state(), CallState::Active);
assert_eq!(call_b.state(), CallState::Active);
}
#[test]
fn attended_transfer_rejects_inactive_calls() {
let call_a = Call::new_inbound(mock_dlg()); let call_b = Call::new_inbound(mock_dlg());
call_b.accept().unwrap();
assert!(call_a.attended_transfer(&call_b).is_err());
let call_a2 = Call::new_inbound(mock_dlg());
call_a2.accept().unwrap();
let call_b2 = Call::new_inbound(mock_dlg()); assert!(call_a2.attended_transfer(&call_b2).is_err());
}
#[test]
fn sip_header_tag_extracts_tag() {
assert_eq!(sip_header_tag("<sip:1001@host>;tag=abc123"), "abc123");
assert_eq!(sip_header_tag("\"Alice\" <sip:1001@host>;tag=abc"), "abc");
assert_eq!(sip_header_tag("<sip:1001@host>"), "");
assert_eq!(sip_header_tag("<sip:1001@host>;tag=abc;other=1"), "abc");
}
#[test]
fn dialog_id_outbound_call() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["<sip:1001@host>;tag=local1".into()]);
h.insert("To".into(), vec!["<sip:1002@host>;tag=remote2".into()]);
let dlg = Arc::new(MockDialog::with_headers(h));
let call = Call::new_outbound(dlg.clone(), DialOptions::default());
let (cid, local, remote) = call.dialog_id();
assert_eq!(cid, dlg.call_id());
assert_eq!(local, "local1");
assert_eq!(remote, "remote2");
}
#[test]
fn dialog_id_inbound_call() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["<sip:1001@host>;tag=remote1".into()]);
h.insert("To".into(), vec!["<sip:1002@host>;tag=local2".into()]);
let dlg = Arc::new(MockDialog::with_headers(h));
let call = Call::new_inbound(dlg.clone());
let (cid, local, remote) = call.dialog_id();
assert_eq!(cid, dlg.call_id());
assert_eq!(local, "local2");
assert_eq!(remote, "remote1");
}
#[test]
fn end_with_reason_transfer() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_ended(move |r| {
let _ = tx.send(r);
});
call.end_with_reason(EndReason::Transfer);
assert_eq!(call.state(), CallState::Ended);
assert!(dlg.bye_sent());
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
EndReason::Transfer
);
}
#[test]
fn end_with_reason_already_ended_is_noop() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
call.end_with_reason(EndReason::Transfer);
assert_eq!(call.state(), CallState::Ended);
}
#[test]
fn local_sdp_empty_before_active() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.local_sdp(), "");
}
#[test]
fn local_sdp_populated_after_accept() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(call.local_sdp().contains("v=0"));
}
#[test]
fn codec_negotiated_from_sdp() {
let remote_sdp = test_sdp("192.168.1.200", 5004, "sendrecv", &[0, 8]);
let call = Call::new_inbound(mock_dlg());
call.set_remote_sdp(&remote_sdp);
call.accept().unwrap();
assert_eq!(call.codec(), Codec::PCMA);
}
#[test]
fn hold_sends_sdp_with_sendonly() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.hold().unwrap();
let raw = dlg.last_reinvite_sdp();
let s = sdp::parse(&raw).unwrap();
assert_eq!(s.dir(), "sendonly");
}
#[test]
fn resume_sends_sdp_with_sendrecv() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.hold().unwrap();
call.resume().unwrap();
let raw = dlg.last_reinvite_sdp();
let s = sdp::parse(&raw).unwrap();
assert_eq!(s.dir(), "sendrecv");
}
#[test]
fn inbound_reinvite_hold() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendonly", &[0]));
assert_eq!(call.state(), CallState::OnHold);
}
#[test]
fn inbound_reinvite_resume() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendonly", &[0]));
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendrecv", &[0]));
assert_eq!(call.state(), CallState::Active);
}
#[test]
fn on_hold_callback_fires() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_hold(move || {
let _ = tx.send(());
});
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendonly", &[0]));
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn on_resume_callback_fires() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_resume(move || {
let _ = tx.send(());
});
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendonly", &[0]));
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendrecv", &[0]));
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn reinvite_codec_change() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendrecv", &[9]));
assert_eq!(call.codec(), Codec::G722);
}
#[test]
fn reinvite_on_ended_call_ignored() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.200", 5004, "sendonly", &[0]));
assert_eq!(call.state(), CallState::Ended);
}
#[test]
fn send_dtmf_invalid_digit_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(call.send_dtmf("X").is_err());
}
#[test]
fn send_dtmf_when_not_active_returns_error() {
let call = Call::new_inbound(mock_dlg());
assert!(call.send_dtmf("1").is_err());
}
#[test]
fn send_dtmf_sip_info_mode() {
let dlg = Arc::new(crate::mock::dialog::MockDialog::new());
let call = Call::new_inbound(Arc::clone(&dlg) as Arc<dyn Dialog>);
call.set_dtmf_mode(crate::config::DtmfMode::SipInfo);
call.accept().unwrap();
call.send_dtmf("5").unwrap();
let sent = dlg.info_dtmf_sent();
assert_eq!(sent.len(), 1);
assert_eq!(sent[0].0, "5");
assert_eq!(sent[0].1, 160);
}
#[test]
fn send_dtmf_both_mode_uses_rfc4733() {
let dlg = Arc::new(crate::mock::dialog::MockDialog::new());
let call = Call::new_inbound(Arc::clone(&dlg) as Arc<dyn Dialog>);
call.set_dtmf_mode(crate::config::DtmfMode::Both);
call.accept().unwrap();
let _ = call.send_dtmf("1");
assert!(dlg.info_dtmf_sent().is_empty());
}
#[test]
fn fire_dtmf_triggers_callbacks() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = crossbeam_channel::bounded(1);
call.on_dtmf(move |d| {
let _ = tx.send(d);
});
call.fire_dtmf("7");
let digit = rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap();
assert_eq!(digit, "7");
}
#[test]
fn fire_dtmf_triggers_internal_and_user_callbacks() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx_int, rx_int) = crossbeam_channel::bounded(1);
let (tx_usr, rx_usr) = crossbeam_channel::bounded(1);
call.on_dtmf_internal(move |d| {
let _ = tx_int.send(d);
});
call.on_dtmf(move |d| {
let _ = tx_usr.send(d);
});
call.fire_dtmf("#");
assert_eq!(
rx_int
.recv_timeout(std::time::Duration::from_secs(1))
.unwrap(),
"#"
);
assert_eq!(
rx_usr
.recv_timeout(std::time::Duration::from_secs(1))
.unwrap(),
"#"
);
}
#[test]
fn session_timer_sends_refresh_reinvite() {
let dlg = Arc::new(MockDialog::with_session_expires(1));
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
std::thread::sleep(Duration::from_millis(600));
assert!(!dlg.last_reinvite_sdp().is_empty());
}
#[test]
fn session_timer_no_header_no_timer() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
std::thread::sleep(Duration::from_millis(100));
assert!(dlg.last_reinvite_sdp().is_empty());
}
#[test]
fn session_timer_cancelled_on_end() {
let dlg = Arc::new(MockDialog::with_session_expires(1));
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
call.end().unwrap();
std::thread::sleep(Duration::from_millis(600));
assert!(dlg.last_reinvite_sdp().is_empty());
}
#[test]
fn session_timer_parses_header_with_params() {
let mut headers = HashMap::new();
headers.insert("Session-Expires".into(), vec!["1;refresher=uac".into()]);
let dlg = Arc::new(MockDialog::with_headers(headers));
let call = Call::new_inbound(dlg.clone());
call.accept().unwrap();
std::thread::sleep(Duration::from_millis(600));
assert!(!dlg.last_reinvite_sdp().is_empty());
}
#[test]
fn inbound_reinvite_recvonly_triggers_hold() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.100", 5000, "recvonly", &[0]));
assert_eq!(call.state(), CallState::OnHold);
}
#[test]
fn inbound_reinvite_inactive_triggers_hold() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.simulate_reinvite(&test_sdp("192.168.1.100", 5000, "inactive", &[0]));
assert_eq!(call.state(), CallState::OnHold);
}
#[test]
fn mute_when_not_active_returns_error() {
let call = Call::new_inbound(mock_dlg());
assert!(call.mute().is_err());
}
#[test]
fn unmute_when_not_active_returns_error() {
let call = Call::new_inbound(mock_dlg());
assert!(call.unmute().is_err());
}
#[test]
fn mute_when_already_muted_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.mute().unwrap();
assert!(call.mute().is_err());
}
#[test]
fn unmute_when_not_muted_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(call.unmute().is_err());
}
#[test]
fn mute_when_on_hold_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.hold().unwrap();
assert!(call.mute().is_err());
}
#[test]
fn unmute_when_ended_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
assert!(call.unmute().is_err());
}
#[test]
fn on_mute_callback_fires() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_mute(move || {
let _ = tx.send(());
});
call.mute().unwrap();
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn on_unmute_callback_fires() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_unmute(move || {
let _ = tx.send(());
});
call.mute().unwrap();
call.unmute().unwrap();
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
}
#[test]
fn remote_uri_from_dialog_header() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["<sip:1001@pbx.example.com>".into()]);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.remote_uri(), "sip:1001@pbx.example.com");
}
#[test]
fn remote_uri_empty_when_no_from_header() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.remote_uri(), "");
}
#[test]
fn remote_uri_strips_display_name() {
let mut h = HashMap::new();
h.insert(
"From".into(),
vec!["\"Alice\" <sip:alice@example.com>".into()],
);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.remote_uri(), "sip:alice@example.com");
}
#[test]
fn from_extracts_user_part() {
let mut h = HashMap::new();
h.insert(
"From".into(),
vec!["\"Alice\" <sip:+15551234567@pbx.example.com>;tag=abc".into()],
);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.from(), "+15551234567");
}
#[test]
fn from_extension() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["<sip:1001@10.200.1.2>".into()]);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.from(), "1001");
}
#[test]
fn from_empty_when_no_header() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.from(), "");
}
#[test]
fn to_extracts_user_part() {
let mut h = HashMap::new();
h.insert("To".into(), vec!["<sip:1002@pbx.example.com>".into()]);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.to(), "1002");
}
#[test]
fn to_empty_when_no_header() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.to(), "");
}
#[test]
fn from_name_quoted_display_name() {
let mut h = HashMap::new();
h.insert(
"From".into(),
vec!["\"Alice Smith\" <sip:alice@example.com>".into()],
);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.from_name(), "Alice Smith");
}
#[test]
fn from_name_unquoted_display_name() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["Alice <sip:alice@example.com>".into()]);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.from_name(), "Alice");
}
#[test]
fn from_name_empty_when_no_display_name() {
let mut h = HashMap::new();
h.insert("From".into(), vec!["<sip:1001@pbx.example.com>".into()]);
let call = Call::new_inbound(mock_dlg_with_headers(h));
assert_eq!(call.from_name(), "");
}
#[test]
fn from_name_empty_when_no_header() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.from_name(), "");
}
#[test]
fn remote_ip_from_remote_sdp() {
let call = Call::new_inbound(mock_dlg());
call.set_remote_sdp(&test_sdp("192.168.1.200", 5004, "sendrecv", &[0]));
assert_eq!(call.remote_ip(), "192.168.1.200");
}
#[test]
fn remote_ip_empty_before_sdp() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.remote_ip(), "");
}
#[test]
fn remote_port_from_remote_sdp() {
let call = Call::new_inbound(mock_dlg());
call.set_remote_sdp(&test_sdp("192.168.1.200", 5004, "sendrecv", &[0]));
assert_eq!(call.remote_port(), 5004);
}
#[test]
fn remote_port_zero_before_sdp() {
let call = Call::new_inbound(mock_dlg());
assert_eq!(call.remote_port(), 0);
}
#[test]
fn remote_media_updates_after_reinvite() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_remote_sdp(&test_sdp("192.168.1.100", 5000, "sendrecv", &[0]));
assert_eq!(call.remote_ip(), "192.168.1.100");
call.simulate_reinvite(&test_sdp("10.0.0.50", 6000, "sendrecv", &[0]));
assert_eq!(call.remote_ip(), "10.0.0.50");
assert_eq!(call.remote_port(), 6000);
}
#[test]
fn on_state_fires_on_accept() {
let call = Call::new_inbound(mock_dlg());
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.accept().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Active
);
}
#[test]
fn on_state_fires_on_reject() {
let call = Call::new_inbound(mock_dlg());
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.reject(486, "Busy Here").unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Ended
);
}
#[test]
fn on_state_fires_on_end() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.end().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Ended
);
}
#[test]
fn on_state_fires_on_hold() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.hold().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::OnHold
);
}
#[test]
fn on_state_fires_on_resume() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.hold().unwrap();
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.resume().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Active
);
}
#[test]
fn on_state_fires_on_remote_bye() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.simulate_bye();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Ended
);
}
#[test]
fn on_state_fires_on_outbound_ringing() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.simulate_response(180, "Ringing");
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::RemoteRinging
);
}
#[test]
fn on_state_fires_on_outbound_200() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.simulate_response(200, "OK");
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Active
);
}
#[test]
fn on_state_fires_on_early_media() {
let opts = DialOptions {
early_media: true,
..Default::default()
};
let call = Call::new_outbound(mock_dlg(), opts);
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.simulate_response(183, "Session Progress");
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::EarlyMedia
);
}
#[test]
fn on_state_does_not_fire_on_mute() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let (tx, rx) = mpsc::channel::<CallState>();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.mute().unwrap();
std::thread::sleep(Duration::from_millis(100));
assert!(rx.try_recv().is_err());
}
#[test]
fn on_state_tracks_full_lifecycle() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
let (tx, rx) = mpsc::channel();
call.on_state(move |s| {
let _ = tx.send(s);
});
call.simulate_response(180, "Ringing");
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::RemoteRinging
);
call.simulate_response(200, "OK");
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Active
);
call.hold().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::OnHold
);
call.resume().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Active
);
call.end().unwrap();
assert_eq!(
rx.recv_timeout(Duration::from_millis(200)).unwrap(),
CallState::Ended
);
}
#[test]
fn has_video_false_for_audio_only() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert!(!call.has_video());
assert!(call.video_codec().is_none());
}
#[test]
fn has_video_true_when_set() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.set_video_codec(VideoCodec::H264);
assert!(call.has_video());
assert_eq!(call.video_codec(), Some(VideoCodec::H264));
}
#[test]
fn video_rtp_reader_none_without_video_stream() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert!(call.video_rtp_reader().is_none());
assert!(call.video_rtp_writer().is_none());
}
#[test]
fn video_reader_writer_none_pr3() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert!(call.video_reader().is_none());
assert!(call.video_writer().is_none());
}
#[test]
fn mute_video_requires_active() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.set_video_codec(VideoCodec::H264);
assert!(matches!(call.mute_video(), Err(Error::InvalidState)));
}
#[test]
fn mute_video_requires_video_stream() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(matches!(call.mute_video(), Err(Error::NoVideoStream)));
}
#[test]
fn mute_video_double_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_video_codec(VideoCodec::VP8);
call.mute_video().unwrap();
assert!(matches!(call.mute_video(), Err(Error::VideoAlreadyMuted)));
}
#[test]
fn unmute_video_when_not_muted_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_video_codec(VideoCodec::VP8);
assert!(matches!(call.unmute_video(), Err(Error::VideoNotMuted)));
}
#[test]
fn mute_unmute_video_round_trip() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_video_codec(VideoCodec::H264);
call.mute_video().unwrap();
call.unmute_video().unwrap();
call.mute_video().unwrap();
}
#[test]
fn audio_and_video_mute_independent() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_video_codec(VideoCodec::H264);
call.mute().unwrap();
call.mute_video().unwrap();
call.unmute().unwrap();
assert!(matches!(call.mute_video(), Err(Error::VideoAlreadyMuted)));
call.unmute_video().unwrap();
}
#[test]
fn request_keyframe_requires_active() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
call.set_video_codec(VideoCodec::H264);
assert!(matches!(call.request_keyframe(), Err(Error::InvalidState)));
}
#[test]
fn request_keyframe_requires_video() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
assert!(matches!(call.request_keyframe(), Err(Error::NoVideoStream)));
}
#[test]
fn video_codec_from_remote_sdp() {
let call = Call::new_inbound(mock_dlg());
let sdp = "v=0\r\n\
o=- 0 0 IN IP4 10.0.0.1\r\n\
s=-\r\n\
c=IN IP4 10.0.0.1\r\n\
t=0 0\r\n\
m=audio 5000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
m=video 5002 RTP/AVP 96\r\n\
a=rtpmap:96 H264/90000\r\n\
a=fmtp:96 profile-level-id=42e01f;packetization-mode=1\r\n";
call.set_remote_sdp(sdp);
assert!(call.has_video());
assert_eq!(call.video_codec(), Some(VideoCodec::H264));
let inner = call.inner.lock();
assert_eq!(inner.video_remote_port, 5002);
}
fn video_reinvite_sdp() -> &'static str {
"v=0\r\n\
o=- 0 0 IN IP4 10.0.0.1\r\n\
s=-\r\n\
c=IN IP4 10.0.0.1\r\n\
t=0 0\r\n\
m=audio 5000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n\
m=video 5002 RTP/AVP 96\r\n\
a=rtpmap:96 H264/90000\r\n\
a=fmtp:96 profile-level-id=42e01f;packetization-mode=1\r\n\
a=sendrecv\r\n"
}
#[test]
fn handle_reinvite_accepts_video() {
let call = Call::new_inbound(mock_dlg());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
assert!(!call.has_video());
call.on_video_request(|req| req.accept());
let reinvite_mock = mock_dlg();
let reinvite_dlg: Arc<dyn Dialog> = Arc::clone(&reinvite_mock) as _;
call.handle_reinvite(&reinvite_dlg, video_reinvite_sdp(), 10000, 20000);
std::thread::sleep(Duration::from_millis(100));
assert!(call.has_video());
assert_eq!(call.video_codec(), Some(VideoCodec::H264));
assert_eq!(reinvite_mock.last_response_code(), 200);
let body_bytes = reinvite_mock.last_response_body();
let body = String::from_utf8_lossy(&body_bytes);
assert!(body.contains("m=video"), "SDP should contain video m= line");
assert!(
body.contains("H264/90000"),
"SDP should contain H264 rtpmap"
);
}
#[test]
fn handle_reinvite_rejects_video_by_default() {
let call = Call::new_inbound(mock_dlg());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
let reinvite_mock = mock_dlg();
let reinvite_dlg: Arc<dyn Dialog> = Arc::clone(&reinvite_mock) as _;
call.handle_reinvite(&reinvite_dlg, video_reinvite_sdp(), 10000, 20000);
assert!(!call.has_video());
assert_eq!(reinvite_mock.last_response_code(), 200);
let body_bytes = reinvite_mock.last_response_body();
let body = String::from_utf8_lossy(&body_bytes);
assert!(
body.contains("m=video 0"),
"SDP should reject video with port 0"
);
}
#[test]
fn handle_reinvite_reject_explicit() {
let call = Call::new_inbound(mock_dlg());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
call.on_video_request(|req| req.reject());
let reinvite_mock = mock_dlg();
let reinvite_dlg: Arc<dyn Dialog> = Arc::clone(&reinvite_mock) as _;
call.handle_reinvite(&reinvite_dlg, video_reinvite_sdp(), 10000, 20000);
std::thread::sleep(Duration::from_millis(100));
assert!(!call.has_video());
assert_eq!(reinvite_mock.last_response_code(), 200);
let body_bytes = reinvite_mock.last_response_body();
let body = String::from_utf8_lossy(&body_bytes);
assert!(
body.contains("m=video 0"),
"SDP should reject video with port 0"
);
}
#[test]
fn handle_reinvite_fires_on_video_callback() {
let call = Call::new_inbound(mock_dlg());
let reinvite_dlg: Arc<dyn Dialog> = mock_dlg();
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
let (tx_video, rx_video) = mpsc::channel();
call.on_video(move || {
let _ = tx_video.send(());
});
call.on_video_request(|req| req.accept());
call.handle_reinvite(&reinvite_dlg, video_reinvite_sdp(), 10000, 20000);
assert!(
rx_video.recv_timeout(Duration::from_secs(2)).is_ok(),
"on_video callback should have fired"
);
}
#[test]
fn handle_reinvite_hold_delegates() {
let call = Call::new_inbound(mock_dlg());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
let reinvite_dlg: Arc<dyn Dialog> = mock_dlg();
let hold_sdp = "v=0\r\n\
o=- 0 0 IN IP4 10.0.0.1\r\n\
s=-\r\n\
c=IN IP4 10.0.0.1\r\n\
t=0 0\r\n\
m=audio 5000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendonly\r\n";
call.handle_reinvite(&reinvite_dlg, hold_sdp, 10000, 20000);
assert_eq!(call.state(), CallState::OnHold);
assert!(!call.has_video());
}
#[test]
fn handle_reinvite_on_ended_call_is_noop() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.end().unwrap();
let reinvite_dlg: Arc<dyn Dialog> = mock_dlg();
call.handle_reinvite(&reinvite_dlg, video_reinvite_sdp(), 10000, 20000);
assert!(!call.has_video());
}
#[test]
fn video_downgrade_stops_pipeline() {
let call = Call::new_inbound(mock_dlg());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
{
let mut inner = call.inner.lock();
inner.video_codec = Some(VideoCodec::H264);
}
assert!(call.has_video());
let reinvite_dlg: Arc<dyn Dialog> = mock_dlg();
let downgrade_sdp = "v=0\r\n\
o=- 0 0 IN IP4 10.0.0.1\r\n\
s=-\r\n\
c=IN IP4 10.0.0.1\r\n\
t=0 0\r\n\
m=audio 5000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n\
m=video 0 RTP/AVP 0\r\n";
call.handle_reinvite(&reinvite_dlg, downgrade_sdp, 10000, 20000);
assert!(!call.has_video());
}
#[test]
fn add_video_sends_reinvite() {
let dlg = mock_dlg();
let call = Call::new_inbound(dlg.clone());
call.set_local_media("192.168.1.100", 5000);
call.accept().unwrap();
call.add_video(&[VideoCodec::H264], 10000, 20000).unwrap();
let sdp = dlg.last_reinvite_sdp();
assert!(
sdp.contains("m=video"),
"re-INVITE SDP should have video m= line"
);
assert!(sdp.contains("H264/90000"), "re-INVITE SDP should have H264");
}
#[test]
fn add_video_requires_active() {
let call = Call::new_outbound(mock_dlg(), DialOptions::default());
assert!(matches!(
call.add_video(&[VideoCodec::H264], 10000, 20000),
Err(Error::InvalidState)
));
}
#[test]
fn add_video_when_already_active_returns_error() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
call.set_video_codec(VideoCodec::H264);
assert!(call.add_video(&[VideoCodec::VP8], 10000, 20000).is_err());
}
#[test]
fn multiple_on_ended_callbacks_all_fire() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = Arc::clone(&count);
call.on_ended(move |_| {
c1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let c2 = Arc::clone(&count);
call.on_ended(move |_| {
c2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
call.end().unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 2);
}
#[test]
fn multiple_on_state_callbacks_all_fire() {
let call = Call::new_inbound(mock_dlg());
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = Arc::clone(&count);
call.on_state(move |_| {
c1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let c2 = Arc::clone(&count);
call.on_state(move |_| {
c2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
call.accept().unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 2);
}
#[test]
fn multiple_on_hold_callbacks_all_fire() {
let call = Call::new_inbound(mock_dlg());
call.accept().unwrap();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = Arc::clone(&count);
call.on_hold(move || {
c1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let c2 = Arc::clone(&count);
call.on_hold(move || {
c2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let hold_sdp = "v=0\r\no=- 1 1 IN IP4 10.0.0.1\r\ns=-\r\nc=IN IP4 10.0.0.1\r\nt=0 0\r\nm=audio 20000 RTP/AVP 0\r\na=sendonly\r\n";
call.simulate_reinvite(hold_sdp);
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 2);
}
}