use super::dialog::{Dialog, DialogInnerRef, DialogState, TerminatedReason};
use super::DialogId;
use crate::dialog::dialog::DialogInner;
use crate::rsip;
use crate::transport::SipConnection;
use crate::{
transaction::transaction::{Transaction, TransactionEvent},
Result,
};
use rsip::{prelude::HeadersExt, Header, Request, SipMessage, StatusCode};
use std::sync::atomic::Ordering;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
#[derive(Clone)]
pub struct ServerInviteDialog {
pub(super) inner: DialogInnerRef,
}
impl ServerInviteDialog {
pub fn id(&self) -> DialogId {
self.inner.id.lock().unwrap().clone()
}
pub fn state(&self) -> DialogState {
self.inner.state.lock().unwrap().clone()
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.inner.cancel_token
}
pub fn initial_request(&self) -> &Request {
&self.inner.initial_request
}
pub fn ringing(&self, headers: Option<Vec<Header>>, body: Option<Vec<u8>>) -> Result<()> {
if !self.inner.can_cancel() {
return Ok(());
}
info!(id = %self.id(), "sending ringing response");
let resp = self.inner.make_response(
&self.inner.initial_request,
if body.is_some() {
StatusCode::SessionProgress
} else {
StatusCode::Ringing
},
headers,
body,
);
self.inner
.tu_sender
.send(TransactionEvent::Respond(resp.clone()))?;
self.inner.transition(DialogState::Early(self.id(), resp))?;
Ok(())
}
pub fn accept(&self, headers: Option<Vec<Header>>, body: Option<Vec<u8>>) -> Result<()> {
let resp = self.inner.make_response(
&self.inner.initial_request,
rsip::StatusCode::OK,
headers,
body,
);
let via = self.inner.initial_request.via_header()?;
let (via_transport, via_received) = SipConnection::parse_target_from_via(via)?;
let mut params = vec![];
if via_transport != rsip::transport::Transport::Udp {
params.push(rsip::param::Param::Transport(via_transport));
}
let contact = rsip::headers::typed::Contact {
uri: rsip::Uri {
host_with_port: via_received,
params,
..Default::default()
},
display_name: None,
params: vec![],
};
debug!(id = %self.id(), "accepting dialog with contact: {}", contact);
self.inner
.remote_contact
.lock()
.unwrap()
.replace(contact.untyped());
self.inner
.tu_sender
.send(TransactionEvent::Respond(resp.clone()))?;
self.inner
.transition(DialogState::WaitAck(self.id(), resp))?;
Ok(())
}
pub fn accept_with_public_contact(
&self,
username: &str,
public_address: Option<rsip::HostWithPort>,
local_address: &crate::transport::SipAddr,
headers: Option<Vec<Header>>,
body: Option<Vec<u8>>,
) -> Result<()> {
use super::registration::Registration;
let contact_header =
Registration::create_nat_aware_contact(username, public_address, local_address);
let mut final_headers = headers.unwrap_or_default();
final_headers.push(contact_header.into());
self.accept(Some(final_headers), body)
}
pub fn reject(&self, code: Option<rsip::StatusCode>, reason: Option<String>) -> Result<()> {
if self.inner.is_terminated() || self.inner.is_confirmed() {
return Ok(());
}
info!(id=%self.id(), ?code, ?reason, "rejecting dialog");
let headers = if let Some(reason) = reason {
Some(vec![rsip::Header::Other("Reason".into(), reason.into())])
} else {
None
};
let resp = self.inner.make_response(
&self.inner.initial_request,
code.unwrap_or(rsip::StatusCode::Decline),
headers,
None,
);
self.inner
.tu_sender
.send(TransactionEvent::Respond(resp))
.ok();
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::UasDecline,
))
}
pub async fn bye(&self) -> Result<()> {
if !self.inner.is_confirmed() {
return Ok(());
}
info!(id=%self.id(), "sending bye request");
let request = self.inner.make_request_with_vias(
rsip::Method::Bye,
None,
self.inner.build_vias_from_request()?,
None,
None,
)?;
match self.inner.do_request(request).await {
Ok(_) => {}
Err(e) => {
info!(id=%self.id(),"bye error: {}", e);
}
};
self.inner
.transition(DialogState::Terminated(self.id(), TerminatedReason::UasBye))?;
Ok(())
}
pub async fn reinvite(
&self,
headers: Option<Vec<rsip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
info!(id=%self.id(), "sending re-invite request, body: \n{:?}", body);
let request = self.inner.make_request_with_vias(
rsip::Method::Invite,
None,
self.inner.build_vias_from_request()?,
headers,
body,
)?;
let resp = self.inner.do_request(request.clone()).await;
match resp {
Ok(Some(ref resp)) => {
if resp.status_code == StatusCode::OK {
self.inner
.transition(DialogState::Updated(self.id(), request))?;
}
}
_ => {}
}
resp
}
pub async fn update(
&self,
headers: Option<Vec<rsip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
info!(id=%self.id(), "sending update request, body: \n{:?}", body);
let request = self.inner.make_request_with_vias(
rsip::Method::Update,
None,
self.inner.build_vias_from_request()?,
headers,
body,
)?;
self.inner.do_request(request.clone()).await
}
pub async fn info(
&self,
headers: Option<Vec<rsip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
info!(id=%self.id(), "sending info request, body: \n{:?}", body);
let request = self.inner.make_request_with_vias(
rsip::Method::Info,
None,
self.inner.build_vias_from_request()?,
headers,
body,
)?;
self.inner.do_request(request.clone()).await
}
pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(
id = %self.id(),
"handle request: {} state:{}",
tx.original,
self.inner.state.lock().unwrap()
);
let cseq = tx.original.cseq_header()?.seq()?;
let remote_seq = self.inner.remote_seq.load(Ordering::Relaxed);
if remote_seq > 0 && cseq < remote_seq {
info!(
id=%self.id(),
"received old request {} remote_seq: {} > {}",
tx.original.method(),
remote_seq,
cseq
);
return Ok(());
}
self.inner
.remote_seq
.compare_exchange(remote_seq, cseq, Ordering::Relaxed, Ordering::Relaxed)
.ok();
if self.inner.is_confirmed() {
match tx.original.method {
rsip::Method::Cancel => {
info!(id=%self.id(),
"invalid request received {} {}",
tx.original.method, tx.original.uri
);
tx.reply(rsip::StatusCode::OK).await?;
return Ok(());
}
rsip::Method::Invite | rsip::Method::Ack => {
info!(id=%self.id(),
"invalid request received {} {}",
tx.original.method, tx.original.uri
);
return Err(crate::Error::DialogError(
"invalid request in confirmed state".to_string(),
self.id(),
rsip::StatusCode::MethodNotAllowed,
));
}
rsip::Method::Bye => return self.handle_bye(tx).await,
rsip::Method::Info => return self.handle_info(tx).await,
rsip::Method::Options => return self.handle_options(tx).await,
rsip::Method::Update => return self.handle_update(tx).await,
_ => {
info!(id=%self.id(),"invalid request method: {:?}", tx.original.method);
tx.reply(rsip::StatusCode::MethodNotAllowed).await?;
return Err(crate::Error::DialogError(
"invalid request".to_string(),
self.id(),
rsip::StatusCode::MethodNotAllowed,
));
}
}
} else {
match tx.original.method {
rsip::Method::Ack => {
self.inner.tu_sender.send(TransactionEvent::Received(
tx.original.clone().into(),
tx.connection.clone(),
))?;
}
_ => {}
}
}
self.handle_invite(tx).await
}
async fn handle_bye(&mut self, tx: &mut Transaction) -> Result<()> {
info!(id = %self.id(), "received bye {}", tx.original.uri);
self.inner
.transition(DialogState::Terminated(self.id(), TerminatedReason::UacBye))?;
tx.reply(rsip::StatusCode::OK).await?;
Ok(())
}
async fn handle_info(&mut self, tx: &mut Transaction) -> Result<()> {
info!(id = %self.id(), "received info {}", tx.original.uri);
self.inner
.transition(DialogState::Info(self.id(), tx.original.clone()))?;
tx.reply(rsip::StatusCode::OK).await?;
Ok(())
}
async fn handle_options(&mut self, tx: &mut Transaction) -> Result<()> {
info!(id = %self.id(), "received options {}", tx.original.uri);
self.inner
.transition(DialogState::Options(self.id(), tx.original.clone()))?;
tx.reply(rsip::StatusCode::OK).await?;
Ok(())
}
async fn handle_update(&mut self, tx: &mut Transaction) -> Result<()> {
info!(id = %self.id(), "received update {}", tx.original.uri);
self.inner
.transition(DialogState::Updated(self.id(), tx.original.clone()))?;
tx.reply(rsip::StatusCode::OK).await?;
Ok(())
}
async fn handle_invite(&mut self, tx: &mut Transaction) -> Result<()> {
let handle_loop = async {
if !self.inner.is_confirmed() && matches!(tx.original.method, rsip::Method::Invite) {
match self.inner.transition(DialogState::Calling(self.id())) {
Ok(_) => {
tx.send_trying().await.ok();
}
Err(_) => {}
}
}
while let Some(msg) = tx.receive().await {
match msg {
SipMessage::Request(req) => match req.method {
rsip::Method::Ack => {
if self.inner.is_terminated() {
break;
}
info!(id = %self.id(),"received ack {}", req.uri);
match req.contact_header() {
Ok(contact) => {
self.inner
.remote_contact
.lock()
.unwrap()
.replace(contact.clone());
}
_ => {}
}
self.inner.transition(DialogState::Confirmed(
self.id(),
tx.last_response.clone().unwrap_or_default(),
))?;
DialogInner::serve_keepalive_options(self.inner.clone());
break;
}
rsip::Method::Cancel => {
info!(id = %self.id(),"received cancel {}", req.uri);
tx.reply(rsip::StatusCode::RequestTerminated).await?;
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::UacCancel,
))?;
}
_ => {}
},
SipMessage::Response(_) => {}
}
}
Ok::<(), crate::Error>(())
};
match handle_loop.await {
Ok(_) => {
trace!(id = %self.id(),"process done");
Ok(())
}
Err(e) => {
warn!(id = %self.id(),"handle_invite error: {:?}", e);
Err(e)
}
}
}
}
impl TryFrom<&Dialog> for ServerInviteDialog {
type Error = crate::Error;
fn try_from(dlg: &Dialog) -> Result<Self> {
match dlg {
Dialog::ServerInvite(dlg) => Ok(dlg.clone()),
_ => Err(crate::Error::DialogError(
"Dialog is not a ServerInviteDialog".to_string(),
dlg.id(),
rsip::StatusCode::BadRequest,
)),
}
}
}