use std::sync::Arc;
use std::time::Duration;
use rsip::prelude::HasHeaders;
use rsip::Method;
use tokio::sync::{mpsc, Notify};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use wavekat_sip::re_exports::{DialogState, SipAddr, StatusCode, TerminatedReason};
use wavekat_sip::{
build_sdp, session_expires_in, session_timer_loop, supported_timer_header, supports_timer,
Callee, Caller, Refresher, SessionDialogOps, SessionExpires, SessionTimer, SessionTimerOutcome,
SipAccount, SipEndpoint, Transport, UasSessionTimer, DEFAULT_SESSION_EXPIRES_SECS,
};
fn account(name: &str) -> SipAccount {
SipAccount {
display_name: name.to_string(),
username: name.to_string(),
password: "secret".to_string(),
domain: "127.0.0.1".to_string(),
auth_username: None,
server: Some("127.0.0.1".to_string()),
port: Some(5060),
transport: Transport::Udp,
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn session_timer_negotiates_and_refreshes_over_loopback() {
let cancel = CancellationToken::new();
let (callee_ep, mut callee_rx) = SipEndpoint::new(&account("bob"), cancel.clone())
.await
.expect("bind callee endpoint");
let (caller_ep, mut caller_rx) = SipEndpoint::new(&account("alice"), cancel.clone())
.await
.expect("bind caller endpoint");
let callee_ep = Arc::new(callee_ep);
let caller_ep = Arc::new(caller_ep);
let callee_addr = callee_ep.local_addr().expect("callee bound");
let bob_local_ip = callee_ep.local_ip();
let (bob_timer_tx, mut bob_timer_rx) = mpsc::unbounded_channel::<Option<UasSessionTimer>>();
let (bob_refresh_tx, mut bob_refresh_rx) = mpsc::unbounded_channel::<Option<SessionExpires>>();
let (bob_term_tx, mut bob_term_rx) = mpsc::unbounded_channel::<TerminatedReason>();
let callee = Callee::new(account("bob"), callee_ep.clone());
let bob_ep_for_dispatch = callee_ep.clone();
tokio::spawn(async move {
let mut accepted_hold = None;
while let Some(tx) = callee_rx.recv().await {
match tx.original.method {
Method::Invite if accepted_hold.is_none() => {
let pending = callee.handle_pending(tx).await.expect("handle_pending");
let _ = bob_timer_tx.send(pending.session_timer);
let uas = pending.session_timer;
let mut accepted = pending.accept().await.expect("Bob accept");
let answer = build_sdp(bob_local_ip, accepted.local_rtp_addr.port());
let mut srx =
std::mem::replace(&mut accepted.state_rx, mpsc::unbounded_channel().1);
let refresh_tx = bob_refresh_tx.clone();
let term_tx = bob_term_tx.clone();
tokio::spawn(async move {
while let Some(state) = srx.recv().await {
match state {
DialogState::Updated(_, request, handle) => {
let echo = uas.expect("timer was negotiated").echo;
handle
.respond(
StatusCode::OK,
Some(vec![
rsip::Header::ContentType("application/sdp".into()),
supported_timer_header(),
echo.header(),
]),
Some(answer.clone()),
)
.await
.expect("respond to refresh");
let _ = refresh_tx.send(session_expires_in(request.headers()));
}
DialogState::Terminated(_, reason) => {
let _ = term_tx.send(reason);
}
_ => {}
}
}
});
accepted_hold = Some(accepted);
}
_ => {
let _ = bob_ep_for_dispatch.dispatch_in_dialog(tx).await;
}
}
}
drop(accepted_hold);
});
tokio::spawn(async move { while caller_rx.recv().await.is_some() {} });
let caller = Caller::new(account("alice"), caller_ep.clone());
let target: rsip::Uri = format!("sip:bob@{callee_addr}")
.try_into()
.expect("valid target");
let destination: SipAddr = callee_addr.into();
let pending = caller
.dial_with_destination(target, Some(destination))
.await
.expect("dial");
let accepted = timeout(Duration::from_secs(5), pending.on_confirmed())
.await
.expect("on_confirmed didn't resolve within 5s")
.expect("on_confirmed failed");
let bob_timer = timeout(Duration::from_secs(5), bob_timer_rx.recv())
.await
.expect("Bob never reported his session timer")
.expect("channel closed")
.expect("Bob should have negotiated a session timer");
assert_eq!(bob_timer.timer.interval_secs, DEFAULT_SESSION_EXPIRES_SECS);
assert!(
!bob_timer.timer.we_are_refresher,
"Bob (UAS) should rely on Alice to refresh"
);
assert_eq!(bob_timer.echo.refresher, Some(Refresher::Uac));
assert!(bob_timer.require_timer);
assert_eq!(
accepted.session_timer,
Some(SessionTimer {
interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
we_are_refresher: true,
}),
"Alice should know she is the refresher for the granted interval"
);
let refresh_headers = vec![
supported_timer_header(),
SessionExpires {
interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
refresher: Some(Refresher::Uac),
}
.header(),
];
let offer = build_sdp(caller_ep.local_ip(), accepted.local_rtp_addr.port());
let resp = timeout(
Duration::from_secs(5),
accepted.dialog.refresh(refresh_headers, Some(offer)),
)
.await
.expect("refresh timed out")
.expect("refresh failed")
.expect("dialog should still be confirmed");
assert_eq!(
resp.status_code.kind(),
rsip::StatusCodeKind::Successful,
"refresh re-INVITE should get a 2xx, got {}",
resp.status_code
);
assert_eq!(
session_expires_in(&resp.headers),
Some(bob_timer.echo),
"Bob's 200 to the refresh should echo the negotiated Session-Expires"
);
let seen = timeout(Duration::from_secs(5), bob_refresh_rx.recv())
.await
.expect("Bob never observed the refresh re-INVITE")
.expect("channel closed");
assert_eq!(
seen,
Some(SessionExpires {
interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
refresher: Some(Refresher::Uac),
})
);
accepted.dialog.bye().await.expect("alice BYE");
let reason = timeout(Duration::from_secs(5), bob_term_rx.recv())
.await
.expect("Bob never saw termination")
.expect("channel closed");
assert!(
matches!(reason, TerminatedReason::UacBye),
"expected Terminated(UacBye), got {reason:?}"
);
callee_ep.shutdown();
caller_ep.shutdown();
cancel.cancel();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn outbound_invite_carries_session_timer_headers() {
let cancel = CancellationToken::new();
let (callee_ep, mut callee_rx) = SipEndpoint::new(&account("bob"), cancel.clone())
.await
.expect("bind callee endpoint");
let (caller_ep, mut caller_rx) = SipEndpoint::new(&account("alice"), cancel.clone())
.await
.expect("bind caller endpoint");
let callee_ep = Arc::new(callee_ep);
let caller_ep = Arc::new(caller_ep);
let callee_addr = callee_ep.local_addr().expect("callee bound");
let (invite_tx, mut invite_rx) = mpsc::unbounded_channel::<(bool, Option<SessionExpires>)>();
tokio::spawn(async move {
while let Some(tx) = callee_rx.recv().await {
if tx.original.method == Method::Invite {
let headers = tx.original.headers();
let _ = invite_tx.send((supports_timer(headers), session_expires_in(headers)));
}
}
});
tokio::spawn(async move { while caller_rx.recv().await.is_some() {} });
let caller = Caller::new(account("alice"), caller_ep.clone());
let target: rsip::Uri = format!("sip:bob@{callee_addr}")
.try_into()
.expect("valid target");
let destination: SipAddr = callee_addr.into();
let pending = caller
.dial_with_destination(target, Some(destination))
.await
.expect("dial");
let (timer_supported, session_expires) = timeout(Duration::from_secs(5), invite_rx.recv())
.await
.expect("Bob never received the INVITE")
.expect("channel closed");
assert!(timer_supported, "INVITE must carry Supported: timer");
assert_eq!(
session_expires,
Some(SessionExpires {
interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
refresher: None,
}),
"INVITE must request the default interval, refresher unpinned"
);
pending.cancel().await.ok();
callee_ep.shutdown();
caller_ep.shutdown();
cancel.cancel();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "watchdog expiry needs ~60s of wall clock (90s interval minus 30s headroom)"]
async fn watchdog_tears_down_lapsed_session() {
let cancel = CancellationToken::new();
let (callee_ep, mut callee_rx) = SipEndpoint::new(&account("bob"), cancel.clone())
.await
.expect("bind callee endpoint");
let (caller_ep, mut caller_rx) = SipEndpoint::new(&account("alice"), cancel.clone())
.await
.expect("bind caller endpoint");
let callee_ep = Arc::new(callee_ep);
let caller_ep = Arc::new(caller_ep);
let callee_addr = callee_ep.local_addr().expect("callee bound");
let (bob_term_tx, mut bob_term_rx) = mpsc::unbounded_channel::<TerminatedReason>();
let callee = Callee::new(account("bob"), callee_ep.clone());
let bob_ep_for_dispatch = callee_ep.clone();
tokio::spawn(async move {
let mut accepted_hold = None;
while let Some(tx) = callee_rx.recv().await {
match tx.original.method {
Method::Invite if accepted_hold.is_none() => {
let mut accepted = callee
.accept_transaction(tx)
.await
.expect("accept_transaction");
let mut srx =
std::mem::replace(&mut accepted.state_rx, mpsc::unbounded_channel().1);
let term_tx = bob_term_tx.clone();
tokio::spawn(async move {
while let Some(state) = srx.recv().await {
if let DialogState::Terminated(_, reason) = state {
let _ = term_tx.send(reason);
}
}
});
accepted_hold = Some(accepted);
}
_ => {
let _ = bob_ep_for_dispatch.dispatch_in_dialog(tx).await;
}
}
}
drop(accepted_hold);
});
tokio::spawn(async move { while caller_rx.recv().await.is_some() {} });
let caller = Caller::new(account("alice"), caller_ep.clone());
let target: rsip::Uri = format!("sip:bob@{callee_addr}")
.try_into()
.expect("valid target");
let destination: SipAddr = callee_addr.into();
let pending = caller
.dial_with_destination(target, Some(destination))
.await
.expect("dial");
let accepted = timeout(Duration::from_secs(5), pending.on_confirmed())
.await
.expect("on_confirmed didn't resolve within 5s")
.expect("on_confirmed failed");
let watchdog = SessionTimer {
interval_secs: 90,
we_are_refresher: false,
};
let outcome = timeout(
Duration::from_secs(80),
session_timer_loop(
&accepted.dialog,
watchdog,
None,
Arc::new(Notify::new()),
cancel.clone(),
),
)
.await
.expect("watchdog never fired");
assert_eq!(outcome, SessionTimerOutcome::Expired);
let reason = timeout(Duration::from_secs(5), bob_term_rx.recv())
.await
.expect("Bob never saw the watchdog BYE")
.expect("channel closed");
assert!(
matches!(reason, TerminatedReason::UacBye),
"expected Terminated(UacBye) from the watchdog BYE, got {reason:?}"
);
callee_ep.shutdown();
caller_ep.shutdown();
cancel.cancel();
}