use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use rsip::{Header, StatusCode};
use rsipstack::dialog::dialog::DialogStateReceiver;
use rsipstack::dialog::server_dialog::ServerInviteDialog;
use rsipstack::transaction::transaction::Transaction;
use tokio::net::UdpSocket;
use tracing::{debug, info, warn};
use crate::account::SipAccount;
use crate::endpoint::SipEndpoint;
use crate::sdp::{build_sdp, parse_sdp, RemoteMedia};
use crate::session_timer::{
negotiate_uas, require_timer_header, supported_timer_header, SessionTimer, UasSessionTimer,
};
type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub struct AcceptedCall {
pub dialog: ServerInviteDialog,
pub remote_media: RemoteMedia,
pub rtp_socket: Arc<UdpSocket>,
pub local_rtp_addr: SocketAddr,
pub state_rx: DialogStateReceiver,
pub session_timer: Option<SessionTimer>,
}
pub struct PendingCall {
pub dialog: ServerInviteDialog,
pub remote_media: RemoteMedia,
pub state_rx: DialogStateReceiver,
pub session_timer: Option<UasSessionTimer>,
local_ip: IpAddr,
}
impl PendingCall {
pub async fn accept(self) -> Result<AcceptedCall, BoxError> {
let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
let local_rtp_addr = rtp_socket.local_addr()?;
let rtp_port = local_rtp_addr.port();
info!(local_ip = %self.local_ip, rtp_port, "bound RTP socket");
let sdp_answer = build_sdp(self.local_ip, rtp_port);
debug!("SDP answer:\n{}", String::from_utf8_lossy(&sdp_answer));
let headers = accept_headers(self.session_timer.as_ref());
self.dialog.accept(Some(headers), Some(sdp_answer))?;
info!("sent 200 OK with SDP answer");
Ok(AcceptedCall {
dialog: self.dialog,
remote_media: self.remote_media,
rtp_socket: Arc::new(rtp_socket),
local_rtp_addr,
state_rx: self.state_rx,
session_timer: self.session_timer.map(|uas| uas.timer),
})
}
pub fn reject(self, status: StatusCode) -> Result<(), BoxError> {
if status.code() < 300 {
return Err(format!("reject() got 2xx status {status}").into());
}
self.dialog.reject(Some(status.clone()), None)?;
info!(%status, "rejected inbound INVITE");
Ok(())
}
}
pub struct Callee {
account: SipAccount,
endpoint: Arc<SipEndpoint>,
}
impl Callee {
pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
Self { account, endpoint }
}
pub async fn handle_pending(&self, mut tx: Transaction) -> Result<PendingCall, BoxError> {
let remote_media = parse_sdp(&tx.original.body)?;
let session_timer = negotiate_uas(&tx.original.headers);
info!(
remote_addr = %remote_media.addr,
remote_port = remote_media.port,
payload_type = remote_media.payload_type,
?session_timer,
"parsed SDP offer",
);
let (state_sender, state_rx) = self.endpoint.dialog_layer.new_dialog_state_channel();
let contact_uri: rsip::Uri = format!(
"sip:{}@{}",
self.account.username, self.endpoint.sip_addr.addr
)
.try_into()?;
let dialog = self.endpoint.dialog_layer.get_or_create_server_invite(
&tx,
state_sender,
None,
Some(contact_uri),
)?;
let dialog_for_handler = dialog.clone();
tokio::spawn(async move {
let mut dialog = dialog_for_handler;
if let Err(e) = dialog.handle(&mut tx).await {
warn!("INVITE transaction handle error: {e}");
}
});
Ok(PendingCall {
dialog,
remote_media,
state_rx,
session_timer,
local_ip: self.endpoint.local_ip(),
})
}
pub async fn accept_transaction(&self, tx: Transaction) -> Result<AcceptedCall, BoxError> {
self.handle_pending(tx).await?.accept().await
}
pub async fn reject_transaction(
&self,
mut tx: Transaction,
status: StatusCode,
) -> Result<(), BoxError> {
if status.code() < 300 {
return Err(format!("reject_transaction got 2xx status {status}").into());
}
let (state_sender, _state_rx) = self.endpoint.dialog_layer.new_dialog_state_channel();
let contact_uri: rsip::Uri = format!(
"sip:{}@{}",
self.account.username, self.endpoint.sip_addr.addr
)
.try_into()?;
let dialog = self.endpoint.dialog_layer.get_or_create_server_invite(
&tx,
state_sender,
None,
Some(contact_uri),
)?;
dialog.reject(Some(status.clone()), None)?;
info!(%status, "rejected inbound INVITE");
tokio::spawn(async move {
let mut dialog = dialog;
if let Err(e) = dialog.handle(&mut tx).await {
debug!("rejected INVITE handle error (expected for some flows): {e}");
}
});
Ok(())
}
}
fn accept_headers(session_timer: Option<&UasSessionTimer>) -> Vec<Header> {
let mut headers = vec![Header::ContentType("application/sdp".into())];
if let Some(uas) = session_timer {
headers.push(supported_timer_header());
if uas.require_timer {
headers.push(require_timer_header());
}
headers.push(uas.echo.header());
}
headers
}
#[cfg(test)]
mod tests {
use super::*;
use crate::session_timer::{Refresher, SessionExpires};
#[test]
fn reject_with_2xx_is_an_error() {
let ok = StatusCode::OK;
assert!(ok.code() < 300);
let busy = StatusCode::BusyHere;
assert!(busy.code() >= 300);
}
#[test]
fn accept_headers_without_timer_is_content_type_only() {
let rendered: Vec<String> = accept_headers(None).iter().map(|h| h.to_string()).collect();
assert_eq!(rendered, vec!["Content-Type: application/sdp"]);
}
#[test]
fn accept_headers_echoes_negotiated_session_timer() {
let uas = UasSessionTimer {
timer: SessionTimer {
interval_secs: 1800,
we_are_refresher: false,
},
echo: SessionExpires {
interval_secs: 1800,
refresher: Some(Refresher::Uac),
},
require_timer: true,
};
let rendered: Vec<String> = accept_headers(Some(&uas))
.iter()
.map(|h| h.to_string())
.collect();
assert_eq!(
rendered,
vec![
"Content-Type: application/sdp",
"Supported: timer",
"Require: timer",
"Session-Expires: 1800;refresher=uac",
]
);
}
#[test]
fn accept_headers_omits_require_when_caller_lacks_timer_support() {
let uas = UasSessionTimer {
timer: SessionTimer {
interval_secs: 90,
we_are_refresher: true,
},
echo: SessionExpires {
interval_secs: 90,
refresher: Some(Refresher::Uas),
},
require_timer: false,
};
let rendered: Vec<String> = accept_headers(Some(&uas))
.iter()
.map(|h| h.to_string())
.collect();
assert!(!rendered.iter().any(|h| h.starts_with("Require")));
assert!(rendered.contains(&"Session-Expires: 90;refresher=uas".to_string()));
}
}