use super::dialog::DialogInnerRef;
use super::DialogId;
use crate::sip::prelude::HasHeaders;
use crate::sip::{prelude::HeadersExt, Header};
use crate::sip::{Response, SipMessage, StatusCode};
use crate::transaction::transaction::Transaction;
use crate::Result;
use crate::{
dialog::{
authenticate::handle_client_authenticate,
dialog::{DialogState, TerminatedReason, TransactionHandle},
subscription::ClientSubscriptionDialog,
},
transaction::key::TransactionRole,
};
use std::sync::atomic::Ordering;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace};
#[derive(Clone)]
pub struct ClientInviteDialog {
pub(super) inner: DialogInnerRef,
}
impl ClientInviteDialog {
pub fn id(&self) -> DialogId {
self.inner.id.lock().clone()
}
pub fn state(&self) -> DialogState {
self.inner.state.lock().clone()
}
pub fn from_inner(inner: DialogInnerRef) -> Self {
Self { inner }
}
pub fn snapshot(&self) -> super::dialog::DialogSnapshot {
self.inner.snapshot()
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.inner.cancel_token
}
pub async fn bye(&self) -> Result<()> {
self.bye_with_headers(None).await
}
pub async fn bye_with_headers(&self, headers: Option<Vec<crate::sip::Header>>) -> Result<()> {
if !self.inner.is_confirmed() {
return Ok(());
}
let request =
self.inner
.make_request(crate::sip::Method::Bye, None, None, None, headers, None)?;
if let Err(e) = self.inner.do_request(request).await {
info!(error = %e, "bye error");
}
self.inner
.transition(DialogState::Terminated(self.id(), TerminatedReason::UacBye))?;
Ok(())
}
pub async fn bye_with_reason(&self, reason: String) -> Result<()> {
self.bye_with_headers(Some(vec![crate::sip::Header::Reason(reason.into())]))
.await
}
pub async fn hangup(&self) -> Result<()> {
self.hangup_with_headers(None).await
}
pub async fn hangup_with_headers(
&self,
headers: Option<Vec<crate::sip::Header>>,
) -> Result<()> {
if self.inner.can_cancel() {
self.cancel().await
} else {
self.bye_with_headers(headers).await
}
}
pub async fn hangup_with_reason(&self, reason: String) -> Result<()> {
self.hangup_with_headers(Some(vec![crate::sip::Header::Reason(reason.into())]))
.await
}
pub async fn cancel(&self) -> Result<()> {
if self.inner.is_confirmed() {
return Ok(());
}
debug!(id = %self.id(), "sending cancel request");
let mut cancel_request = self.inner.initial_request.lock().clone();
let invite_seq = cancel_request.cseq_header()?.seq()?;
cancel_request
.headers_mut()
.retain(|h| !matches!(h, Header::ContentLength(_) | Header::ContentType(_)));
cancel_request.method = crate::sip::Method::Cancel;
cancel_request
.cseq_header_mut()?
.mut_seq(invite_seq)?
.mut_method(crate::sip::Method::Cancel)?;
cancel_request.body = vec![];
self.inner.do_request(cancel_request).await?;
Ok(())
}
pub async fn reinvite(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
debug!(id = %self.id(), ?body, "sending re-invite request");
let request =
self.inner
.make_request(crate::sip::Method::Invite, None, None, None, headers, body)?;
let resp = self.inner.do_request(request.clone()).await;
match resp {
Ok(Some(ref resp)) => {
if resp.status_code == StatusCode::OK {
let (handle, _) = TransactionHandle::new();
self.inner
.transition(DialogState::Updated(self.id(), request, handle))?;
}
}
_ => {}
}
resp
}
pub async fn update(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
debug!(id = %self.id(), ?body, "sending update request");
let request =
self.inner
.make_request(crate::sip::Method::Update, None, None, None, headers, body)?;
self.inner.do_request(request.clone()).await
}
pub async fn info(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
debug!(id = %self.id(), ?body, "sending info request");
let request =
self.inner
.make_request(crate::sip::Method::Info, None, None, None, headers, body)?;
self.inner.do_request(request.clone()).await
}
pub async fn options(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
debug!(id = %self.id(), ?body, "sending option request");
let request = self.inner.make_request(
crate::sip::Method::Options,
None,
None,
None,
headers,
body,
)?;
self.inner.do_request(request.clone()).await
}
pub async fn request(
&self,
method: crate::sip::Method,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
if !self.inner.is_confirmed() {
return Ok(None);
}
debug!(id = %self.id(), %method, "sending request");
let request = self
.inner
.make_request(method, None, None, None, headers, body)?;
self.inner.do_request(request).await
}
pub async fn notify(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
self.request(crate::sip::Method::Notify, headers, body)
.await
}
pub async fn refer(
&self,
refer_to: crate::sip::Uri,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
let mut headers = headers.unwrap_or_default();
headers.push(crate::sip::Header::Other(
"Refer-To".into(),
format!("<{}>", refer_to).into(),
));
self.request(crate::sip::Method::Refer, Some(headers), body)
.await
}
pub async fn notify_refer(
&self,
status: crate::sip::StatusCode,
sub_state: &str,
) -> Result<Option<crate::sip::Response>> {
let headers = vec![
crate::sip::Header::Event("refer".into()),
crate::sip::Header::SubscriptionState(sub_state.into()),
crate::sip::Header::ContentType("message/sipfrag".into()),
];
let body = format!("SIP/2.0 {} {:?}", u16::from(status.clone()), status).into_bytes();
self.notify(Some(headers), Some(body)).await
}
pub fn as_subscription(&self) -> ClientSubscriptionDialog {
ClientSubscriptionDialog {
inner: self.inner.clone(),
}
}
pub async fn message(
&self,
headers: Option<Vec<crate::sip::Header>>,
body: Option<Vec<u8>>,
) -> Result<Option<crate::sip::Response>> {
self.request(crate::sip::Method::Message, headers, body)
.await
}
pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
trace!(
id = %self.id(),
method = %tx.original.method,
state = %self.inner.state.lock(),
"handle request"
);
let cseq = tx.original.cseq_header()?.seq()?;
let remote_seq = self.inner.remote_seq.load(Ordering::Relaxed);
if remote_seq > 0 && cseq < remote_seq {
debug!(
id = %self.id(),
remote_seq = %remote_seq,
cseq = %cseq,
"received old request"
);
tx.reply(crate::sip::StatusCode::ServerInternalError)
.await?;
return Ok(());
}
self.inner
.remote_seq
.compare_exchange(remote_seq, cseq, Ordering::Relaxed, Ordering::Relaxed)
.ok();
if self.inner.is_confirmed() {
match tx.original.method {
crate::sip::Method::Invite => return self.handle_reinvite(tx).await,
crate::sip::Method::Bye => return self.handle_bye(tx).await,
crate::sip::Method::Info => return self.handle_info(tx).await,
crate::sip::Method::Options => return self.handle_options(tx).await,
crate::sip::Method::Update => return self.handle_update(tx).await,
crate::sip::Method::Refer => return self.handle_refer(tx).await,
crate::sip::Method::Message => return self.handle_message(tx).await,
crate::sip::Method::Notify => return self.handle_notify(tx).await,
_ => {
debug!(id = %self.id(), method = ?tx.original.method, "invalid request method");
tx.reply(crate::sip::StatusCode::MethodNotAllowed).await?;
return Err(crate::Error::DialogError(
"invalid request".to_string(),
self.id(),
crate::sip::StatusCode::MethodNotAllowed,
));
}
}
} else {
debug!(
id = %self.id(),
method = ?tx.original.method,
"received request not confirmed"
);
}
Ok(())
}
async fn handle_bye(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received bye");
self.inner
.transition(DialogState::Terminated(self.id(), TerminatedReason::UasBye))?;
tx.reply(crate::sip::StatusCode::OK).await?;
Ok(())
}
async fn handle_info(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received info");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Info(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
async fn handle_options(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received options");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Options(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
async fn handle_update(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received update");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Updated(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
async fn handle_reinvite(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received reinvite");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Updated(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await?;
while let Some(msg) = tx.receive().await {
match msg {
SipMessage::Request(req) if req.method == crate::sip::Method::Ack => {
debug!(id = %self.id(), "received ACK for re-INVITE");
break;
}
_ => {}
}
}
Ok(())
}
async fn handle_refer(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received refer");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Refer(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
async fn handle_message(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received message");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Message(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
async fn handle_notify(&mut self, tx: &mut Transaction) -> Result<()> {
debug!(id = %self.id(), uri = %tx.original.uri, "received notify");
let (handle, rx) = TransactionHandle::new();
self.inner
.transition(DialogState::Notify(self.id(), tx.original.clone(), handle))?;
self.inner.process_transaction_handle(tx, rx).await
}
pub async fn process_invite(
&self,
tx: &mut Transaction,
) -> Result<(DialogId, Option<Response>)> {
self.inner.transition(DialogState::Calling(self.id()))?;
let mut auth_sent = false;
tx.send().await?;
let mut dialog_id = self.id();
let mut final_response = None;
while let Some(msg) = tx.receive().await {
match msg {
SipMessage::Request(_) => {}
SipMessage::Response(resp) => {
let status = resp.status_code.clone();
if status == StatusCode::Trying {
self.inner.transition(DialogState::Trying(self.id()))?;
continue;
}
if matches!(status.kind(), crate::sip::StatusCodeKind::Provisional) {
self.inner.handle_provisional_response(&resp).await?;
self.inner.transition(DialogState::Early(self.id(), resp))?;
continue;
}
if matches!(
status,
StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized
) {
if auth_sent {
final_response = Some(resp.clone());
debug!(id = %self.id(), ?status, "received auth response after auth sent");
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::ProxyAuthRequired,
))?;
break;
}
auth_sent = true;
if let Some(credential) = &self.inner.credential {
*tx = handle_client_authenticate(
self.inner.increment_local_seq(),
tx,
resp,
credential,
)
.await?;
tx.send().await?;
self.inner.update_remote_tag("").ok();
{
let mut req = self.inner.initial_request.lock();
*req = tx.original.clone();
}
continue;
} else {
debug!(id=%self.id(),"received 407 response without auth option");
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::ProxyAuthRequired,
))?;
continue;
}
}
final_response = Some(resp.clone());
match resp.to_header()?.tag()? {
Some(tag) => self.inner.update_remote_tag(tag.value())?,
None => {}
}
if let Ok(id) = DialogId::try_from((&resp, TransactionRole::Client)) {
dialog_id = id;
}
match resp.status_code {
StatusCode::Ringing | StatusCode::SessionProgress
if resp
.to_header()
.ok()
.and_then(|h| h.tag().ok().flatten())
.is_some() =>
{
self.inner.update_route_set_from_response(&resp);
}
StatusCode::OK => {
self.inner.update_route_set_from_response(&resp);
let contact = resp.contact_header()?;
self.inner.remote_contact.lock().replace(contact.clone());
let contact_uri = resp
.typed_contact_headers()?
.first()
.map(|c| c.uri.clone())
.ok_or_else(|| {
crate::Error::Error("missing Contact header".to_string())
})?;
*self.inner.remote_uri.lock() = contact_uri;
self.inner
.transition(DialogState::Confirmed(dialog_id.clone(), resp))?;
}
_ => {
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::UasOther(resp.status_code.clone()),
))?;
}
}
break;
}
}
}
Ok((dialog_id, final_response))
}
}