use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use crate::error::{Error, Result};
use crate::types::*;
#[allow(clippy::type_complexity)]
struct Inner {
id: String,
call_id: String,
state: CallState,
direction: Direction,
from: String,
to: String,
from_name: String,
remote_uri: String,
remote_ip: String,
remote_port: i32,
codec: Codec,
local_sdp: String,
remote_sdp: String,
start_time: Option<Instant>,
muted: bool,
video_muted: bool,
video_codec: Option<VideoCodec>,
sent_dtmf: Vec<String>,
transfer_to: String,
headers: HashMap<String, Vec<String>>,
on_dtmf_fn: Vec<Arc<dyn Fn(String) + Send + Sync>>,
on_hold_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_resume_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_mute_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_unmute_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_media_fn: Vec<Arc<dyn Fn() + Send + Sync>>,
on_state_fn: Vec<Arc<dyn Fn(CallState) + Send + Sync>>,
on_ended_fn: Vec<Arc<dyn Fn(EndReason) + Send + Sync>>,
}
fn mock_call_id() -> String {
use std::sync::atomic::{AtomicU64, Ordering};
static CTR: AtomicU64 = AtomicU64::new(0);
format!("mock-{}", CTR.fetch_add(1, Ordering::Relaxed))
}
fn mock_call_call_id() -> String {
use std::sync::atomic::{AtomicU64, Ordering};
static CTR: AtomicU64 = AtomicU64::new(1);
format!("mock-call-{}", CTR.fetch_add(1, Ordering::Relaxed))
}
pub struct MockCall {
inner: Mutex<Inner>,
}
impl std::fmt::Debug for MockCall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.inner.lock();
f.debug_struct("MockCall")
.field("id", &inner.id)
.field("state", &inner.state)
.field("direction", &inner.direction)
.finish()
}
}
impl MockCall {
pub fn new() -> Self {
Self {
inner: Mutex::new(Inner {
id: mock_call_id(),
call_id: mock_call_call_id(),
state: CallState::Ringing,
direction: Direction::Inbound,
from: String::new(),
to: String::new(),
from_name: String::new(),
remote_uri: String::new(),
remote_ip: String::new(),
remote_port: 0,
codec: Codec::PCMU,
local_sdp: String::new(),
remote_sdp: String::new(),
start_time: None,
muted: false,
video_muted: false,
video_codec: None,
sent_dtmf: Vec::new(),
transfer_to: String::new(),
headers: HashMap::new(),
on_dtmf_fn: Vec::new(),
on_hold_fn: Vec::new(),
on_resume_fn: Vec::new(),
on_mute_fn: Vec::new(),
on_unmute_fn: Vec::new(),
on_media_fn: Vec::new(),
on_state_fn: Vec::new(),
on_ended_fn: Vec::new(),
}),
}
}
pub fn id(&self) -> String {
self.inner.lock().id.clone()
}
pub fn call_id(&self) -> String {
self.inner.lock().call_id.clone()
}
pub fn state(&self) -> CallState {
self.inner.lock().state
}
pub fn direction(&self) -> Direction {
self.inner.lock().direction
}
pub fn from(&self) -> String {
self.inner.lock().from.clone()
}
pub fn to(&self) -> String {
self.inner.lock().to.clone()
}
pub fn from_name(&self) -> String {
self.inner.lock().from_name.clone()
}
pub fn remote_uri(&self) -> String {
self.inner.lock().remote_uri.clone()
}
pub fn remote_ip(&self) -> String {
self.inner.lock().remote_ip.clone()
}
pub fn remote_port(&self) -> i32 {
self.inner.lock().remote_port
}
pub fn codec(&self) -> Codec {
self.inner.lock().codec
}
pub fn local_sdp(&self) -> String {
self.inner.lock().local_sdp.clone()
}
pub fn remote_sdp(&self) -> String {
self.inner.lock().remote_sdp.clone()
}
pub fn start_time(&self) -> Option<Instant> {
self.inner.lock().start_time
}
pub fn duration(&self) -> Duration {
self.inner
.lock()
.start_time
.map(|t| t.elapsed())
.unwrap_or(Duration::ZERO)
}
pub fn header(&self, name: &str) -> Vec<String> {
let inner = self.inner.lock();
let lower = name.to_lowercase();
for (k, v) in &inner.headers {
if k.to_lowercase() == lower {
return v.clone();
}
}
Vec::new()
}
pub fn headers(&self) -> HashMap<String, Vec<String>> {
self.inner.lock().headers.clone()
}
pub fn accept(&self) -> Result<()> {
let cbs = {
let mut inner = self.inner.lock();
if inner.state != CallState::Ringing {
return Err(Error::InvalidState);
}
inner.state = CallState::Active;
inner.start_time = Some(Instant::now());
inner.on_state_fn.clone()
};
for f in cbs {
f(CallState::Active);
}
Ok(())
}
pub fn reject(&self, _code: u16, _reason: &str) -> Result<()> {
let (state_cbs, ended_cbs) = {
let mut inner = self.inner.lock();
if inner.state != CallState::Ringing {
return Err(Error::InvalidState);
}
inner.state = CallState::Ended;
let cbs = (inner.on_state_fn.clone(), inner.on_ended_fn.clone());
Self::clear_callbacks(&mut inner);
cbs
};
for f in state_cbs {
f(CallState::Ended);
}
for f in ended_cbs {
f(EndReason::Rejected);
}
Ok(())
}
pub fn end(&self) -> Result<()> {
let (reason, state_cbs, ended_cbs) = {
let mut inner = self.inner.lock();
let reason = match inner.state {
CallState::Dialing | CallState::RemoteRinging | CallState::EarlyMedia => {
EndReason::Cancelled
}
CallState::Active | CallState::OnHold => EndReason::Local,
_ => return Err(Error::InvalidState),
};
inner.state = CallState::Ended;
let cbs = (reason, inner.on_state_fn.clone(), inner.on_ended_fn.clone());
Self::clear_callbacks(&mut inner);
cbs
};
for f in state_cbs {
f(CallState::Ended);
}
for f in ended_cbs {
f(reason);
}
Ok(())
}
pub fn hold(&self) -> Result<()> {
let (state_cbs, hold_cbs) = {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
inner.state = CallState::OnHold;
(inner.on_state_fn.clone(), inner.on_hold_fn.clone())
};
for f in state_cbs {
f(CallState::OnHold);
}
for f in hold_cbs {
f();
}
Ok(())
}
pub fn resume(&self) -> Result<()> {
let (state_cbs, resume_cbs) = {
let mut inner = self.inner.lock();
if inner.state != CallState::OnHold {
return Err(Error::InvalidState);
}
inner.state = CallState::Active;
(inner.on_state_fn.clone(), inner.on_resume_fn.clone())
};
for f in state_cbs {
f(CallState::Active);
}
for f in resume_cbs {
f();
}
Ok(())
}
pub fn mute(&self) -> Result<()> {
let cbs = {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.muted {
return Err(Error::AlreadyMuted);
}
inner.muted = true;
inner.on_mute_fn.clone()
};
for f in cbs {
f();
}
Ok(())
}
pub fn unmute(&self) -> Result<()> {
let cbs = {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if !inner.muted {
return Err(Error::NotMuted);
}
inner.muted = false;
inner.on_unmute_fn.clone()
};
for f in cbs {
f();
}
Ok(())
}
pub fn send_dtmf(&self, digit: &str) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
inner.sent_dtmf.push(digit.into());
Ok(())
}
pub fn blind_transfer(&self, target: &str) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active && inner.state != CallState::OnHold {
return Err(Error::InvalidState);
}
inner.transfer_to = target.into();
Ok(())
}
pub fn has_video(&self) -> bool {
self.inner.lock().video_codec.is_some()
}
pub fn video_codec(&self) -> Option<VideoCodec> {
self.inner.lock().video_codec
}
pub fn mute_video(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
if inner.video_muted {
return Err(Error::VideoAlreadyMuted);
}
inner.video_muted = true;
Ok(())
}
pub fn unmute_video(&self) -> Result<()> {
let mut inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
if !inner.video_muted {
return Err(Error::VideoNotMuted);
}
inner.video_muted = false;
Ok(())
}
pub fn request_keyframe(&self) -> Result<()> {
let inner = self.inner.lock();
if inner.state != CallState::Active {
return Err(Error::InvalidState);
}
if inner.video_codec.is_none() {
return Err(Error::NoVideoStream);
}
Ok(())
}
pub fn dialog_id(&self) -> (String, String, String) {
let from_tag = self
.header("From")
.first()
.map(|v| crate::call::sip_header_tag(v).to_string())
.unwrap_or_default();
let to_tag = self
.header("To")
.first()
.map(|v| crate::call::sip_header_tag(v).to_string())
.unwrap_or_default();
let inner = self.inner.lock();
match inner.direction {
Direction::Outbound => (inner.call_id.clone(), from_tag, to_tag),
Direction::Inbound => (inner.call_id.clone(), to_tag, from_tag),
}
}
pub fn end_with_reason(&self, reason: EndReason) {
let (state_cbs, ended_cbs) = {
let mut inner = self.inner.lock();
if inner.state == CallState::Ended {
return;
}
inner.state = CallState::Ended;
let cbs = (inner.on_state_fn.clone(), inner.on_ended_fn.clone());
Self::clear_callbacks(&mut inner);
cbs
};
for f in state_cbs {
f(CallState::Ended);
}
for f in ended_cbs {
f(reason);
}
}
fn clear_callbacks(inner: &mut Inner) {
inner.on_state_fn.clear();
inner.on_ended_fn.clear();
inner.on_dtmf_fn.clear();
inner.on_hold_fn.clear();
inner.on_resume_fn.clear();
inner.on_mute_fn.clear();
inner.on_unmute_fn.clear();
inner.on_media_fn.clear();
}
pub fn on_dtmf(&self, f: impl Fn(String) + Send + Sync + 'static) {
self.inner.lock().on_dtmf_fn.push(Arc::new(f));
}
pub fn on_hold(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_hold_fn.push(Arc::new(f));
}
pub fn on_resume(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_resume_fn.push(Arc::new(f));
}
pub fn on_mute(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_mute_fn.push(Arc::new(f));
}
pub fn on_unmute(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_unmute_fn.push(Arc::new(f));
}
pub fn on_media(&self, f: impl Fn() + Send + Sync + 'static) {
self.inner.lock().on_media_fn.push(Arc::new(f));
}
pub fn on_state(&self, f: impl Fn(CallState) + Send + Sync + 'static) {
self.inner.lock().on_state_fn.push(Arc::new(f));
}
pub fn on_ended(&self, f: impl Fn(EndReason) + Send + Sync + 'static) {
self.inner.lock().on_ended_fn.push(Arc::new(f));
}
pub fn set_state(&self, s: CallState) {
self.inner.lock().state = s;
}
pub fn set_direction(&self, d: Direction) {
self.inner.lock().direction = d;
}
pub fn set_from(&self, from: &str) {
self.inner.lock().from = from.into();
}
pub fn set_to(&self, to: &str) {
self.inner.lock().to = to.into();
}
pub fn set_from_name(&self, name: &str) {
self.inner.lock().from_name = name.into();
}
pub fn set_remote_uri(&self, uri: &str) {
self.inner.lock().remote_uri = uri.into();
}
pub fn set_remote_ip(&self, ip: &str) {
self.inner.lock().remote_ip = ip.into();
}
pub fn set_remote_port(&self, port: i32) {
self.inner.lock().remote_port = port;
}
pub fn set_codec(&self, codec: Codec) {
self.inner.lock().codec = codec;
}
pub fn set_local_sdp(&self, sdp: &str) {
self.inner.lock().local_sdp = sdp.into();
}
pub fn set_remote_sdp(&self, sdp: &str) {
self.inner.lock().remote_sdp = sdp.into();
}
pub fn set_video_codec(&self, codec: VideoCodec) {
self.inner.lock().video_codec = Some(codec);
}
pub fn set_header(&self, name: &str, value: &str) {
self.inner
.lock()
.headers
.insert(name.into(), vec![value.into()]);
}
pub fn muted(&self) -> bool {
self.inner.lock().muted
}
pub fn video_muted(&self) -> bool {
self.inner.lock().video_muted
}
pub fn sent_dtmf(&self) -> Vec<String> {
self.inner.lock().sent_dtmf.clone()
}
pub fn last_transfer_target(&self) -> String {
self.inner.lock().transfer_to.clone()
}
pub fn simulate_dtmf(&self, digit: &str) {
let cbs = self.inner.lock().on_dtmf_fn.clone();
for f in cbs {
f(digit.to_string());
}
}
}
impl Default for MockCall {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn new_mock_call_defaults() {
let c = MockCall::new();
assert_eq!(c.state(), CallState::Ringing);
assert_eq!(c.direction(), Direction::Inbound);
assert!(c.call_id().starts_with("mock-call-"));
}
#[test]
fn accept_transitions_to_active() {
let c = MockCall::new();
c.accept().unwrap();
assert_eq!(c.state(), CallState::Active);
assert!(c.start_time().is_some());
}
#[test]
fn reject_transitions_to_ended() {
let c = MockCall::new();
c.reject(486, "Busy Here").unwrap();
assert_eq!(c.state(), CallState::Ended);
}
#[test]
fn end_active_call() {
let c = MockCall::new();
c.accept().unwrap();
c.end().unwrap();
assert_eq!(c.state(), CallState::Ended);
}
#[test]
fn end_ringing_returns_error() {
let c = MockCall::new();
assert!(c.end().is_err());
}
#[test]
fn hold_and_resume() {
let c = MockCall::new();
c.accept().unwrap();
c.hold().unwrap();
assert_eq!(c.state(), CallState::OnHold);
c.resume().unwrap();
assert_eq!(c.state(), CallState::Active);
}
#[test]
fn mute_unmute() {
let c = MockCall::new();
c.accept().unwrap();
assert!(!c.muted());
c.mute().unwrap();
assert!(c.muted());
c.unmute().unwrap();
assert!(!c.muted());
}
#[test]
fn send_dtmf_records() {
let c = MockCall::new();
c.accept().unwrap();
c.send_dtmf("1").unwrap();
c.send_dtmf("2").unwrap();
assert_eq!(c.sent_dtmf(), vec!["1", "2"]);
}
#[test]
fn blind_transfer_records() {
let c = MockCall::new();
c.accept().unwrap();
c.blind_transfer("sip:1003@pbx.local").unwrap();
assert_eq!(c.last_transfer_target(), "sip:1003@pbx.local");
}
#[test]
fn setters_work() {
let c = MockCall::new();
c.set_remote_uri("sip:1001@host");
c.set_remote_ip("10.0.0.1");
c.set_remote_port(5060);
c.set_codec(Codec::PCMA);
c.set_local_sdp("v=0...");
c.set_remote_sdp("v=0...");
c.set_header("From", "<sip:1001@host>");
assert_eq!(c.remote_uri(), "sip:1001@host");
assert_eq!(c.remote_ip(), "10.0.0.1");
assert_eq!(c.remote_port(), 5060);
assert_eq!(c.codec(), Codec::PCMA);
assert_eq!(c.local_sdp(), "v=0...");
assert_eq!(c.remote_sdp(), "v=0...");
assert_eq!(c.header("from"), vec!["<sip:1001@host>"]);
}
#[test]
fn on_state_fires() {
let c = MockCall::new();
let fired = Arc::new(AtomicBool::new(false));
let fired_clone = Arc::clone(&fired);
c.on_state(move |s| {
if s == CallState::Active {
fired_clone.store(true, Ordering::Relaxed);
}
});
c.accept().unwrap();
assert!(fired.load(Ordering::Relaxed));
}
#[test]
fn on_ended_fires() {
let c = MockCall::new();
c.accept().unwrap();
let fired = Arc::new(AtomicBool::new(false));
let fired_clone = Arc::clone(&fired);
c.on_ended(move |r| {
if r == EndReason::Local {
fired_clone.store(true, Ordering::Relaxed);
}
});
c.end().unwrap();
assert!(fired.load(Ordering::Relaxed));
}
#[test]
fn simulate_dtmf_fires_callback() {
let c = MockCall::new();
let received = Arc::new(Mutex::new(String::new()));
let received_clone = Arc::clone(&received);
c.on_dtmf(move |d| {
*received_clone.lock() = d;
});
c.simulate_dtmf("5");
assert_eq!(*received.lock(), "5");
}
#[test]
fn end_dialing_gives_cancelled() {
let c = MockCall::new();
c.set_state(CallState::Dialing);
let reason = Arc::new(Mutex::new(None));
let reason_clone = Arc::clone(&reason);
c.on_ended(move |r| {
*reason_clone.lock() = Some(r);
});
c.end().unwrap();
assert_eq!(*reason.lock(), Some(EndReason::Cancelled));
}
#[test]
fn on_hold_callback_fires() {
let c = MockCall::new();
c.accept().unwrap();
let fired = Arc::new(AtomicBool::new(false));
let fired_clone = Arc::clone(&fired);
c.on_hold(move || {
fired_clone.store(true, Ordering::Relaxed);
});
c.hold().unwrap();
assert!(fired.load(Ordering::Relaxed));
}
#[test]
fn on_mute_callback_fires() {
let c = MockCall::new();
c.accept().unwrap();
let fired = Arc::new(AtomicBool::new(false));
let fired_clone = Arc::clone(&fired);
c.on_mute(move || {
fired_clone.store(true, Ordering::Relaxed);
});
c.mute().unwrap();
assert!(fired.load(Ordering::Relaxed));
}
#[test]
fn duration_zero_before_active() {
let c = MockCall::new();
assert_eq!(c.duration(), Duration::ZERO);
}
#[test]
fn duration_grows_after_accept() {
let c = MockCall::new();
c.accept().unwrap();
std::thread::sleep(Duration::from_millis(10));
assert!(c.duration() >= Duration::from_millis(10));
}
#[test]
fn callback_can_query_state() {
let c = Arc::new(MockCall::new());
let c2 = Arc::clone(&c);
let state = Arc::new(Mutex::new(CallState::Ringing));
let state_clone = Arc::clone(&state);
c.on_state(move |_| {
*state_clone.lock() = c2.state();
});
c.accept().unwrap();
assert_eq!(*state.lock(), CallState::Active);
}
#[test]
fn dialog_id_outbound_from_headers() {
let c = MockCall::new();
c.set_direction(Direction::Outbound);
c.set_header("From", "<sip:1001@host>;tag=local1");
c.set_header("To", "<sip:1002@host>;tag=remote2");
let (cid, local, remote) = c.dialog_id();
assert!(!cid.is_empty());
assert_eq!(local, "local1");
assert_eq!(remote, "remote2");
}
#[test]
fn dialog_id_inbound_swaps_tags() {
let c = MockCall::new();
c.set_direction(Direction::Inbound);
c.set_header("From", "<sip:1001@host>;tag=remote1");
c.set_header("To", "<sip:1002@host>;tag=local2");
let (_, local, remote) = c.dialog_id();
assert_eq!(local, "local2");
assert_eq!(remote, "remote1");
}
#[test]
fn has_video_default_false() {
let c = MockCall::new();
assert!(!c.has_video());
assert_eq!(c.video_codec(), None);
}
#[test]
fn set_video_codec_enables_video() {
let c = MockCall::new();
c.set_video_codec(VideoCodec::H264);
assert!(c.has_video());
assert_eq!(c.video_codec(), Some(VideoCodec::H264));
}
#[test]
fn mute_video_requires_active_and_video() {
let c = MockCall::new();
assert!(matches!(c.mute_video(), Err(Error::InvalidState)));
c.accept().unwrap();
assert!(matches!(c.mute_video(), Err(Error::NoVideoStream)));
c.set_video_codec(VideoCodec::VP8);
c.mute_video().unwrap();
assert!(c.video_muted());
assert!(matches!(c.mute_video(), Err(Error::VideoAlreadyMuted)));
}
#[test]
fn unmute_video_requires_muted() {
let c = MockCall::new();
c.accept().unwrap();
c.set_video_codec(VideoCodec::H264);
assert!(matches!(c.unmute_video(), Err(Error::VideoNotMuted)));
c.mute_video().unwrap();
c.unmute_video().unwrap();
assert!(!c.video_muted());
}
#[test]
fn request_keyframe_mock() {
let c = MockCall::new();
c.accept().unwrap();
assert!(matches!(c.request_keyframe(), Err(Error::NoVideoStream)));
c.set_video_codec(VideoCodec::H264);
c.request_keyframe().unwrap();
}
#[test]
fn end_with_reason_fires_callback() {
let c = MockCall::new();
c.accept().unwrap();
let reason = Arc::new(Mutex::new(None));
let reason_clone = Arc::clone(&reason);
c.on_ended(move |r| {
*reason_clone.lock() = Some(r);
});
c.end_with_reason(EndReason::Transfer);
assert_eq!(c.state(), CallState::Ended);
assert_eq!(*reason.lock(), Some(EndReason::Transfer));
}
#[test]
fn multiple_on_ended_callbacks_all_fire() {
let c = MockCall::new();
c.accept().unwrap();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = Arc::clone(&count);
c.on_ended(move |_| {
c1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let c2 = Arc::clone(&count);
c.on_ended(move |_| {
c2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
c.end().unwrap();
assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 2);
}
#[test]
fn multiple_on_state_callbacks_all_fire() {
let c = MockCall::new();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = Arc::clone(&count);
c.on_state(move |_| {
c1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
let c2 = Arc::clone(&count);
c.on_state(move |_| {
c2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
c.accept().unwrap();
assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 2);
}
}