use std::net::SocketAddr;
use std::sync::Arc;
use rsip::{Header, Uri};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::account::SipAccount;
use crate::dtmf_info::{build_info_body, classify, content_type_header, InfoOutcome};
use crate::endpoint::SipEndpoint;
use crate::inbound::InboundRequest;
use crate::rtp::dtmf::DtmfDigit;
use crate::sdp::{build_sdp, build_sdp_with, parse_sdp, MediaDirection, RemoteMedia};
use crate::session_timer::{
negotiate_uac, supported_timer_header, SessionDialogOps, SessionExpires, SessionTimer,
DEFAULT_SESSION_EXPIRES_SECS,
};
use crate::stack::call::{CallConfig, CallOutcome};
use crate::stack::dialog::{Dialog, DialogId};
use crate::stack::transaction::gen_tag;
/// Boxed, thread-safe (`Send + Sync`) error type returned by the fallible
/// operations in this file.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Handle to one established SIP call: the dialog, the peer it talks to, and
/// the local RTP socket that was bound for its media.
pub struct Call {
/// Shared endpoint used to send in-dialog requests and to register or
/// unregister this call's dialog id.
endpoint: Arc<SipEndpoint>,
/// Dialog state behind an async mutex; `CallSession` handles share the same
/// lock.
dialog: Arc<Mutex<Dialog>>,
/// Id of the dialog, captured at construction.
dialog_id: DialogId,
/// Token returned by `SipEndpoint::register_termination` for this dialog.
/// NOTE(review): presumably cancelled when the dialog ends — confirm
/// against the endpoint implementation.
terminated: CancellationToken,
/// Address that in-dialog requests (re-INVITE, INFO, BYE) are sent to.
peer: SocketAddr,
/// Hold state last confirmed by a 2xx response to a hold/resume re-INVITE.
held: bool,
/// Version number passed to `build_sdp_with`; incremented before every
/// hold/resume offer.
sdp_version: u32,
/// Session timer handed to `Call::new`, if any.
session_timer: Option<SessionTimer>,
/// Remote media description handed to `Call::new`.
pub remote_media: RemoteMedia,
/// UDP socket for this call's RTP traffic.
pub rtp_socket: Arc<UdpSocket>,
/// Local RTP address; its port is advertised in hold/resume SDP offers.
pub local_rtp_addr: SocketAddr,
}
impl Call {
    /// Wraps an established dialog in a [`Call`] handle.
    ///
    /// The dialog id is registered with the endpoint through
    /// `register_termination` and the returned token is kept. The hold flag
    /// starts cleared and the SDP version counter starts at zero.
    pub(crate) fn new(
        endpoint: Arc<SipEndpoint>,
        dialog: Dialog,
        peer: SocketAddr,
        session_timer: Option<SessionTimer>,
        remote_media: RemoteMedia,
        rtp_socket: Arc<UdpSocket>,
        local_rtp_addr: SocketAddr,
    ) -> Self {
        let dialog_id = dialog.id();
        let terminated = endpoint.register_termination(dialog_id.clone());
        let dialog = Arc::new(Mutex::new(dialog));
        Self {
            endpoint,
            dialog,
            dialog_id,
            terminated,
            peer,
            held: false,
            sdp_version: 0,
            session_timer,
            remote_media,
            rtp_socket,
            local_rtp_addr,
        }
    }

    /// Puts the call on hold (`on == true`) or resumes it (`on == false`) by
    /// sending a re-INVITE with a fresh SDP offer.
    ///
    /// The offer's direction is `SendOnly` for hold and `SendRecv` for
    /// resume. The SDP version counter is bumped before the offer is built,
    /// whether or not the peer accepts it.
    ///
    /// # Errors
    ///
    /// Returns an error when the peer answers with a non-2xx status or when
    /// no final response arrives. The hold flag is only updated on a 2xx.
    pub async fn set_hold(&mut self, on: bool) -> Result<(), BoxError> {
        let direction = if on {
            MediaDirection::SendOnly
        } else {
            MediaDirection::SendRecv
        };

        self.sdp_version += 1;
        let offer = build_sdp_with(
            self.endpoint.local_ip(),
            self.local_rtp_addr.port(),
            direction,
            self.sdp_version,
        );
        let headers = vec![Header::ContentType("application/sdp".into())];

        // Scope the guard so the dialog lock is released as soon as the
        // re-INVITE transaction has finished.
        let response = {
            let mut dialog = self.dialog.lock().await;
            self.endpoint
                .ua()
                .reinvite(self.peer, &mut dialog, headers, offer)
                .await
        };

        let Some(reply) = response else {
            return Err("re-INVITE timed out with no final response".into());
        };
        if !(200..300).contains(&reply.status_code.code()) {
            return Err(format!("re-INVITE rejected: {}", reply.status_code).into());
        }

        self.held = on;
        info!(on, "hold state updated via re-INVITE");
        Ok(())
    }

    /// Reports the hold state last confirmed by a successful [`Call::set_hold`].
    pub fn is_held(&self) -> bool {
        self.held
    }

    /// Returns the session timer this call was constructed with, if any.
    pub fn session_timer(&self) -> Option<SessionTimer> {
        self.session_timer
    }

    /// Creates a [`CallSession`] that shares this call's endpoint, dialog
    /// lock, and peer address.
    pub fn session_handle(&self) -> CallSession {
        CallSession {
            endpoint: Arc::clone(&self.endpoint),
            dialog: Arc::clone(&self.dialog),
            peer: self.peer,
        }
    }

    /// Registers this call's dialog id with the endpoint and returns the
    /// resulting stream of inbound requests.
    ///
    /// Dropping the returned [`InboundRequests`] unregisters the dialog id.
    pub fn inbound_requests(&self) -> InboundRequests {
        let rx = self.endpoint.register_dialog(self.dialog_id.clone());
        let endpoint = Arc::clone(&self.endpoint);
        let dialog_id = self.dialog_id.clone();
        InboundRequests {
            endpoint,
            dialog_id,
            rx,
        }
    }

    /// Returns a clone of the token obtained from `register_termination`
    /// when the call was created.
    pub fn terminated(&self) -> CancellationToken {
        self.terminated.clone()
    }

    /// Sends one DTMF digit as a SIP INFO request inside the dialog and
    /// classifies the peer's response (or its absence) into an
    /// [`InfoOutcome`].
    pub async fn send_dtmf_info(&mut self, digit: DtmfDigit, duration_ms: u32) -> InfoOutcome {
        let payload = build_info_body(digit, duration_ms).into_bytes();

        // The dialog lock is held only for the duration of the INFO exchange.
        let response = {
            let mut dialog = self.dialog.lock().await;
            self.endpoint
                .ua()
                .info(self.peer, &mut dialog, vec![content_type_header()], payload)
                .await
        };

        classify(response)
    }

    /// Ends the call by sending a BYE to the peer.
    ///
    /// # Errors
    ///
    /// Returns an error when the user agent reports that the BYE was not
    /// acknowledged.
    pub async fn hangup(&mut self) -> Result<(), BoxError> {
        let acked = {
            let mut dialog = self.dialog.lock().await;
            self.endpoint.ua().hangup(self.peer, &mut dialog).await
        };

        if !acked {
            return Err("BYE was not acknowledged".into());
        }
        info!("call hung up (BYE acknowledged)");
        Ok(())
    }
}
impl Drop for Call {
    /// Removes the termination registration made in `Call::new` so the
    /// endpoint no longer tracks this dialog id on behalf of the call.
    fn drop(&mut self) {
        let id = &self.dialog_id;
        self.endpoint.unregister_termination(id);
    }
}
/// Stream of in-dialog requests delivered for one dialog.
///
/// Created by `Call::inbound_requests`; dropping it unregisters the dialog id
/// from the endpoint.
pub struct InboundRequests {
/// Endpoint the dialog id was registered with.
endpoint: Arc<SipEndpoint>,
/// Dialog id to unregister on drop.
dialog_id: DialogId,
/// Receiving half of the channel returned by `SipEndpoint::register_dialog`.
rx: mpsc::Receiver<InboundRequest>,
}
impl InboundRequests {
    /// Waits for the next inbound request on this dialog's channel.
    ///
    /// Yields `None` once the underlying channel is closed and drained.
    pub async fn recv(&mut self) -> Option<InboundRequest> {
        let inbox = &mut self.rx;
        inbox.recv().await
    }
}
impl Drop for InboundRequests {
    /// Unregisters the dialog id from the endpoint when the stream goes away.
    fn drop(&mut self) {
        let id = &self.dialog_id;
        self.endpoint.unregister_dialog(id);
    }
}
/// Cloneable handle onto a call's dialog, obtained from
/// `Call::session_handle`.
///
/// It shares the dialog lock with the originating `Call` and implements
/// `SessionDialogOps` (refresh via re-INVITE, terminate via BYE).
#[derive(Clone)]
pub struct CallSession {
/// Endpoint whose user agent sends the requests.
endpoint: Arc<SipEndpoint>,
/// Dialog state shared with the originating `Call`.
dialog: Arc<Mutex<Dialog>>,
/// Address the in-dialog requests are sent to.
peer: SocketAddr,
}
impl SessionDialogOps for CallSession {
    /// Sends a re-INVITE carrying `headers` and the optional `body`.
    ///
    /// A missing body is sent as an empty one. When the body is non-empty an
    /// `application/sdp` Content-Type header is appended to `headers`.
    /// Returns whatever the user agent reports: `Some(response)` for a final
    /// response, `None` otherwise. This method never produces `Err` itself.
    async fn refresh(
        &self,
        mut headers: Vec<Header>,
        body: Option<Vec<u8>>,
    ) -> Result<Option<rsip::Response>, BoxError> {
        let payload = body.unwrap_or_default();
        if !payload.is_empty() {
            headers.push(Header::ContentType("application/sdp".into()));
        }

        let mut dialog = self.dialog.lock().await;
        let response = self
            .endpoint
            .ua()
            .reinvite(self.peer, &mut dialog, headers, payload)
            .await;
        Ok(response)
    }

    /// Sends a BYE for the dialog.
    ///
    /// # Errors
    ///
    /// Returns an error when the user agent reports that the BYE was not
    /// acknowledged.
    async fn send_bye(&self) -> Result<(), BoxError> {
        let mut dialog = self.dialog.lock().await;
        let acked = self.endpoint.ua().hangup(self.peer, &mut dialog).await;
        if !acked {
            return Err("BYE was not acknowledged".into());
        }
        Ok(())
    }
}
/// Places outbound calls for one SIP account through a shared endpoint.
pub struct Caller {
/// Account supplying the From identity and the credentials for the INVITE.
account: SipAccount,
/// Endpoint whose user agent sends the INVITE and which later owns the call.
endpoint: Arc<SipEndpoint>,
}
impl Caller {
    /// Creates a caller that dials on behalf of `account` through `endpoint`.
    pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
        Self { account, endpoint }
    }

    /// Dials `target` with no way to cancel and no progress reporting.
    ///
    /// # Errors
    ///
    /// See [`Caller::dial_with_progress`].
    pub async fn dial(&self, target: Uri) -> Result<Call, BoxError> {
        self.dial_inner(target, &CancellationToken::new(), None)
            .await
    }

    /// Dials `target`, passing `cancel` to the user agent so the attempt can
    /// be cancelled by the caller.
    ///
    /// # Errors
    ///
    /// See [`Caller::dial_with_progress`].
    pub async fn dial_cancellable(
        &self,
        target: Uri,
        cancel: &CancellationToken,
    ) -> Result<Call, BoxError> {
        self.dial_inner(target, cancel, None).await
    }

    /// Dials `target` like [`Caller::dial_cancellable`] and additionally
    /// hands `progress` to the user agent for status-code reporting.
    ///
    /// # Errors
    ///
    /// Fails when the RTP socket cannot be bound, when the From or Contact
    /// URI cannot be built, when the call is rejected, unauthorized, times
    /// out, or the engine stops, and when the SDP answer of an answered call
    /// cannot be parsed.
    pub async fn dial_with_progress(
        &self,
        target: Uri,
        cancel: &CancellationToken,
        progress: mpsc::Sender<rsip::StatusCode>,
    ) -> Result<Call, BoxError> {
        self.dial_inner(target, cancel, Some(progress)).await
    }

    /// Shared implementation behind the public `dial*` entry points.
    ///
    /// Binds an RTP socket, builds the SDP offer and the INVITE
    /// configuration (including the session-timer headers), runs the call
    /// through the user agent, and converts the outcome into a [`Call`] or an
    /// error.
    ///
    /// If the call is answered but its SDP answer cannot be parsed, the
    /// established dialog is torn down with a BYE before the parse error is
    /// returned. Otherwise the remote party would be left in a live session
    /// that no local object owns.
    async fn dial_inner(
        &self,
        target: Uri,
        cancel: &CancellationToken,
        progress: Option<mpsc::Sender<rsip::StatusCode>>,
    ) -> Result<Call, BoxError> {
        // Port 0 lets the OS choose a free RTP port; the chosen port is
        // advertised in the SDP offer below.
        let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
        let local_rtp_addr = rtp_socket.local_addr()?;
        let local_ip = self.endpoint.local_ip();
        info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for outbound dial");

        let offer = build_sdp(local_ip, local_rtp_addr.port());
        debug!("SDP offer:\n{}", String::from_utf8_lossy(&offer));

        let from: Uri =
            format!("sip:{}@{}", self.account.username, self.account.domain).try_into()?;
        let contact: Uri = format!(
            "sip:{}@{}",
            self.account.username,
            self.endpoint.local_addr()
        )
        .try_into()?;

        let cfg = CallConfig {
            target,
            from,
            contact,
            from_tag: gen_tag(),
            call_id: format!("{}@wavekat.com", gen_tag()),
            sdp: offer,
            // Advertise session-timer support and propose the default
            // interval without naming a refresher.
            extra_headers: vec![
                supported_timer_header(),
                SessionExpires {
                    interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
                    refresher: None,
                }
                .header(),
            ],
            username: self.account.auth_username().to_string(),
            password: self.account.password.clone(),
        };

        // NOTE(review): the literal `1` is presumably the initial CSeq —
        // confirm against `call_cancellable`.
        match self
            .endpoint
            .ua()
            .call_cancellable(&cfg, self.endpoint.server(), 1, cancel, progress.as_ref())
            .await
        {
            CallOutcome::Answered {
                mut dialog,
                response,
            } => {
                let remote_media = match parse_sdp(&response.body) {
                    Ok(media) => media,
                    Err(err) => {
                        // Convert before awaiting so only the Send + Sync
                        // boxed error is held across the await point.
                        let err: BoxError = err.into();
                        // The dialog is already established; release it on
                        // the remote side instead of silently dropping it.
                        let acked = self
                            .endpoint
                            .ua()
                            .hangup(self.endpoint.server(), &mut *dialog)
                            .await;
                        info!(acked, "SDP answer unusable; sent BYE for the answered dialog");
                        return Err(err);
                    }
                };
                let session_timer = negotiate_uac(&response.headers);
                info!(
                    remote_addr = %remote_media.addr,
                    remote_port = remote_media.port,
                    payload_type = remote_media.payload_type,
                    ?session_timer,
                    "call answered; parsed SDP answer",
                );
                Ok(Call::new(
                    self.endpoint.clone(),
                    *dialog,
                    self.endpoint.server(),
                    session_timer,
                    remote_media,
                    Arc::new(rtp_socket),
                    local_rtp_addr,
                ))
            }
            CallOutcome::Rejected(status) => Err(format!("call rejected: {status}").into()),
            CallOutcome::Unauthorized => Err("call rejected: authentication failed".into()),
            CallOutcome::TimedOut => Err("call timed out with no final response".into()),
            CallOutcome::EngineStopped => Err("engine stopped".into()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::account::Transport;

    /// Fixture account with no dedicated authentication username configured.
    fn test_account() -> SipAccount {
        SipAccount {
            display_name: String::from("Office"),
            username: String::from("1001"),
            password: String::from("secret"),
            domain: String::from("sip.example.com"),
            auth_username: None,
            server: Some(String::from("pbx.example.com")),
            port: Some(5080),
            transport: Transport::Udp,
        }
    }

    /// With `auth_username` unset, the account is expected to authenticate
    /// under its plain username — the value `dial_inner` copies into the
    /// call configuration.
    #[test]
    fn caller_holds_account_and_endpoint_inputs() {
        assert_eq!(test_account().auth_username(), "1001");
    }
}