use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex as StdMutex};
use rsip::headers::ToTypedHeader;
use rsip::message::HeadersExt;
use rsip::{Method, Request, StatusCode};
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::account::{SipAccount, Transport};
use crate::callee::IncomingCall;
use crate::inbound::InboundRequest;
use crate::resolve::resolve_sip_server;
use crate::sdp::parse_sdp;
use crate::stack::dialog::DialogId;
use crate::stack::response::build_response;
use crate::stack::transaction::{gen_tag, TransactionKey};
use crate::stack::ua::{Incoming, Ua};
/// Boxed, thread-safe error type used by the fallible functions in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Shared map from a dialog id to the channel sender that in-dialog requests
/// for that dialog are forwarded to (see `route_inbound`).
type DialogRegistry = Arc<StdMutex<HashMap<DialogId, mpsc::Sender<InboundRequest>>>>;
/// Shared map from a dialog id to the token that is cancelled when the peer
/// sends a BYE for that dialog (see `route_inbound`).
type TerminationRegistry = Arc<StdMutex<HashMap<DialogId, CancellationToken>>>;
/// State kept for an inbound INVITE that has been handed to the application,
/// so a later CANCEL from the peer can be applied to it.
struct PendingInvite {
/// The original INVITE request; the 487 response is built from it on CANCEL.
invite: Request,
/// Token cancelled when the peer CANCELs this INVITE.
cancel: CancellationToken,
}
/// Shared map from an INVITE's transaction key to its [`PendingInvite`] state.
type IncomingRegistry = Arc<StdMutex<HashMap<TransactionKey, PendingInvite>>>;
/// A SIP endpoint: one bound user-agent stack plus the registries used to
/// route inbound requests to the calls that own them.
///
/// Constructed with [`SipEndpoint::new`] / [`SipEndpoint::new_with_app`],
/// which also spawn the background task that routes inbound requests.
pub struct SipEndpoint {
/// The user-agent stack bound in `new_with_app`; shared with the routing task.
ua: Arc<Ua>,
/// Copy of the account this endpoint was created for.
account: SipAccount,
/// Address of the SIP server as resolved at construction time.
server: SocketAddr,
/// Local IP detected at construction time and used as the bind address.
local_ip: IpAddr,
/// Transport copied from `account.transport`.
transport: Transport,
/// Token supplied by the caller; cancelled by [`SipEndpoint::shutdown`].
cancel: CancellationToken,
/// Receiving half of the channel the routing task pushes new (dialog-less)
/// INVITEs into; drained by `next_incoming_call`.
incoming_calls: Mutex<mpsc::Receiver<Incoming>>,
/// Dialog id -> sender for in-dialog requests.
dialogs: DialogRegistry,
/// Dialog id -> token fired on peer BYE.
terminations: TerminationRegistry,
/// INVITE transaction key -> pending INVITE state, used to honour CANCEL.
incoming_invites: IncomingRegistry,
}
impl SipEndpoint {
pub async fn new(
account: &SipAccount,
cancel: CancellationToken,
) -> Result<Arc<Self>, BoxError> {
Self::new_with_app(account, None, cancel).await
}
pub async fn new_with_app(
account: &SipAccount,
product: Option<&str>,
cancel: CancellationToken,
) -> Result<Arc<Self>, BoxError> {
let local_ip = detect_local_ip(account)?;
let bind_addr = SocketAddr::new(local_ip, 0);
info!("Binding SIP transport to {bind_addr}");
let ua = Arc::new(
Ua::bind_with_app(bind_addr, product.map(String::from), cancel.clone()).await?,
);
let server = resolve_sip_server(account)
.await?
.ok_or("could not resolve SIP server address")?;
info!(%server, "resolved SIP server");
let (calls_tx, calls_rx) = mpsc::channel(16);
let dialogs: DialogRegistry = Arc::new(StdMutex::new(HashMap::new()));
let terminations: TerminationRegistry = Arc::new(StdMutex::new(HashMap::new()));
let incoming_invites: IncomingRegistry = Arc::new(StdMutex::new(HashMap::new()));
let endpoint = Arc::new(Self {
ua: ua.clone(),
account: account.clone(),
server,
local_ip,
transport: account.transport,
cancel,
incoming_calls: Mutex::new(calls_rx),
dialogs: dialogs.clone(),
terminations: terminations.clone(),
incoming_invites: incoming_invites.clone(),
});
tokio::spawn(async move {
let routing = Routing {
dialogs,
terminations,
incoming_invites,
};
while let Some(inc) = ua.next_incoming().await {
route_inbound(&ua, &routing, inc, &calls_tx).await;
}
});
Ok(endpoint)
}
pub(crate) fn register_dialog(&self, id: DialogId) -> mpsc::Receiver<InboundRequest> {
let (tx, rx) = mpsc::channel(16);
if let Ok(mut map) = self.dialogs.lock() {
map.insert(id, tx);
}
rx
}
pub(crate) fn unregister_dialog(&self, id: &DialogId) {
if let Ok(mut map) = self.dialogs.lock() {
map.remove(id);
}
}
pub(crate) fn register_termination(&self, id: DialogId) -> CancellationToken {
let token = CancellationToken::new();
if let Ok(mut map) = self.terminations.lock() {
map.insert(id, token.clone());
}
token
}
pub(crate) fn unregister_termination(&self, id: &DialogId) {
if let Ok(mut map) = self.terminations.lock() {
map.remove(id);
}
}
pub(crate) fn register_incoming(
&self,
key: TransactionKey,
invite: Request,
) -> CancellationToken {
let token = CancellationToken::new();
if let Ok(mut map) = self.incoming_invites.lock() {
map.insert(
key,
PendingInvite {
invite,
cancel: token.clone(),
},
);
}
token
}
pub(crate) fn unregister_incoming(&self, key: &TransactionKey) {
if let Ok(mut map) = self.incoming_invites.lock() {
map.remove(key);
}
}
pub fn local_ip(&self) -> IpAddr {
self.local_ip
}
pub fn local_addr(&self) -> SocketAddr {
self.ua.local_addr()
}
pub fn transport(&self) -> Transport {
self.transport
}
pub fn shutdown(&self) {
self.cancel.cancel();
}
pub async fn next_incoming_call(self: &Arc<Self>) -> Option<IncomingCall> {
loop {
let incoming = self.incoming_calls.lock().await.recv().await?;
match parse_sdp(&incoming.request.body) {
Ok(remote_media) => {
let cancelled =
self.register_incoming(incoming.key.clone(), incoming.request.clone());
return Some(IncomingCall::new(
self.clone(),
incoming.key,
incoming.peer,
incoming.request,
remote_media,
cancelled,
));
}
Err(e) => warn!(error = %e, "inbound INVITE has no usable SDP offer; skipping"),
}
}
}
pub(crate) fn ua(&self) -> &Ua {
&self.ua
}
pub(crate) fn server(&self) -> SocketAddr {
self.server
}
pub(crate) fn account(&self) -> &SipAccount {
&self.account
}
}
/// The registries the background routing task consults; each field shares
/// its map with the corresponding field of `SipEndpoint`.
struct Routing {
/// Dialog id -> sender for in-dialog requests.
dialogs: DialogRegistry,
/// Dialog id -> token fired on peer BYE.
terminations: TerminationRegistry,
/// INVITE transaction key -> pending INVITE state, consulted on CANCEL.
incoming_invites: IncomingRegistry,
}
async fn route_inbound(
ua: &Arc<Ua>,
routing: &Routing,
inc: Incoming,
calls_tx: &mpsc::Sender<Incoming>,
) {
let has_to_tag = inc
.request
.to_header()
.ok()
.and_then(|to| to.typed().ok())
.map(|to| to.tag().is_some())
.unwrap_or(false);
match inc.request.method() {
Method::Invite if !has_to_tag => {
let _ = calls_tx.send(inc).await;
}
Method::Ack => debug!("absorbing 2xx ACK"),
Method::Invite | Method::Info | Method::Refer | Method::Notify => {
let sender = DialogId::from_request(&inc.request).and_then(|id| {
routing
.dialogs
.lock()
.ok()
.and_then(|map| map.get(&id).cloned())
});
match sender {
Some(sender) => {
let req = InboundRequest::new(ua.clone(), inc.key, inc.request);
if sender.send(req).await.is_err() {
warn!("in-dialog request dropped: Call no longer listening");
}
}
None => auto_answer_200(ua, inc.key, &inc.request).await,
}
}
Method::Bye => {
let dialog_id = DialogId::from_request(&inc.request);
auto_answer_200(ua, inc.key, &inc.request).await;
let token = dialog_id.as_ref().and_then(|id| {
routing
.terminations
.lock()
.ok()
.and_then(|map| map.get(id).cloned())
});
match token {
Some(token) => {
info!(?dialog_id, "peer BYE — firing call termination");
token.cancel();
}
None => {
let known: Vec<DialogId> = routing
.terminations
.lock()
.ok()
.map(|map| map.keys().cloned().collect())
.unwrap_or_default();
warn!(
?dialog_id,
registered = ?known,
"peer BYE matched no live dialog — call will not tear down from this BYE"
);
}
}
}
Method::Cancel => {
let invite_key = inc.key.invite_target();
auto_answer_200(ua, inc.key, &inc.request).await;
let pending = routing
.incoming_invites
.lock()
.ok()
.and_then(|mut map| map.remove(&invite_key));
if let Some(pending) = pending {
if let Some(resp) = build_response(
&pending.invite,
StatusCode::RequestTerminated,
Some(&gen_tag()),
None,
None,
) {
ua.answer(invite_key, resp).await;
}
debug!("peer CANCEL — 487ed the INVITE, notifying IncomingCall");
pending.cancel.cancel();
}
}
_ => auto_answer_200(ua, inc.key, &inc.request).await,
}
}
/// Replies `200 OK` to `request` on transaction `key`, using a freshly
/// generated tag.
///
/// Best-effort: nothing is sent when `build_response` yields no response,
/// and the outcome of sending is ignored.
async fn auto_answer_200(ua: &Ua, key: TransactionKey, request: &Request) {
    let tag = gen_tag();
    match build_response(request, StatusCode::OK, Some(&tag), None, None) {
        Some(ok) => {
            let _ = ua.answer(key, ok).await;
        }
        None => {}
    }
}
/// Detects the local IP address the OS would use to reach the account's SIP
/// server.
///
/// A throwaway UDP socket is "connected" to `server:port` and its local
/// address is read back. The IPv4 probe runs first, exactly as before. Only
/// when it fails is an IPv6 probe attempted, so servers reachable solely over
/// IPv6 (or configured as a bare IPv6 literal) no longer fail outright.
///
/// This function blocks: it uses std sockets and may resolve a hostname.
///
/// # Errors
/// Returns the IPv4 probe's I/O error when both probes fail.
fn detect_local_ip(account: &SipAccount) -> Result<IpAddr, BoxError> {
    let host = account.server().to_string();
    let port = account.port();
    // A bare IPv6 literal must be bracketed to be a valid `host:port` string.
    let dest = if host.contains(':') && !host.starts_with('[') {
        format!("[{host}]:{port}")
    } else {
        format!("{host}:{port}")
    };

    // Bind an ephemeral socket of the given family, connect it to `dest` and
    // report the local address that was picked.
    let probe = |bind_addr: &str| -> std::io::Result<IpAddr> {
        let sock = std::net::UdpSocket::bind(bind_addr)?;
        sock.connect(&dest)?;
        Ok(sock.local_addr()?.ip())
    };

    match probe("0.0.0.0:0") {
        Ok(ip) => Ok(ip),
        // Keep reporting the IPv4 error, as callers saw before, if the IPv6
        // fallback does not succeed either.
        Err(v4_err) => probe("[::]:0").map_err(|_| v4_err.into()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a throwaway account whose only varying parts are the optional
    /// `server` and `port` overrides; the domain is always `localhost`.
    fn make_account(server: Option<&str>, port: Option<u16>) -> SipAccount {
        SipAccount {
            display_name: String::from("Test"),
            username: String::from("1001"),
            password: String::from("secret"),
            domain: String::from("localhost"),
            auth_username: None,
            server: server.map(str::to_owned),
            port,
            transport: Transport::default(),
        }
    }

    /// The detected address must be a concrete one, never `0.0.0.0` / `::`.
    #[test]
    fn detect_local_ip_returns_non_unspecified() {
        let detected = detect_local_ip(&make_account(Some("1.1.1.1"), Some(5060)))
            .expect("detects a local ip");
        assert!(!detected.is_unspecified());
    }

    /// Detection succeeds when an explicit server is configured.
    #[test]
    fn detect_local_ip_uses_server_field() {
        let outcome = detect_local_ip(&make_account(Some("8.8.8.8"), Some(5060)));
        assert!(outcome.is_ok());
    }

    /// Detection succeeds with neither server nor port configured.
    #[test]
    fn detect_local_ip_falls_back_to_domain() {
        let outcome = detect_local_ip(&make_account(None, None));
        assert!(outcome.is_ok());
    }
}