use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use tracing::{debug, info, warn};
use crate::call::Call;
use crate::config::{Config, DialOptions};
use crate::dialog::Dialog;
use crate::dialog_info::parse_dialog_info;
use crate::error::{Error, Result};
use crate::mock::dialog::MockDialog;
use crate::mwi::MwiSubscriber;
use crate::registry::Registry;
use crate::subscription::{SubId, SubscriptionManager};
use crate::transport::SipTransport;
#[cfg(test)]
use crate::types::{CallState, EndReason};
use crate::types::{
ExtensionState, ExtensionStatus, NotifyEvent, PhoneState, SipMessage, VoicemailStatus,
};
/// Phone-level call-state observer: receives the call and a `CallState`.
type CallStateCb = Arc<dyn Fn(Arc<Call>, crate::types::CallState) + Send + Sync>;
/// Phone-level call-ended observer: receives the call and an `EndReason`.
type CallEndedCb = Arc<dyn Fn(Arc<Call>, crate::types::EndReason) + Send + Sync>;
/// Phone-level DTMF observer: receives the call and the digit as a `String`.
type CallDtmfCb = Arc<dyn Fn(Arc<Call>, String) + Send + Sync>;
/// Mutable state shared by every clone of a [`Phone`], guarded by one mutex.
struct Inner {
/// Current connection/registration state.
state: PhoneState,
/// Active transport; set by `connect_with_transport`, taken by `disconnect`.
tr: Option<Arc<dyn SipTransport>>,
/// Registration driver for the active transport.
reg: Option<Arc<Registry>>,
/// Voicemail (MWI) subscriber; only created when registration succeeded and
/// a voicemail URI is configured.
mwi: Option<Arc<MwiSubscriber>>,
/// Callbacks invoked with every new inbound call.
incoming: Vec<Arc<dyn Fn(Arc<Call>) + Send + Sync>>,
/// Tracked calls keyed by call id.
calls: HashMap<String, Arc<Call>>,
/// Callbacks forwarded to the registry's `on_registered`.
on_registered_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
/// Callbacks forwarded to the registry's `on_unregistered`; also spawned by
/// `disconnect`.
on_unregistered_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
/// Callbacks forwarded to the registry's `on_error`.
on_error_fn: Vec<Arc<dyn Fn(Error) + Send + Sync>>,
/// Callbacks forwarded to the MWI subscriber.
on_voicemail_fn: Vec<Arc<dyn Fn(VoicemailStatus) + Send + Sync>>,
/// Callbacks invoked for each incoming MESSAGE.
on_message_fn: Vec<Arc<dyn Fn(SipMessage) + Send + Sync>>,
/// Callbacks forwarded to the subscription manager's `on_error`.
on_subscription_error_fn: Vec<Arc<dyn Fn(String, Error) + Send + Sync>>,
/// Phone-level call-state observers attached to each call when it is wired.
on_call_state_fn: Vec<CallStateCb>,
/// Phone-level call-ended observers attached to each call when it is wired.
on_call_ended_fn: Vec<CallEndedCb>,
/// Phone-level DTMF observers attached to each call when it is wired.
on_call_dtmf_fn: Vec<CallDtmfCb>,
/// DTMF mode copied from the configuration and applied to every wired call.
dtmf_mode: crate::config::DtmfMode,
/// Event subscription manager for the active transport.
subscription_mgr: Option<Arc<SubscriptionManager>>,
/// Watched extensions: extension -> (subscription id, last reported state).
blf_watchers: HashMap<String, (SubId, ExtensionState)>,
/// Error of the most recent failed registration; cleared on success.
last_reg_error: Option<Error>,
}
/// Handle to a softphone instance.
///
/// Cloning produces another handle to the same shared state (`inner` is an
/// `Arc`); the configuration is cloned per handle.
#[derive(Clone)]
pub struct Phone {
/// Configuration captured at construction (host normalized in `new`).
cfg: Config,
/// State shared between all clones of this handle.
inner: Arc<Mutex<Inner>>,
}
impl Phone {
/// Creates a phone in the `Disconnected` state.
///
/// `cfg.normalize_host()` is applied before the configuration is stored, and
/// the configured DTMF mode is copied into the shared state. No transport,
/// registry or subscription machinery exists until a connect call is made.
pub fn new(mut cfg: Config) -> Self {
    cfg.normalize_host();

    let initial = Inner {
        state: PhoneState::Disconnected,
        tr: None,
        reg: None,
        mwi: None,
        incoming: Vec::new(),
        calls: HashMap::new(),
        on_registered_fn: Vec::new(),
        on_unregistered_fn: Vec::new(),
        on_error_fn: Vec::new(),
        on_voicemail_fn: Vec::new(),
        on_message_fn: Vec::new(),
        on_subscription_error_fn: Vec::new(),
        on_call_state_fn: Vec::new(),
        on_call_ended_fn: Vec::new(),
        on_call_dtmf_fn: Vec::new(),
        dtmf_mode: cfg.dtmf_mode,
        subscription_mgr: None,
        blf_watchers: HashMap::new(),
        last_reg_error: None,
    };

    Self {
        inner: Arc::new(Mutex::new(initial)),
        cfg,
    }
}
pub fn connect(&self) -> crate::error::Result<()> {
info!(host = %self.cfg.host, port = self.cfg.port, user = %self.cfg.username, "Phone connecting");
let tr = Arc::new(crate::sip::ua::SipUA::new(&self.cfg)?);
self.connect_with_transport(tr);
let state = self.state();
if state == PhoneState::Registered {
info!("Phone connected and registered");
return Ok(());
}
let last_err = self.inner.lock().last_reg_error.clone();
warn!(err = ?last_err, "Phone registration failed");
Err(last_err.unwrap_or(crate::error::Error::RegistrationFailed {
code: 0,
reason: String::new(),
}))
}
/// Connects the phone over an already-constructed transport.
///
/// Steps, in order:
/// 1. Builds a [`Registry`] on `tr`, attaches the registration callbacks
///    registered so far, and calls `start()`.
/// 2. Picks the local media IP: `cfg.local_ip` if non-empty, else the
///    transport's advertised address, else `local_ip_for(cfg.host)`.
/// 3. Installs the transport handlers (dialog INVITE, plain INVITE, BYE,
///    NOTIFY, INFO DTMF, MESSAGE).
/// 4. Creates the [`SubscriptionManager`], routes subscription NOTIFYs to it
///    and attaches the subscription-error callbacks registered so far.
/// 5. Starts an [`MwiSubscriber`] only when registration succeeded and a
///    voicemail URI is configured.
/// 6. Stores everything in the shared state and sets the state to
///    `Registered` or `RegistrationFailed` (keeping the error).
///
/// The transport handlers hold only a weak reference to the shared state.
/// The shared state owns the transport, so a strong capture would form a
/// reference cycle and the strong count inspected by `Drop` could never
/// return to 1. Events arriving after the state is gone are ignored.
pub fn connect_with_transport(&self, tr: Arc<dyn SipTransport>) {
    let reg = Arc::new(Registry::new(Arc::clone(&tr), self.cfg.clone()));
    let (reg_cbs, unreg_cbs, err_cbs) = {
        let inner = self.inner.lock();
        (
            inner.on_registered_fn.clone(),
            inner.on_unregistered_fn.clone(),
            inner.on_error_fn.clone(),
        )
    };
    for f in reg_cbs {
        reg.on_registered(move || f());
    }
    for f in unreg_cbs {
        reg.on_unregistered(move || f());
    }
    for f in err_cbs {
        reg.on_error(move |e| f(e));
    }
    let reg_result = reg.start();

    let effective_ip = if !self.cfg.local_ip.is_empty() {
        self.cfg.local_ip.clone()
    } else if let Some(addr) = tr.advertised_addr() {
        addr.ip().to_string()
    } else {
        local_ip_for(&self.cfg.host)
    };

    let weak_inner = Arc::downgrade(&self.inner);
    let incoming_ip = effective_ip;
    let rtp_port_min = self.cfg.rtp_port_min;
    let rtp_port_max = self.cfg.rtp_port_max;
    tr.on_dialog_invite(Box::new(move |dlg, from, to, remote_sdp| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_dialog_incoming(
                &inner,
                dlg,
                &from,
                &to,
                &remote_sdp,
                &incoming_ip,
                rtp_port_min,
                rtp_port_max,
            );
        }
    }));

    let weak_inner = Arc::downgrade(&self.inner);
    tr.on_incoming(Box::new(move |from, to| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_incoming(&inner, &from, &to);
        }
    }));

    let weak_inner = Arc::downgrade(&self.inner);
    tr.on_bye(Box::new(move |call_id| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_bye(&inner, &call_id);
        }
    }));

    let weak_inner = Arc::downgrade(&self.inner);
    tr.on_notify(Box::new(move |call_id, code| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_notify(&inner, &call_id, code);
        }
    }));

    let weak_inner = Arc::downgrade(&self.inner);
    tr.on_info_dtmf(Box::new(move |call_id, digit| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_info_dtmf(&inner, &call_id, &digit);
        }
    }));

    let weak_inner = Arc::downgrade(&self.inner);
    tr.on_message(Box::new(move |from, content_type, body| {
        if let Some(inner) = weak_inner.upgrade() {
            handle_message(&inner, &from, &content_type, &body);
        }
    }));

    let sub_mgr = Arc::new(SubscriptionManager::new(Arc::clone(&tr)));
    let sub_mgr_clone = Arc::clone(&sub_mgr);
    tr.on_subscription_notify(Box::new(move |event, ct, body, sub_state, from_uri| {
        sub_mgr_clone.handle_notify(event, ct, body, sub_state, from_uri);
    }));
    let sub_err_cbs = self.inner.lock().on_subscription_error_fn.clone();
    for f in sub_err_cbs {
        sub_mgr.on_error(move |uri, err| f(uri, err));
    }

    // Voicemail notifications are only subscribed to after a successful
    // registration and when a voicemail URI is configured.
    let mwi = if reg_result.is_ok() {
        if let Some(ref vm_uri) = self.cfg.voicemail_uri {
            let sub = Arc::new(MwiSubscriber::new(Arc::clone(&tr), vm_uri.clone()));
            let vm_cbs = self.inner.lock().on_voicemail_fn.clone();
            for f in vm_cbs {
                sub.on_voicemail(move |s| f(s));
            }
            sub.start();
            Some(sub)
        } else {
            None
        }
    } else {
        None
    };

    let mut inner = self.inner.lock();
    inner.tr = Some(tr);
    inner.reg = Some(reg);
    inner.mwi = mwi;
    inner.subscription_mgr = Some(sub_mgr);
    match reg_result {
        Ok(()) => {
            inner.state = PhoneState::Registered;
            inner.last_reg_error = None;
        }
        Err(e) => {
            inner.state = PhoneState::RegistrationFailed;
            inner.last_reg_error = Some(e);
        }
    }
}
/// Tears the phone down and returns it to the `Disconnected` state.
///
/// All components are detached from the shared state under the lock; the
/// actual shutdown calls are made after the lock is released. Order: end
/// every tracked call, stop subscriptions, stop MWI, stop the registry,
/// close the transport, then spawn the `on_unregistered` callbacks.
///
/// # Errors
///
/// Returns `Error::NotConnected` when the phone is already `Disconnected`.
pub fn disconnect(&self) -> Result<()> {
    info!("Phone disconnecting");

    let mut guard = self.inner.lock();
    if guard.state == PhoneState::Disconnected {
        return Err(Error::NotConnected);
    }
    let registry = guard.reg.take();
    let transport = guard.tr.take();
    let unregistered_cbs = guard.on_unregistered_fn.clone();
    let live_calls: Vec<Arc<Call>> = guard.calls.drain().map(|entry| entry.1).collect();
    let voicemail = guard.mwi.take();
    let subscriptions = guard.subscription_mgr.take();
    guard.blf_watchers.clear();
    guard.state = PhoneState::Disconnected;
    drop(guard);

    // Failures while ending calls or closing the transport are ignored:
    // teardown is best effort.
    live_calls.into_iter().for_each(|call| {
        let _ = call.end();
    });
    if let Some(mgr) = subscriptions {
        mgr.stop();
    }
    if let Some(subscriber) = voicemail {
        subscriber.stop();
    }
    if let Some(registry) = registry {
        registry.stop();
    }
    if let Some(transport) = transport {
        let _ = transport.close();
    }
    unregistered_cbs.into_iter().for_each(|notify| {
        crate::callback_pool::spawn_callback(move || notify());
    });

    Ok(())
}
/// Places an outbound call to `target` and returns the tracked call.
///
/// Builds an SDP offer from the dial options and configuration, allocates
/// local RTP sockets, sends the INVITE through the transport and records the
/// resulting call in the phone's call map.
///
/// # Errors
///
/// `Error::NotRegistered` unless the phone is `Registered`;
/// `Error::NotConnected` when no transport is stored; otherwise any error from
/// audio RTP port allocation, SRTP key generation or the transport.
pub fn dial(&self, target: &str, opts: DialOptions) -> Result<Arc<Call>> {
info!(target = %target, "Phone dialing");
// Clone the transport out so the phone lock is not held while dialing.
let tr = {
let inner = self.inner.lock();
if inner.state != PhoneState::Registered {
return Err(Error::NotRegistered);
}
inner.tr.as_ref().cloned().ok_or(Error::NotConnected)?
};
// Media address precedence: per-call override, configured local IP,
// transport-advertised address, then a routing probe towards the host.
let local_ip = if let Some(ref addr) = opts.rtp_address {
addr.clone()
} else if !self.cfg.local_ip.is_empty() {
self.cfg.local_ip.clone()
} else if let Some(addr) = tr.advertised_addr() {
addr.ip().to_string()
} else {
local_ip_for(&self.cfg.host)
};
// Codec precedence: per-call override, configured preferences, defaults.
let mut audio_codecs: Vec<i32> = if !opts.codec_override.is_empty() {
opts.codec_override
.iter()
.map(|c| c.payload_type())
.collect()
} else if !self.cfg.codec_prefs.is_empty() {
self.cfg
.codec_prefs
.iter()
.map(|c| c.payload_type())
.collect()
} else {
crate::call::DEFAULT_CODEC_PREFS.to_vec()
};
// Append the telephone-event payload type when the DTMF mode is Rfc4733 or
// Both and the list does not already contain it.
if matches!(
self.cfg.dtmf_mode,
crate::config::DtmfMode::Rfc4733 | crate::config::DtmfMode::Both
) && !audio_codecs.contains(&crate::call::PT_TELEPHONE_EVENT)
{
audio_codecs.push(crate::call::PT_TELEPHONE_EVENT);
}
// Audio port allocation failure aborts the dial.
let (rtp_socket, rtp_port) = {
let (sock, port) =
crate::media::listen_rtp_port(self.cfg.rtp_port_min, self.cfg.rtp_port_max)?;
(Some(sock), port as i32)
};
// Video port allocation failure is tolerated: the offer is then built with
// video port 0 and no video socket is attached to the call.
let (video_rtp_socket, video_rtp_port) = if opts.video {
match crate::media::listen_rtp_port(self.cfg.rtp_port_min, self.cfg.rtp_port_max) {
Ok((sock, port)) => (Some(sock), port as i32),
Err(_) => (None, 0),
}
} else {
(None, 0)
};
let srtp_inline_key = if self.cfg.srtp {
let (_material, encoded) = crate::srtp::generate_keying_material()?;
Some(encoded)
} else {
None
};
// NOTE(review): the video key is only placed in the offer; nothing in this
// function stores it on the call — confirm that is intended.
let video_srtp_inline_key = if self.cfg.srtp && opts.video {
let (_material, encoded) = crate::srtp::generate_keying_material()?;
Some(encoded)
} else {
None
};
// Default video codecs are H264 and VP8 when none are requested.
let video_codecs = if opts.video_codecs.is_empty() {
vec![
crate::types::VideoCodec::H264,
crate::types::VideoCodec::VP8,
]
} else {
opts.video_codecs.clone()
};
// Choose the offer builder from the video/SRTP combination.
let local_sdp = if opts.video {
match (&srtp_inline_key, &video_srtp_inline_key) {
(Some(audio_key), Some(video_key)) => crate::sdp::build_offer_video_srtp(
&local_ip,
rtp_port,
&audio_codecs,
video_rtp_port,
&video_codecs,
crate::sdp::DIR_SEND_RECV,
audio_key,
video_key,
),
_ => crate::sdp::build_offer_video(
&local_ip,
rtp_port,
&audio_codecs,
video_rtp_port,
&video_codecs,
crate::sdp::DIR_SEND_RECV,
),
}
} else if let Some(ref key) = srtp_inline_key {
crate::sdp::build_offer_srtp(
&local_ip,
rtp_port,
&audio_codecs,
crate::sdp::DIR_SEND_RECV,
key,
)
} else {
crate::sdp::build_offer(
&local_ip,
rtp_port,
&audio_codecs,
crate::sdp::DIR_SEND_RECV,
)
};
let dial_result = tr.dial(target, local_sdp.as_bytes(), opts.timeout, &opts);
let (call, responses) = match dial_result {
// The transport returned a dialog: attach media, wire callbacks, then
// replay 183 (if early SDP was returned) and 200 on the call.
Ok(result) => {
let call = Call::new_outbound(result.dialog, opts);
call.set_local_media(&local_ip, rtp_port);
call.set_local_sdp(&local_sdp);
if let Some(ref key) = srtp_inline_key {
call.set_srtp(key);
}
if let Some(sock) = rtp_socket {
call.set_rtp_socket(sock);
}
if let Some(vsock) = video_rtp_socket {
call.set_video_rtp_port(video_rtp_port);
call.set_video_rtp_socket(vsock);
}
wire_phone_call_callbacks(&self.inner, &call);
if let Some(ref early_sdp) = result.early_sdp {
call.set_remote_sdp(early_sdp);
call.simulate_response(183, "Session Progress");
}
if !result.remote_sdp.is_empty() {
call.set_remote_sdp(&result.remote_sdp);
}
call.simulate_response(200, "OK");
(call, Vec::new())
}
// Fallback selected by matching the error text: send a bare INVITE,
// read responses until a non-1xx status arrives, and build the call on
// a `MockDialog`.
// NOTE(review): the RTP sockets allocated above are not attached to the
// call on this path — confirm this is expected.
Err(e) if e.to_string().contains("not supported") => {
let resp = tr.send_request("INVITE", None, opts.timeout)?;
let mut code = resp.status_code;
let mut responses = vec![(code, resp.reason.clone())];
while (100..200).contains(&code) {
let next = tr.read_response(opts.timeout)?;
code = next.status_code;
responses.push((code, next.reason.clone()));
}
let dlg = Arc::new(MockDialog::new());
let call = Call::new_outbound(dlg as Arc<dyn Dialog>, opts);
wire_phone_call_callbacks(&self.inner, &call);
(call, responses)
}
Err(e) => return Err(e),
};
// Replay the collected responses (empty on the dialog path).
for (c, r) in &responses {
call.simulate_response(*c, r);
}
// Track the call under its call id.
self.inner
.lock()
.calls
.insert(call.call_id(), Arc::clone(&call));
Ok(call)
}
/// Registers a callback that is invoked with every new inbound call.
pub fn on_incoming<F: Fn(Arc<Call>) + Send + Sync + 'static>(&self, f: F) {
    let handler: Arc<dyn Fn(Arc<Call>) + Send + Sync> = Arc::new(f);
    let mut guard = self.inner.lock();
    guard.incoming.push(handler);
}
/// Registers a callback for successful registration.
///
/// The callback is remembered for future connections and, when a registry
/// already exists, attached to it immediately (outside the phone lock).
pub fn on_registered<F: Fn() + Send + Sync + 'static>(&self, f: F) {
    let handler: Arc<dyn Fn() + Send + Sync> = Arc::new(f);

    let mut guard = self.inner.lock();
    guard.on_registered_fn.push(Arc::clone(&handler));
    let live_registry = guard.reg.clone();
    drop(guard);

    if let Some(registry) = live_registry {
        registry.on_registered(move || handler());
    }
}
/// Registers a callback for loss of registration.
///
/// The callback is remembered for future connections and, when a registry
/// already exists, attached to it immediately (outside the phone lock).
pub fn on_unregistered<F: Fn() + Send + Sync + 'static>(&self, f: F) {
    let handler: Arc<dyn Fn() + Send + Sync> = Arc::new(f);

    let mut guard = self.inner.lock();
    guard.on_unregistered_fn.push(Arc::clone(&handler));
    let live_registry = guard.reg.clone();
    drop(guard);

    if let Some(registry) = live_registry {
        registry.on_unregistered(move || handler());
    }
}
pub fn on_error<F: Fn(Error) + Send + Sync + 'static>(&self, f: F) {
let cb: Arc<dyn Fn(Error) + Send + Sync> = Arc::new(f);
let reg = {
let mut inner = self.inner.lock();
inner.on_error_fn.push(Arc::clone(&cb));
inner.reg.clone()
};
if let Some(reg) = reg {
let cb = Arc::clone(&cb);
reg.on_error(move |e| cb(e));
}
}
/// Registers a callback for voicemail status updates.
///
/// The callback is remembered for future connections and, when an MWI
/// subscriber already exists, attached to it immediately (outside the phone
/// lock).
pub fn on_voicemail<F: Fn(VoicemailStatus) + Send + Sync + 'static>(&self, f: F) {
    let handler: Arc<dyn Fn(VoicemailStatus) + Send + Sync> = Arc::new(f);

    let mut guard = self.inner.lock();
    guard.on_voicemail_fn.push(Arc::clone(&handler));
    let live_subscriber = guard.mwi.clone();
    drop(guard);

    if let Some(subscriber) = live_subscriber {
        subscriber.on_voicemail(move |status| handler(status));
    }
}
/// Registers a callback that receives every incoming MESSAGE.
pub fn on_message<F: Fn(SipMessage) + Send + Sync + 'static>(&self, f: F) {
    let handler: Arc<dyn Fn(SipMessage) + Send + Sync> = Arc::new(f);
    let mut guard = self.inner.lock();
    guard.on_message_fn.push(handler);
}
pub fn send_message(&self, target: &str, body: &str) -> Result<()> {
let tr = {
let inner = self.inner.lock();
if inner.state != PhoneState::Registered {
return Err(Error::NotRegistered);
}
inner.tr.as_ref().cloned().ok_or(Error::NotConnected)?
};
tr.send_message(
target,
"text/plain",
body.as_bytes(),
std::time::Duration::from_secs(10),
)
}
pub fn send_message_with_type(
&self,
target: &str,
content_type: &str,
body: &str,
) -> Result<()> {
let tr = {
let inner = self.inner.lock();
if inner.state != PhoneState::Registered {
return Err(Error::NotRegistered);
}
inner.tr.as_ref().cloned().ok_or(Error::NotConnected)?
};
tr.send_message(
target,
content_type,
body.as_bytes(),
std::time::Duration::from_secs(10),
)
}
fn get_sub_mgr(&self) -> Result<Arc<SubscriptionManager>> {
let inner = self.inner.lock();
inner
.subscription_mgr
.as_ref()
.ok_or(Error::NotConnected)
.cloned()
}
pub fn watch<F>(&self, extension: &str, f: F) -> Result<()>
where
F: Fn(ExtensionStatus, Option<ExtensionState>) + Send + Sync + 'static,
{
if self.inner.lock().state != PhoneState::Registered {
return Err(Error::NotRegistered);
}
let sub_mgr = self.get_sub_mgr()?;
let uri = format!("sip:{}@{}", extension, self.cfg.host);
let ext = extension.to_string();
let phone_inner = Arc::clone(&self.inner);
let f = Arc::new(f);
let sub_id = sub_mgr.subscribe(
&uri,
"dialog",
"application/dialog-info+xml",
Arc::new(move |notify: NotifyEvent| {
let new_state = if notify.body.is_empty() {
ExtensionState::Unknown
} else {
parse_dialog_info(¬ify.body)
};
let (prev, should_fire) = {
let mut inner = phone_inner.lock();
if let Some((_sub_id, last)) = inner.blf_watchers.get_mut(&ext) {
if *last == new_state {
return; }
let prev = Some(*last);
*last = new_state; (prev, true)
} else {
inner.blf_watchers.insert(ext.clone(), (0, new_state));
(None, true)
}
};
if should_fire {
let status = ExtensionStatus {
extension: ext.clone(),
state: new_state,
};
f(status, prev);
}
}),
);
self.inner
.lock()
.blf_watchers
.entry(extension.to_string())
.and_modify(|(id, _)| *id = sub_id)
.or_insert((sub_id, ExtensionState::Unknown));
Ok(())
}
pub fn unwatch(&self, extension: &str) -> Result<()> {
let sub_mgr = self.get_sub_mgr()?;
let sub_id = {
let inner = self.inner.lock();
let (sub_id, _) = inner
.blf_watchers
.get(extension)
.ok_or_else(|| Error::Other(format!("not watching {}", extension)))?;
*sub_id
};
sub_mgr.unsubscribe(sub_id);
self.inner.lock().blf_watchers.remove(extension);
Ok(())
}
/// Subscribes to an arbitrary event package and returns the subscription id.
///
/// # Errors
///
/// `Error::NotRegistered` unless the phone is `Registered`;
/// `Error::NotConnected` when no subscription manager is stored.
pub fn subscribe_event<F>(&self, uri: &str, event: &str, accept: &str, f: F) -> Result<SubId>
where
    F: Fn(NotifyEvent) + Send + Sync + 'static,
{
    let registered = self.inner.lock().state == PhoneState::Registered;
    if !registered {
        return Err(Error::NotRegistered);
    }
    let sub_mgr = self.get_sub_mgr()?;
    let sub_id = sub_mgr.subscribe(uri, event, accept, Arc::new(f));
    Ok(sub_id)
}
/// Cancels the subscription identified by `sub_id`.
///
/// # Errors
///
/// `Error::NotConnected` when no subscription manager is stored.
pub fn unsubscribe_event(&self, sub_id: SubId) -> Result<()> {
    let manager = self.get_sub_mgr()?;
    manager.unsubscribe(sub_id);
    Ok(())
}
pub fn on_subscription_error<F>(&self, f: F)
where
F: Fn(String, Error) + Send + Sync + 'static,
{
let mut inner = self.inner.lock();
let f: Arc<dyn Fn(String, Error) + Send + Sync> = Arc::new(f);
inner.on_subscription_error_fn.push(Arc::clone(&f));
if let Some(ref mgr) = inner.subscription_mgr {
let f = Arc::clone(&f);
mgr.on_error(move |uri, err| f(uri, err));
}
}
/// Returns the phone's current state.
pub fn state(&self) -> PhoneState {
    let guard = self.inner.lock();
    guard.state
}
/// Returns the configured host (as stored after normalization in `new`).
pub fn host(&self) -> &str {
    let host: &str = &self.cfg.host;
    host
}
/// Registers a phone-level call-state observer.
///
/// Observers are attached to calls at the moment a call is wired, so only
/// calls created after this registration will report to `f`.
pub fn on_call_state<F: Fn(Arc<Call>, crate::types::CallState) + Send + Sync + 'static>(
    &self,
    f: F,
) {
    let observer: Arc<dyn Fn(Arc<Call>, crate::types::CallState) + Send + Sync> = Arc::new(f);
    let mut guard = self.inner.lock();
    guard.on_call_state_fn.push(observer);
}
/// Registers a phone-level call-ended observer.
///
/// Observers are attached to calls at the moment a call is wired, so only
/// calls created after this registration will report to `f`.
pub fn on_call_ended<F: Fn(Arc<Call>, crate::types::EndReason) + Send + Sync + 'static>(
    &self,
    f: F,
) {
    let observer: Arc<dyn Fn(Arc<Call>, crate::types::EndReason) + Send + Sync> = Arc::new(f);
    let mut guard = self.inner.lock();
    guard.on_call_ended_fn.push(observer);
}
/// Registers a phone-level DTMF observer.
///
/// Observers are attached to calls at the moment a call is wired, so only
/// calls created after this registration will report to `f`.
pub fn on_call_dtmf<F: Fn(Arc<Call>, String) + Send + Sync + 'static>(&self, f: F) {
    let observer: Arc<dyn Fn(Arc<Call>, String) + Send + Sync> = Arc::new(f);
    let mut guard = self.inner.lock();
    guard.on_call_dtmf_fn.push(observer);
}
/// Looks up a tracked call by its call id.
pub fn find_call(&self, call_id: &str) -> Option<Arc<Call>> {
    let guard = self.inner.lock();
    guard.calls.get(call_id).map(Arc::clone)
}
/// Returns a snapshot of all tracked calls (in no particular order).
pub fn calls(&self) -> Vec<Arc<Call>> {
    let guard = self.inner.lock();
    guard.calls.values().map(Arc::clone).collect()
}
/// Performs an attended transfer by delegating to
/// `call_a.attended_transfer(call_b)` and returning its result unchanged.
pub fn attended_transfer(&self, call_a: &Arc<Call>, call_b: &Arc<Call>) -> Result<()> {
call_a.attended_transfer(call_b)
}
}
/// Disconnects automatically when the last reference to the shared state is
/// dropped.
impl Drop for Phone {
    fn drop(&mut self) {
        // Only the holder of the sole remaining strong reference tears the
        // phone down; a disconnect error (e.g. already disconnected) is ignored.
        let is_last_handle = Arc::strong_count(&self.inner) == 1;
        if is_last_handle {
            let _ = self.disconnect();
        }
    }
}
/// Determines which local IP address would be used to reach `host`.
///
/// Binds a UDP socket on `0.0.0.0:0`, connects it to `<host>:5060` and reads
/// back the socket's local address. Falls back to `"127.0.0.1"` when any
/// step fails or the resulting address is unspecified.
fn local_ip_for(host: &str) -> String {
    use std::net::UdpSocket;

    let probe = || -> std::io::Result<std::net::IpAddr> {
        let sock = UdpSocket::bind("0.0.0.0:0")?;
        sock.connect(format!("{}:5060", host))?;
        Ok(sock.local_addr()?.ip())
    };

    match probe() {
        Ok(ip) if !ip.is_unspecified() => ip.to_string(),
        _ => "127.0.0.1".into(),
    }
}
fn handle_incoming(inner: &Arc<Mutex<Inner>>, from: &str, to: &str) {
let (tr, incoming_fns) = {
let guard = inner.lock();
(guard.tr.clone(), guard.incoming.clone())
};
if let Some(ref tr) = tr {
tr.respond(100, "Trying");
}
let dlg = Arc::new(MockDialog::new());
let call = Call::new_inbound(dlg as Arc<dyn Dialog>);
wire_phone_call_callbacks(inner, &call);
inner.lock().calls.insert(call.call_id(), Arc::clone(&call));
let _ = (from, to);
for f in &incoming_fns {
f(Arc::clone(&call));
}
if let Some(ref tr) = tr {
tr.respond(180, "Ringing");
}
}
/// Attaches the phone-level observers and DTMF mode to `call`.
///
/// The observer lists are snapshotted under the lock, so observers added to
/// the phone later do not apply to this call. State and DTMF hooks are only
/// installed when at least one observer exists; the ended hook is always
/// installed because it also removes the call from the phone's call map.
///
/// The ended hook holds only a weak reference to the shared state. The
/// shared state owns the call map and each call stores this hook, so a strong
/// capture would be a reference cycle. If the state is already gone there is
/// no map to update and only the observers run.
fn wire_phone_call_callbacks(inner: &Arc<Mutex<Inner>>, call: &Arc<Call>) {
    let (state_fns, ended_fns, dtmf_fns, dtmf_mode) = {
        let locked = inner.lock();
        (
            locked.on_call_state_fn.clone(),
            locked.on_call_ended_fn.clone(),
            locked.on_call_dtmf_fn.clone(),
            locked.dtmf_mode,
        )
    };
    call.set_dtmf_mode(dtmf_mode);

    if !state_fns.is_empty() {
        let c = Arc::clone(call);
        call.on_state_internal(move |s| {
            for f in &state_fns {
                f(Arc::clone(&c), s);
            }
        });
    }

    {
        let weak_inner = Arc::downgrade(inner);
        let call_id = call.call_id();
        // NOTE(review): the hooks capture a strong handle to the call they are
        // registered on; whether `Call` releases its hooks when it ends is not
        // visible here.
        let c = Arc::clone(call);
        call.on_ended_internal(move |r| {
            // Stop tracking the call before notifying observers.
            if let Some(inner) = weak_inner.upgrade() {
                inner.lock().calls.remove(&call_id);
            }
            for f in &ended_fns {
                f(Arc::clone(&c), r);
            }
        });
    }

    if !dtmf_fns.is_empty() {
        let c = Arc::clone(call);
        call.on_dtmf_internal(move |d| {
            for f in &dtmf_fns {
                f(Arc::clone(&c), d.clone());
            }
        });
    }
}
/// Handles an INVITE delivered together with a transport dialog.
///
/// An INVITE whose call id matches a tracked call is treated as a re-INVITE
/// and forwarded to that call. Otherwise a new inbound call is created:
/// an RTP port is allocated, SRTP is enabled when the offer is SRTP with the
/// supported crypto suite, a video port is allocated when the offer has
/// video (failure there is tolerated), the call is wired and tracked,
/// `180 Ringing` is sent and the incoming callbacks are invoked.
///
/// The INVITE is rejected with a final response when the call cannot be set
/// up: `503` if no RTP port is available and `500` if SRTP keying material
/// cannot be generated, so the caller is not left waiting for a timeout.
// The arguments mirror the transport's invite callback plus the media
// settings captured at connect time.
#[allow(clippy::too_many_arguments)]
fn handle_dialog_incoming(
    inner: &Arc<Mutex<Inner>>,
    dlg: Arc<dyn Dialog>,
    from: &str,
    to: &str,
    remote_sdp: &str,
    local_ip: &str,
    rtp_port_min: u16,
    rtp_port_max: u16,
) {
    let call_id = dlg.call_id();
    let existing_call = inner.lock().calls.get(&call_id).cloned();
    if let Some(call) = existing_call {
        info!(call_id = %call_id, "Phone handling re-INVITE for existing call");
        handle_reinvite(&call, dlg, remote_sdp, rtp_port_min, rtp_port_max);
        return;
    }

    info!(from = from, to = to, "Phone handling incoming INVITE");
    let incoming_fns = inner.lock().incoming.clone();

    let (rtp_socket, actual_port) =
        match crate::media::listen_rtp_port(rtp_port_min, rtp_port_max) {
            Ok((sock, port)) => (sock, port as i32),
            Err(e) => {
                warn!("RTP port allocation failed for incoming call, rejecting: {e}");
                let _ = dlg.respond(503, "Service Unavailable", &[]);
                return;
            }
        };

    let parsed_sdp = crate::sdp::parse(remote_sdp).ok();
    let use_srtp = parsed_sdp.as_ref().is_some_and(|sess| {
        sess.is_srtp()
            && sess
                .first_crypto()
                .is_some_and(|c| c.suite == crate::srtp::SUPPORTED_SUITE)
    });

    // Generate the key while the dialog is still available, so a failure can
    // be answered with a final response instead of silently dropping the INVITE.
    let srtp_key = if use_srtp {
        match crate::srtp::generate_keying_material() {
            Ok((_material, encoded)) => Some(encoded),
            Err(e) => {
                tracing::error!("failed to generate SRTP keying material: {}", e);
                let _ = dlg.respond(500, "Server Internal Error", &[]);
                return;
            }
        }
    } else {
        None
    };

    let call = Call::new_inbound(dlg);
    call.set_local_media(local_ip, actual_port);
    if let Some(ref key) = srtp_key {
        call.set_srtp(key);
    }
    call.set_rtp_socket(rtp_socket);

    if let Some(ref sess) = parsed_sdp {
        if sess.has_video() {
            // Video is best effort: without a port the call proceeds audio-only.
            if let Ok((vsock, vport)) = crate::media::listen_rtp_port(rtp_port_min, rtp_port_max) {
                call.set_video_rtp_port(vport as i32);
                call.set_video_rtp_socket(vsock);
            }
        }
    }
    if !remote_sdp.is_empty() {
        call.set_remote_sdp(remote_sdp);
    }

    wire_phone_call_callbacks(inner, &call);
    inner.lock().calls.insert(call.call_id(), Arc::clone(&call));
    let _ = call.dlg_respond(180, "Ringing");
    for f in &incoming_fns {
        f(Arc::clone(&call));
    }
}
/// Forwards a re-INVITE for an already tracked call to
/// `Call::handle_reinvite`, passing the new dialog, the remote SDP and the
/// RTP port range through unchanged.
fn handle_reinvite(
call: &Arc<Call>,
dlg: Arc<dyn Dialog>,
remote_sdp: &str,
rtp_port_min: u16,
rtp_port_max: u16,
) {
call.handle_reinvite(&dlg, remote_sdp, rtp_port_min, rtp_port_max);
}
/// Routes a BYE to the tracked call with the given id, if any.
///
/// The call is cloned out of the map first so the phone lock is not held
/// while the call processes the BYE.
fn handle_bye(inner: &Arc<Mutex<Inner>>, call_id: &str) {
    info!(call_id = %call_id, "Phone handling BYE");
    let found = inner.lock().calls.get(call_id).map(Arc::clone);
    match found {
        Some(call) => {
            call.simulate_bye();
        }
        None => {
            debug!(call_id = %call_id, "Phone BYE for unknown call (already ended)");
        }
    }
}
/// Routes a DTMF digit received via INFO to the tracked call with the given
/// id, if any.
///
/// The call is cloned out of the map first so the phone lock is not held
/// while the digit is delivered.
fn handle_info_dtmf(inner: &Arc<Mutex<Inner>>, call_id: &str, digit: &str) {
    info!(call_id = %call_id, digit = %digit, "Phone handling INFO DTMF");
    let found = inner.lock().calls.get(call_id).map(Arc::clone);
    match found {
        Some(call) => {
            call.fire_dtmf(digit);
        }
        None => {
            debug!(call_id = %call_id, "Phone INFO DTMF for unknown call");
        }
    }
}
/// Routes an in-dialog NOTIFY status code to the tracked call with the given
/// id, logging a warning when the call is unknown.
///
/// The call is cloned out of the map first so the phone lock is not held
/// while the notification is delivered.
fn handle_notify(inner: &Arc<Mutex<Inner>>, call_id: &str, code: u16) {
    info!(call_id = %call_id, code = code, "Phone handling NOTIFY");
    let found = inner.lock().calls.get(call_id).map(Arc::clone);
    match found {
        Some(call) => {
            call.fire_notify(code);
        }
        None => {
            warn!(call_id = %call_id, "Phone NOTIFY for unknown call");
        }
    }
}
/// Delivers an incoming MESSAGE to every registered message callback.
///
/// Each callback gets its own `SipMessage` (with an empty `to`) and is run
/// through the callback pool rather than on the calling thread.
fn handle_message(inner: &Arc<Mutex<Inner>>, from: &str, content_type: &str, body: &str) {
    info!(from = %from, "Phone handling MESSAGE");
    let listeners = inner.lock().on_message_fn.clone();
    listeners.into_iter().for_each(|listener| {
        let msg = SipMessage {
            from: from.to_string(),
            to: String::new(),
            content_type: content_type.to_string(),
            body: body.to_string(),
        };
        crate::callback_pool::spawn_callback(move || listener(msg));
    });
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::transport::{MockTransport, Response};
use std::time::Duration;
fn test_cfg() -> Config {
Config {
register_expiry: Duration::from_secs(60),
register_retry: Duration::from_millis(50),
register_max_retry: 3,
nat_keepalive_interval: None,
..Config::default()
}
}
#[test]
fn connect_and_state() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
assert_eq!(phone.state(), PhoneState::Registered);
assert_eq!(tr.count_sent("REGISTER"), 1);
}
#[test]
fn disconnect_sets_disconnected() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
phone.disconnect().unwrap();
assert_eq!(phone.state(), PhoneState::Disconnected);
assert!(tr.closed());
}
#[test]
fn disconnect_when_not_connected_returns_error() {
let phone = Phone::new(test_cfg());
let result = phone.disconnect();
assert!(result.is_err());
}
#[test]
fn dial_before_connect_returns_error() {
let phone = Phone::new(test_cfg());
let result = phone.dial("sip:1002@pbx.local", DialOptions::default());
assert!(result.is_err());
}
#[test]
fn dial_sends_invite_and_creates_call() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK"); let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK");
let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
assert_eq!(tr.count_sent("INVITE"), 1);
assert_eq!(call.state(), crate::types::CallState::Active);
}
#[test]
fn dial_with_ringing() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_sequence(vec![
Response::new(180, "Ringing"),
Response::new(200, "OK"),
]);
let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
assert_eq!(call.state(), crate::types::CallState::Active);
}
#[test]
fn incoming_call_fires_callback() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
let (tx, rx) = crossbeam_channel::bounded(1);
phone.on_incoming(move |_call| {
let _ = tx.send(true);
});
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.simulate_invite("sip:1001@pbx.local", "sip:1002@pbx.local");
let fired = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(fired);
}
#[test]
fn incoming_call_sends_100_and_180() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.on_incoming(|_| {});
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
let rx100 = tr.wait_for_response(100, Duration::from_secs(2));
let rx180 = tr.wait_for_response(180, Duration::from_secs(2));
tr.simulate_invite("sip:1001@pbx.local", "sip:1002@pbx.local");
assert!(rx100.recv_timeout(Duration::from_secs(2)).unwrap());
assert!(rx180.recv_timeout(Duration::from_secs(2)).unwrap());
}
#[test]
fn on_registered_callback() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
let (tx, rx) = crossbeam_channel::bounded(1);
phone.on_registered(move || {
let _ = tx.send(true);
});
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
let fired = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(fired);
}
#[test]
fn on_unregistered_fires_on_disconnect() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
let (tx, rx) = crossbeam_channel::bounded(1);
phone.on_unregistered(move || {
let _ = tx.send(true);
});
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
phone.disconnect().unwrap();
let fired = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(fired);
}
#[test]
fn call_tracking() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
let call_id = call.call_id();
assert!(phone.find_call(&call_id).is_some());
call.end().unwrap();
std::thread::sleep(Duration::from_millis(100));
assert!(phone.find_call(&call_id).is_none());
}
#[test]
fn dial_uses_advertised_addr_in_sdp() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let stun_ip: std::net::SocketAddr = "203.0.113.42:5060".parse().unwrap();
tr.set_advertised_addr(stun_ip);
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
let sdp = call.local_sdp();
assert!(
sdp.contains("c=IN IP4 203.0.113.42"),
"SDP should contain STUN-mapped IP, got: {}",
sdp
);
}
#[test]
fn dial_prefers_local_ip_config_over_advertised_addr() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let stun_ip: std::net::SocketAddr = "203.0.113.42:5060".parse().unwrap();
tr.set_advertised_addr(stun_ip);
let mut cfg = test_cfg();
cfg.local_ip = "10.0.0.99".into();
let phone = Phone::new(cfg);
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
let sdp = call.local_sdp();
assert!(
sdp.contains("c=IN IP4 10.0.0.99"),
"SDP should use explicit local_ip over STUN, got: {}",
sdp
);
}
#[test]
fn dial_rtp_address_overrides_config_local_ip() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let mut cfg = test_cfg();
cfg.local_ip = "10.0.0.99".into();
let phone = Phone::new(cfg);
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let opts = crate::config::DialOptionsBuilder::new()
.rtp_address("192.168.1.50")
.build();
let call = phone.dial("sip:1002@pbx.local", opts).unwrap();
let sdp = call.local_sdp();
assert!(
sdp.contains("c=IN IP4 192.168.1.50"),
"DialOptions::rtp_address should win over Config::local_ip, got: {}",
sdp
);
}
#[test]
fn dial_codec_override_appears_in_sdp() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let opts = crate::config::DialOptionsBuilder::new()
.codec_override(vec![crate::types::Codec::G722])
.build();
let call = phone.dial("sip:1002@pbx.local", opts).unwrap();
let sdp = call.local_sdp();
let audio_line = sdp.lines().find(|l| l.starts_with("m=audio")).unwrap_or("");
assert!(
audio_line.contains(" 9 ") || audio_line.ends_with(" 9"),
"G.722 (PT 9) should be in m=audio, got: {}",
audio_line
);
assert!(
!audio_line.contains(" 0 ") && !audio_line.ends_with(" 0"),
"PCMU (PT 0) must not appear when override excludes it, got: {}",
audio_line
);
assert!(
audio_line.contains(" 101 ") || audio_line.ends_with(" 101"),
"PT 101 should be auto-injected for default Rfc4733 DTMF, got: {}",
audio_line
);
}
#[test]
fn dial_auto_injects_pt_101_when_rfc4733() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let mut cfg = test_cfg();
cfg.codec_prefs = vec![crate::types::Codec::PCMU];
cfg.dtmf_mode = crate::config::DtmfMode::Rfc4733;
let phone = Phone::new(cfg);
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
let sdp = call.local_sdp();
assert!(
sdp.contains("a=rtpmap:101 telephone-event"),
"PT 101 should be auto-injected when DtmfMode is Rfc4733, got: {}",
sdp
);
}
#[test]
fn dial_does_not_inject_pt_101_for_sip_info_mode() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let mut cfg = test_cfg();
cfg.codec_prefs = vec![crate::types::Codec::PCMU];
cfg.dtmf_mode = crate::config::DtmfMode::SipInfo;
let phone = Phone::new(cfg);
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
tr.respond_with(200, "OK"); let call = phone
.dial("sip:1002@pbx.local", DialOptions::default())
.unwrap();
let sdp = call.local_sdp();
assert!(
!sdp.contains("telephone-event"),
"PT 101 should NOT be injected when DtmfMode is SipInfo, got: {}",
sdp
);
}
#[test]
fn dial_with_early_media_transitions_through_early_media_state() {
let tr = Arc::new(MockTransport::new());
tr.respond_with(200, "OK");
let phone = Phone::new(test_cfg());
phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
let early_sdp = "v=0\r\no=- 0 0 IN IP4 10.0.0.1\r\ns=-\r\nc=IN IP4 10.0.0.1\r\nm=audio 20000 RTP/AVP 8\r\n";
tr.set_early_sdp(early_sdp);
tr.respond_with(200, "OK");
let (em_tx, em_rx) = crossbeam_channel::bounded(1);
phone.on_call_state(move |_call, state| {
if state == crate::types::CallState::EarlyMedia {
let _ = em_tx.try_send(());
}
});
let opts = crate::config::DialOptions {
early_media: true,
..Default::default()
};
let call = phone.dial("sip:1002@pbx.local", opts).unwrap();
assert_eq!(call.state(), crate::types::CallState::Active);
let got_early = em_rx.recv_timeout(Duration::from_secs(2)).is_ok();
assert!(got_early, "should have transitioned through EarlyMedia");
assert!(!call.remote_sdp().is_empty());
}
/// A phone-level `on_call_state` callback and a per-call `on_state` callback
/// registered on the same call must both be invoked.
#[test]
fn phone_and_user_callbacks_both_fire() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());

    // Phone-level observer: signals once the call is reported Active.
    let (active_tx, active_rx) = crossbeam_channel::bounded(1);
    phone.on_call_state(move |_call, state| {
        if state == CallState::Active {
            let _ = active_tx.try_send(());
        }
    });

    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    transport.respond_with(200, "OK");
    let call = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    // Call-level observer: signals once the call is reported Ended.
    let (ended_tx, ended_rx) = crossbeam_channel::bounded(1);
    call.on_state(move |state| {
        if state == CallState::Ended {
            let _ = ended_tx.try_send(());
        }
    });

    assert!(
        active_rx.recv_timeout(Duration::from_secs(2)).is_ok(),
        "phone-level on_call_state should have fired"
    );

    call.end().unwrap();
    assert!(
        ended_rx.recv_timeout(Duration::from_secs(2)).is_ok(),
        "user-level on_state should have fired"
    );
}
/// A DTMF digit simulated via SIP INFO for an established call must be
/// delivered to the phone-level `on_call_dtmf` callback.
#[test]
fn info_dtmf_fires_call_dtmf_callback() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());

    let (digit_tx, digit_rx) = crossbeam_channel::bounded(1);
    phone.on_call_dtmf(move |_call, digit| {
        let _ = digit_tx.send(digit);
    });

    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    transport.respond_with(200, "OK");
    let call = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    // Inject the digit for the call we just placed, addressed by its Call-ID.
    transport.simulate_info_dtmf(&call.call_id(), "5");

    let received = digit_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert_eq!(received, "5");
}
/// With `DtmfMode::SipInfo` configured on the phone, sending DTMF on a dialed
/// call must succeed.
#[test]
fn dtmf_mode_propagated_to_calls() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");

    let mut config = test_cfg();
    config.dtmf_mode = crate::config::DtmfMode::SipInfo;
    let phone = Phone::new(config);
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    transport.respond_with(200, "OK");
    let call = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    // Only success of the send is checked here.
    call.send_dtmf("3").unwrap();
}
/// Two outbound calls placed back to back must get distinct Call-IDs and both
/// be tracked by the phone.
#[test]
fn two_concurrent_outbound_calls() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    transport.respond_with(200, "OK");
    let first = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();
    transport.respond_with(200, "OK");
    let second = phone
        .dial("sip:1003@pbx.local", DialOptions::default())
        .unwrap();

    let first_id = first.call_id();
    let second_id = second.call_id();
    assert_ne!(first_id, second_id);
    assert!(phone.find_call(&first_id).is_some());
    assert!(phone.find_call(&second_id).is_some());
    assert_eq!(phone.calls().len(), 2);
}
/// An INVITE arriving while an outbound call is already active must still
/// trigger `on_incoming`, leaving two calls tracked by the phone.
#[test]
fn incoming_during_active_call_fires_callback() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());

    let (incoming_tx, incoming_rx) = crossbeam_channel::bounded(1);
    phone.on_incoming(move |_call| {
        let _ = incoming_tx.send(true);
    });

    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    transport.respond_with(200, "OK");
    let outbound = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();
    assert_eq!(outbound.state(), CallState::Active);

    transport.simulate_invite("sip:1001@pbx.local", "sip:1003@pbx.local");

    assert!(incoming_rx.recv_timeout(Duration::from_secs(2)).unwrap());
    assert_eq!(phone.calls().len(), 2);
}
/// Ending one of two concurrent calls must remove only that call from the
/// phone and leave the other one `Active`.
#[test]
fn bye_for_one_call_leaves_other_active() {
    let tr = Arc::new(MockTransport::new());
    tr.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);

    tr.respond_with(200, "OK");
    let call1 = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();
    tr.respond_with(200, "OK");
    let call2 = phone
        .dial("sip:1003@pbx.local", DialOptions::default())
        .unwrap();
    assert_eq!(phone.calls().len(), 2);

    call1.end().unwrap();

    // Wait for the ended call to be dropped from the phone's call table.
    // A bounded poll replaces the former fixed 100 ms sleep: it returns as
    // soon as the removal is visible and tolerates a slow machine for up to
    // two seconds instead of failing spuriously.
    let ended_id = call1.call_id();
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while phone.find_call(&ended_id).is_some() && std::time::Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(10));
    }

    assert!(phone.find_call(&ended_id).is_none());
    assert!(phone.find_call(&call2.call_id()).is_some());
    assert_eq!(call2.state(), crate::types::CallState::Active);
    assert_eq!(phone.calls().len(), 1);
}
/// `Phone::disconnect` must end every tracked call and leave the phone with
/// an empty call list.
#[test]
fn disconnect_ends_all_calls() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    // Queue a 200 OK and place a call to `target`.
    let place_call = |target: &str| {
        transport.respond_with(200, "OK");
        phone.dial(target, DialOptions::default()).unwrap()
    };
    let first = place_call("sip:1002@pbx.local");
    let second = place_call("sip:1003@pbx.local");

    phone.disconnect().unwrap();

    assert_eq!(first.state(), CallState::Ended);
    assert_eq!(second.state(), CallState::Ended);
    assert!(phone.calls().is_empty());
}
/// `Phone::calls` must start empty and grow by one for every dialed call.
#[test]
fn calls_returns_all_active() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    assert!(phone.calls().is_empty());

    let targets = ["sip:1002@pbx.local", "sip:1003@pbx.local"];
    for (already_dialed, &target) in targets.iter().enumerate() {
        transport.respond_with(200, "OK");
        // The returned handle is intentionally dropped right away; the phone
        // is expected to keep tracking the call on its own.
        phone.dial(target, DialOptions::default()).unwrap();
        assert_eq!(phone.calls().len(), already_dialed + 1);
    }
}
/// A dialog-level INVITE carrying SDP that arrives while another call exists
/// must be surfaced through `on_incoming` as a separate call.
#[test]
fn dialog_invite_during_active_call() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());

    // Capture the Call-ID of whatever call is reported as incoming.
    let (id_tx, id_rx) = crossbeam_channel::bounded(1);
    phone.on_incoming(move |call| {
        let _ = id_tx.send(call.call_id());
    });

    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    transport.respond_with(200, "OK");
    let outbound = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    let offer = "v=0\r\no=- 0 0 IN IP4 10.0.0.1\r\ns=-\r\nc=IN IP4 10.0.0.1\r\nm=audio 20000 RTP/AVP 8\r\n";
    transport.simulate_dialog_invite("sip:1001@pbx.local", "sip:1003@pbx.local", offer);

    let incoming_id = id_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert_ne!(incoming_id, outbound.call_id());
    assert_eq!(phone.calls().len(), 2);
}
/// After a call ends, the phone and its callback plumbing must release every
/// `Arc<Call>` they hold, so the test's handle is the only strong reference.
#[test]
fn call_arc_freed_after_end() {
    let tr = Arc::new(MockTransport::new());
    tr.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    // Register all three phone-level callbacks so the call is wired to each
    // of them before it ends.
    phone.on_call_state(|_call, _state| {});
    phone.on_call_ended(|_call, _reason| {});
    phone.on_call_dtmf(|_call, _digit| {});
    phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
    tr.respond_with(200, "OK");
    let call = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    call.end().unwrap();

    // Wait for cleanup with a bounded poll instead of the former fixed
    // 200 ms sleep: finishes as soon as the references are released and
    // tolerates a slow machine for up to two seconds.
    let call_id = call.call_id();
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    loop {
        // Evaluated as its own statement so the `Option<Arc<Call>>` returned
        // by `find_call` is dropped before the strong count is read.
        let unregistered = phone.find_call(&call_id).is_none();
        let released = unregistered && Arc::strong_count(&call) == 1;
        if released || std::time::Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    assert!(phone.find_call(&call_id).is_none());
    assert_eq!(
        Arc::strong_count(&call),
        1,
        "Call Arc should have no other holders after end + callback cleanup"
    );
}
/// Builds a shared `MockDialog` whose headers contain the given `From` and
/// `To` values and whose Call-ID is set to `call_id`.
fn mock_dlg_with_tags(
    call_id: &str,
    from: &str,
    to: &str,
) -> Arc<crate::mock::dialog::MockDialog> {
    let mut headers = std::collections::HashMap::new();
    // Each header maps to a single-element value list.
    for (name, value) in [("From", from), ("To", to)] {
        headers.insert(name.into(), vec![value.into()]);
    }
    let dialog = crate::mock::dialog::MockDialog::with_headers(headers);
    dialog.set_call_id(call_id);
    Arc::new(dialog)
}
/// An attended transfer must send a REFER on call A whose target is call B's
/// remote URI with a percent-encoded `Replaces` parameter built from call B's
/// Call-ID and dialog tags.
#[test]
fn attended_transfer_sends_refer_with_replaces() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    // Call A: the leg that will carry the REFER.
    let dialog_a = mock_dlg_with_tags(
        "call-a-id",
        "<sip:1001@pbx>;tag=alice-a",
        "<sip:bob@pbx>;tag=bob-a",
    );
    let call_a = Call::new_outbound(dialog_a.clone(), DialOptions::default());
    call_a.simulate_response(200, "OK");

    // Call B: the consultation leg; its Call-ID contains an '@' so the
    // encoding of the Replaces value can be checked.
    let dialog_b = mock_dlg_with_tags(
        "call-b-id@pbx.local",
        "<sip:1001@pbx>;tag=alice-b",
        "<sip:charlie@pbx>;tag=charlie-b",
    );
    let call_b = Call::new_outbound(dialog_b.clone(), DialOptions::default());
    call_b.simulate_response(200, "OK");
    call_b.hold().unwrap();

    phone.attended_transfer(&call_a, &call_b).unwrap();

    assert!(dialog_a.refer_sent());
    let target = dialog_a.last_refer_target();
    assert!(
        target.starts_with("sip:charlie@pbx?Replaces="),
        "REFER target should start with Charlie's URI: {}",
        target
    );
    assert!(
        target.contains("call-b-id%40pbx.local"),
        "Call-ID @ should be encoded: {}",
        target
    );
    assert!(
        target.contains("to-tag%3Dcharlie-b"),
        "remote tag (charlie) should be in to-tag: {}",
        target
    );
    assert!(
        target.contains("from-tag%3Dalice-b"),
        "local tag (alice) should be in from-tag: {}",
        target
    );
}
/// After an attended transfer, a NOTIFY reporting 200 on call A must end both
/// calls with `EndReason::Transfer`.
#[test]
fn attended_transfer_ends_both_on_notify_200() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    let dialog_a = mock_dlg_with_tags("call-a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
    let dialog_b = mock_dlg_with_tags(
        "call-b",
        "<sip:1001@pbx>;tag=a2",
        "<sip:charlie@pbx>;tag=c2",
    );
    let call_a = Call::new_outbound(dialog_a.clone(), DialOptions::default());
    call_a.simulate_response(200, "OK");
    let call_b = Call::new_outbound(dialog_b.clone(), DialOptions::default());
    call_b.simulate_response(200, "OK");

    // Capture the end reason reported for each leg.
    let (reason_a_tx, reason_a_rx) = crossbeam_channel::bounded(1);
    let (reason_b_tx, reason_b_rx) = crossbeam_channel::bounded(1);
    call_a.on_ended(move |reason| {
        let _ = reason_a_tx.send(reason);
    });
    call_b.on_ended(move |reason| {
        let _ = reason_b_tx.send(reason);
    });

    phone.attended_transfer(&call_a, &call_b).unwrap();
    dialog_a.simulate_notify(200);
    // Allow the NOTIFY to be processed before inspecting call state.
    std::thread::sleep(Duration::from_millis(100));

    assert_eq!(call_a.state(), CallState::Ended);
    assert_eq!(call_b.state(), CallState::Ended);

    let reason_a = reason_a_rx.recv_timeout(Duration::from_millis(200)).unwrap();
    assert_eq!(reason_a, EndReason::Transfer);
    let reason_b = reason_b_rx.recv_timeout(Duration::from_millis(200)).unwrap();
    assert_eq!(reason_b, EndReason::Transfer);
}
/// `attended_transfer` must fail when call A is an inbound call that was
/// never accepted, even though call B has received a 200 OK.
#[test]
fn attended_transfer_rejects_inactive_call_a() {
    let phone = Phone::new(test_cfg());

    let dialog_a = Arc::new(MockDialog::new());
    let dialog_b = Arc::new(MockDialog::new());

    // Call A is left untouched after creation; call B is answered.
    let call_a = Call::new_inbound(dialog_a);
    let call_b = Call::new_outbound(dialog_b, DialOptions::default());
    call_b.simulate_response(200, "OK");

    assert!(phone.attended_transfer(&call_a, &call_b).is_err());
}
/// `attended_transfer` must fail when call B is an inbound call that was
/// never accepted, even though call A has been accepted.
#[test]
fn attended_transfer_rejects_inactive_call_b() {
    let phone = Phone::new(test_cfg());

    let dialog_a = Arc::new(MockDialog::new());
    let dialog_b = Arc::new(MockDialog::new());

    // Call A is accepted; call B is left untouched after creation.
    let call_a = Call::new_inbound(dialog_a);
    call_a.accept().unwrap();
    let call_b = Call::new_inbound(dialog_b);

    assert!(phone.attended_transfer(&call_a, &call_b).is_err());
}
/// A NOTIFY reporting status 100 after an attended transfer must not end
/// either call.
#[test]
fn attended_transfer_notify_non_200_keeps_calls_alive() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    let dialog_a = mock_dlg_with_tags("call-a", "<sip:1001@pbx>;tag=a1", "<sip:bob@pbx>;tag=b1");
    let dialog_b = mock_dlg_with_tags(
        "call-b",
        "<sip:1001@pbx>;tag=a2",
        "<sip:charlie@pbx>;tag=c2",
    );
    let call_a = Call::new_outbound(dialog_a.clone(), DialOptions::default());
    call_a.simulate_response(200, "OK");
    let call_b = Call::new_outbound(dialog_b.clone(), DialOptions::default());
    call_b.simulate_response(200, "OK");

    phone.attended_transfer(&call_a, &call_b).unwrap();
    dialog_a.simulate_notify(100);

    // Negative check: give any (incorrect) teardown a chance to happen
    // before asserting that both calls are still up.
    std::thread::sleep(Duration::from_millis(50));

    assert_eq!(call_a.state(), CallState::Active);
    assert_eq!(call_b.state(), CallState::Active);
}
/// With `voicemail_uri` configured, connecting must result in at least one
/// SUBSCRIBE being sent on the transport.
#[test]
fn mwi_subscribes_on_connect() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }

    let mut config = test_cfg();
    config.voicemail_uri = Some("sip:*97@pbx.local".into());
    let phone = Phone::new(config);
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    // Allow time for the subscription request to go out.
    std::thread::sleep(Duration::from_millis(300));

    let subscribes = transport.count_sent("SUBSCRIBE");
    assert!(
        subscribes >= 1,
        "expected at least 1 SUBSCRIBE, got {}",
        subscribes
    );

    phone.disconnect().unwrap();
}
/// A simulated MWI NOTIFY body must be parsed and delivered to the
/// `on_voicemail` callback with the waiting flag and voice counts set.
#[test]
fn mwi_fires_on_voicemail_callback() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }

    let mut config = test_cfg();
    config.voicemail_uri = Some("sip:*97@pbx.local".into());
    let phone = Phone::new(config);

    let (status_tx, status_rx) = crossbeam_channel::bounded(1);
    phone.on_voicemail(move |status| {
        let _ = status_tx.send(status);
    });

    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    // Allow the subscription to be set up before injecting the NOTIFY.
    std::thread::sleep(Duration::from_millis(200));
    transport.simulate_mwi_notify("Messages-Waiting: yes\r\nVoice-Message: 2/4\r\n");

    let reported = status_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert!(reported.messages_waiting);
    assert_eq!(reported.voice, (2, 4));

    phone.disconnect().unwrap();
}
/// Connecting with the default test config (which sets no `voicemail_uri`
/// here) must not send any SUBSCRIBE.
#[test]
fn no_mwi_without_voicemail_uri() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");

    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    // Negative check: wait long enough for a stray SUBSCRIBE to show up.
    std::thread::sleep(Duration::from_millis(200));
    assert_eq!(transport.count_sent("SUBSCRIBE"), 0);

    phone.disconnect().unwrap();
}
/// `send_message` on a phone that was never connected must return an error.
#[test]
fn send_message_before_connect_returns_error() {
    let phone = Phone::new(test_cfg());
    assert!(phone.send_message("sip:1002@pbx.local", "Hello").is_err());
}
/// `send_message` on a connected phone must send exactly one MESSAGE through
/// the transport.
#[test]
fn send_message_sends_via_transport() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }

    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    phone.send_message("sip:1002@pbx.local", "Hello!").unwrap();
    assert_eq!(transport.count_sent("MESSAGE"), 1);

    phone.disconnect().unwrap();
}
/// An incoming MESSAGE simulated on the transport must reach the
/// `on_message` callback with its sender, body and content type intact.
#[test]
fn on_message_fires_on_incoming() {
    let tr = Arc::new(MockTransport::new());
    tr.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());

    // Hand the message to the test thread through a channel, as the sibling
    // callback tests do. This replaces a fixed 100 ms sleep followed by a
    // read of shared state, which could miss a late delivery.
    let (msg_tx, msg_rx) = crossbeam_channel::bounded(1);
    phone.on_message(move |msg| {
        // try_send: only the first delivery matters and the callback must
        // never block.
        let _ = msg_tx.try_send(msg);
    });

    phone.connect_with_transport(Arc::clone(&tr) as Arc<dyn SipTransport>);
    tr.simulate_message("sip:1001@pbx.local", "text/plain", "Hi there");

    let msg = msg_rx
        .recv_timeout(Duration::from_secs(2))
        .expect("on_message callback should fire for the simulated MESSAGE");
    assert_eq!(msg.from, "sip:1001@pbx.local");
    assert_eq!(msg.body, "Hi there");
    assert_eq!(msg.content_type, "text/plain");

    phone.disconnect().unwrap();
}
/// `watch` on a phone that was never connected must return an error.
#[test]
fn watch_before_connect_errors() {
    let phone = Phone::new(test_cfg());
    assert!(phone.watch("1001", |_, _| {}).is_err());
}
/// A dialog-info NOTIFY with a confirmed dialog for a watched extension must
/// invoke the watch callback with `OnThePhone` and a previous state that is
/// either absent or `Unknown`.
#[test]
fn watch_fires_callback_on_notify() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    let (update_tx, update_rx) = crossbeam_channel::bounded(1);
    phone
        .watch("1002", move |status, prev| {
            let _ = update_tx.send((status, prev));
        })
        .unwrap();
    // Allow the subscription to be set up before injecting the NOTIFY.
    std::thread::sleep(Duration::from_millis(300));

    // Kept flush-left on purpose: the raw string's bytes are the NOTIFY body.
    let confirmed_xml = r#"<?xml version="1.0"?>
<dialog-info xmlns="urn:ietf:params:xml:ns:dialog-info"
version="1" state="full" entity="sip:1002@test">
<dialog id="d1"><state>confirmed</state></dialog>
</dialog-info>"#;
    transport.simulate_subscription_notify(
        "dialog",
        "application/dialog-info+xml",
        confirmed_xml,
        "active;expires=600",
        "sip:1002@test",
    );

    let (status, prev) = update_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    assert_eq!(status.extension, "1002");
    assert_eq!(status.state, ExtensionState::OnThePhone);
    assert!(matches!(prev, None | Some(ExtensionState::Unknown)));

    phone.disconnect().unwrap();
}
/// Two identical dialog-info NOTIFYs for a watched extension must invoke the
/// watch callback only once.
#[test]
fn watch_duplicate_suppression() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    let (state_tx, state_rx) = crossbeam_channel::bounded(10);
    phone
        .watch("1002", move |status, _| {
            let _ = state_tx.send(status.state);
        })
        .unwrap();
    // Allow the subscription to be set up before injecting NOTIFYs.
    std::thread::sleep(Duration::from_millis(300));

    // Kept flush-left on purpose: the raw string's bytes are the NOTIFY body.
    let confirmed_xml = r#"<?xml version="1.0"?>
<dialog-info xmlns="urn:ietf:params:xml:ns:dialog-info"
version="1" state="full" entity="sip:1002@test">
<dialog id="d1"><state>confirmed</state></dialog>
</dialog-info>"#;

    // Deliver the very same NOTIFY twice, pausing after each delivery.
    for _ in 0..2 {
        transport.simulate_subscription_notify(
            "dialog",
            "application/dialog-info+xml",
            confirmed_xml,
            "active;expires=600",
            "sip:1002@test",
        );
        std::thread::sleep(Duration::from_millis(100));
    }

    let _first = state_rx.recv_timeout(Duration::from_secs(1)).unwrap();
    let repeat = state_rx.recv_timeout(Duration::from_millis(500));
    assert!(repeat.is_err(), "duplicate should be suppressed");

    phone.disconnect().unwrap();
}
/// Watching and then unwatching an extension must produce at least two
/// SUBSCRIBE requests on the transport.
#[test]
fn unwatch_removes_subscription() {
    let transport = Arc::new(MockTransport::new());
    // Three canned 200 OK responses are queued before connecting.
    for _ in 0..3 {
        transport.respond_with(200, "OK");
    }
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    phone.watch("1002", |_, _| {}).unwrap();
    std::thread::sleep(Duration::from_millis(300));

    phone.unwatch("1002").unwrap();
    std::thread::sleep(Duration::from_millis(200));

    assert!(transport.count_sent("SUBSCRIBE") >= 2);

    phone.disconnect().unwrap();
}
/// `subscribe_event` must hand back a positive subscription id that can later
/// be passed to `unsubscribe_event`.
#[test]
fn subscribe_event_returns_id() {
    let transport = Arc::new(MockTransport::new());
    // Two canned 200 OK responses are queued before connecting.
    for _ in 0..2 {
        transport.respond_with(200, "OK");
    }
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    let sub_id = phone
        .subscribe_event(
            "sip:1002@test",
            "dialog",
            "application/dialog-info+xml",
            |_| {},
        )
        .unwrap();
    assert!(sub_id > 0);

    std::thread::sleep(Duration::from_millis(200));
    phone.unsubscribe_event(sub_id).unwrap();

    phone.disconnect().unwrap();
}
/// Dialing with `video: true` must produce a local SDP that carries both a
/// video and an audio media line.
#[test]
fn dial_with_video_builds_video_sdp() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);
    transport.respond_with(200, "OK");

    let video_opts = DialOptions {
        video: true,
        ..Default::default()
    };
    let call = phone.dial("sip:1002@pbx.local", video_opts).unwrap();
    assert_eq!(call.state(), CallState::Active);

    let local_sdp = call.local_sdp();
    assert!(
        local_sdp.contains("m=video"),
        "SDP should contain video m= line"
    );
    assert!(
        local_sdp.contains("m=audio"),
        "SDP should still contain audio m= line"
    );
}
/// Dialing with default options must not put a video media line into the
/// local SDP.
#[test]
fn dial_without_video_no_video_sdp() {
    let transport = Arc::new(MockTransport::new());
    transport.respond_with(200, "OK");
    let phone = Phone::new(test_cfg());
    phone.connect_with_transport(Arc::clone(&transport) as Arc<dyn SipTransport>);

    transport.respond_with(200, "OK");
    let call = phone
        .dial("sip:1002@pbx.local", DialOptions::default())
        .unwrap();

    let local_sdp = call.local_sdp();
    assert!(
        !local_sdp.contains("m=video"),
        "SDP should not contain video m= line"
    );
}
}