use std::net::SocketAddr;
use std::sync::Arc;
use rsip::headers::UntypedHeader;
use rsip::message::HeadersExt;
use rsip::{Request, StatusCode, Uri};
use tokio::net::UdpSocket;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::caller::Call;
use crate::endpoint::SipEndpoint;
use crate::sdp::{build_sdp, RemoteMedia};
use crate::session_timer::{negotiate_uas, require_timer_header, supported_timer_header};
use crate::stack::dialog::Dialog;
use crate::stack::response::{build_response, ResponseBody};
use crate::stack::transaction::{gen_tag, TransactionKey};
/// Boxed, thread-safe (`Send + Sync`) error type returned by the fallible
/// methods in this file.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// An inbound INVITE awaiting a final response.
///
/// The value is consumed by [`IncomingCall::accept`] (answers with a 200 OK
/// and yields a `Call`) or by [`IncomingCall::reject`] (answers with a
/// caller-chosen failure status).
pub struct IncomingCall {
// Endpoint used to unregister this call and to send the final response.
endpoint: Arc<SipEndpoint>,
// Transaction key handed to the endpoint when unregistering and answering.
key: TransactionKey,
// Peer socket address; passed on to the resulting `Call` on accept.
peer: SocketAddr,
// The inbound INVITE that responses and the dialog are built from.
request: Request,
// Token exposed through `cancelled()`.
// NOTE(review): presumably triggered when the peer CANCELs — confirm
// against the code that constructs this struct.
cancelled: CancellationToken,
/// Remote media description; moved into the `Call` when the call is accepted.
pub remote_media: RemoteMedia,
}
impl IncomingCall {
    /// Bundles everything needed to answer an inbound INVITE later on.
    ///
    /// All arguments are stored as-is; no validation or I/O happens here.
    pub(crate) fn new(
        endpoint: Arc<SipEndpoint>,
        key: TransactionKey,
        peer: SocketAddr,
        request: Request,
        remote_media: RemoteMedia,
        cancelled: CancellationToken,
    ) -> Self {
        Self {
            endpoint,
            key,
            peer,
            request,
            cancelled,
            remote_media,
        }
    }

    /// Returns a clone of the cancellation token associated with this call.
    ///
    /// NOTE(review): presumably the token fires when the peer CANCELs the
    /// INVITE — confirm against the code that creates the token.
    pub fn cancelled(&self) -> CancellationToken {
        self.cancelled.clone()
    }

    /// Returns the value of the request's `From` header as a string, or
    /// `None` when the header cannot be obtained from the request.
    pub fn caller(&self) -> Option<String> {
        self.request
            .from_header()
            .ok()
            .map(|h| h.value().to_string())
    }

    /// Accepts the call: answers the INVITE with a 200 OK carrying an SDP
    /// answer and returns the established [`Call`].
    ///
    /// Steps, in order:
    /// 1. unregister this call from the endpoint's incoming set;
    /// 2. bind a UDP socket on an OS-chosen port for RTP;
    /// 3. build the SDP answer, To-tag and Contact URI;
    /// 4. build the 200 OK, adding session-timer headers when
    ///    `negotiate_uas` produced a result;
    /// 5. build the UAS dialog;
    /// 6. send the 200 OK.
    ///
    /// The dialog is built *before* the 200 OK is sent so that an INVITE
    /// lacking dialog-forming headers fails without telling the peer the call
    /// was established.
    ///
    /// # Errors
    ///
    /// Returns an error when the RTP socket cannot be bound or queried, the
    /// Contact URI does not parse, the response or the dialog cannot be
    /// built, or the engine reports that the 200 OK could not be sent.
    pub async fn accept(self) -> Result<Call, BoxError> {
        self.endpoint.unregister_incoming(&self.key);

        // Port 0 lets the OS pick a free port; the chosen port is advertised
        // in the SDP answer below.
        let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
        let local_rtp_addr = rtp_socket.local_addr()?;
        let local_ip = self.endpoint.local_ip();
        info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for inbound call");

        let answer = build_sdp(local_ip, local_rtp_addr.port());
        debug!("SDP answer:\n{}", String::from_utf8_lossy(&answer));

        let to_tag = gen_tag();
        let contact: Uri = format!(
            "sip:{}@{}",
            self.endpoint.account().username,
            self.endpoint.local_addr()
        )
        .try_into()?;

        let uas_timer = negotiate_uas(&self.request.headers);
        let mut response = build_response(
            &self.request,
            StatusCode::OK,
            Some(&to_tag),
            Some(&contact),
            Some(ResponseBody {
                content_type: "application/sdp",
                bytes: answer,
            }),
        )
        .ok_or("could not build 200 OK")?;
        if let Some(uas) = &uas_timer {
            response.headers.push(supported_timer_header());
            response.headers.push(uas.echo.header());
            if uas.require_timer {
                response.headers.push(require_timer_header());
            }
        }

        // Everything that can still fail locally must fail before the 200 OK
        // leaves: once it is sent the peer considers the call established.
        let dialog = Dialog::uas(&self.request, to_tag, contact)
            .ok_or("inbound INVITE lacked the headers a dialog requires")?;

        if !self.endpoint.ua().answer(self.key, response).await {
            return Err("engine stopped before the 200 OK was sent".into());
        }
        info!("sent 200 OK with SDP answer");

        Ok(Call::new(
            self.endpoint.clone(),
            dialog,
            self.peer,
            uas_timer.map(|u| u.timer),
            self.remote_media,
            Arc::new(rtp_socket),
            local_rtp_addr,
        ))
    }

    /// Rejects the call by answering the INVITE with `status`.
    ///
    /// The call is unregistered from the endpoint first, then `status` is
    /// validated, then the response (with a freshly generated To-tag, no
    /// Contact and no body) is built and sent.
    ///
    /// # Errors
    ///
    /// Returns an error when `status` has a code below 300, when the response
    /// cannot be built, or when the engine reports that the response could
    /// not be sent.
    pub async fn reject(self, status: StatusCode) -> Result<(), BoxError> {
        self.endpoint.unregister_incoming(&self.key);

        // Codes below 300 are not failure responses, so they cannot reject.
        if status.code() < 300 {
            return Err(format!("reject() got a non-failure status {status}").into());
        }

        // `status` is logged after the send, hence the clone here.
        let response = build_response(&self.request, status.clone(), Some(&gen_tag()), None, None)
            .ok_or("could not build reject response")?;
        if !self.endpoint.ua().answer(self.key, response).await {
            return Err("engine stopped before the reject was sent".into());
        }
        info!(%status, "rejected inbound INVITE");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use rsip::StatusCode;

    /// Pins the numeric boundary that `reject` checks against: a 200 OK code
    /// is below 300, while 486 Busy Here is not.
    #[test]
    fn reject_requires_non_2xx() {
        let success_code = StatusCode::OK.code();
        let failure_code = StatusCode::BusyHere.code();

        assert!(300 > success_code);
        assert!(!(failure_code < 300));
    }
}