use super::dialog::DialogInnerRef;
use super::DialogId;
use crate::dialog::dialog::DialogInner;
use crate::rsip;
use crate::transaction::transaction::Transaction;
use crate::Result;
use crate::{
dialog::{
authenticate::handle_client_authenticate,
dialog::{DialogState, TerminatedReason},
},
rsip_ext::extract_uri_from_contact,
};
use rsip::prelude::HasHeaders;
use rsip::{
headers::Route,
prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
Header,
};
use rsip::{Response, SipMessage, StatusCode};
use std::sync::atomic::Ordering;
use tokio_util::sync::CancellationToken;
use tracing::{info, trace};
/// Client-side handle for an INVITE dialog.
///
/// All dialog state lives behind `inner`; the methods on this type only
/// read from it or drive requests and state transitions through it.
/// Cloning this handle clones the `inner` reference
/// (presumably a shared pointer to the same dialog state — TODO confirm
/// against the `DialogInnerRef` definition).
#[derive(Clone)]
pub struct ClientInviteDialog {
/// Dialog state, visible to the parent module only.
pub(super) inner: DialogInnerRef,
}
impl ClientInviteDialog {
/// Returns a copy of the dialog's current [`DialogId`].
///
/// # Panics
/// Panics if locking the `id` mutex fails.
pub fn id(&self) -> DialogId {
    let current_id = self.inner.id.lock().unwrap();
    current_id.clone()
}
/// Returns a copy of the dialog's current [`DialogState`].
///
/// # Panics
/// Panics if locking the `state` mutex fails.
pub fn state(&self) -> DialogState {
    let current_state = self.inner.state.lock().unwrap();
    current_state.clone()
}
/// Borrows the [`CancellationToken`] stored in the dialog's inner state.
pub fn cancel_token(&self) -> &CancellationToken {
    let inner = &self.inner;
    &inner.cancel_token
}
/// Ends the call using whichever request fits the current dialog state.
///
/// Delegates to [`Self::cancel`] when the inner dialog reports
/// `can_cancel()`, and to [`Self::bye`] otherwise.
pub async fn hangup(&self) -> Result<()> {
    if !self.inner.can_cancel() {
        return self.bye().await;
    }
    self.cancel().await
}
/// Sends a BYE and marks the dialog as terminated by the local side.
///
/// Does nothing and returns `Ok(())` when the dialog is not confirmed.
/// A failure while sending the BYE is only logged; the dialog is still
/// transitioned to `Terminated(.., TerminatedReason::UacBye)`.
///
/// # Errors
/// Propagates errors from building the request and from the state
/// transition.
pub async fn bye(&self) -> Result<()> {
    if !self.inner.is_confirmed() {
        return Ok(());
    }

    let bye_request = self
        .inner
        .make_request(rsip::Method::Bye, None, None, None, None, None)?;

    // Best effort: a send failure must not prevent local termination.
    if let Err(e) = self.inner.do_request(bye_request).await {
        info!("bye error: {}", e);
    }

    let terminated = DialogState::Terminated(self.id(), TerminatedReason::UacBye);
    self.inner.transition(terminated)?;
    Ok(())
}
/// Sends a CANCEL derived from the dialog's initial request.
///
/// Does nothing and returns `Ok(())` when the dialog is already confirmed.
/// The CANCEL is a clone of `initial_request` with:
/// - the `Content-Length` and `Content-Type` headers removed and the body
///   emptied,
/// - the `To` tag set to the dialog id's `to_tag`,
/// - the method and the CSeq method set to CANCEL, with the CSeq number
///   taken from `get_local_seq()`.
///
/// # Errors
/// Propagates header-manipulation errors and errors from `do_request`.
pub async fn cancel(&self) -> Result<()> {
    if self.inner.is_confirmed() {
        return Ok(());
    }
    info!(id=%self.id(),"sending cancel request");

    let mut cancel_request = self.inner.initial_request.clone();
    cancel_request.method = rsip::Method::Cancel;
    cancel_request.body = vec![];

    // A CANCEL carries no payload, so drop the headers describing one.
    cancel_request.headers_mut().retain(|header| {
        !matches!(header, Header::ContentLength(_) | Header::ContentType(_))
    });

    // NOTE(review): RFC 3261 §9.1 requires the To header of a CANCEL to match
    // the request being cancelled, including tags — confirm that applying
    // the dialog's `to_tag` here is intended.
    let to_tag = self.id().to_tag.clone();
    cancel_request.to_header_mut()?.mut_tag(to_tag.into())?;

    cancel_request
        .cseq_header_mut()?
        .mut_seq(self.inner.get_local_seq())?
        .mut_method(rsip::Method::Cancel)?;

    self.inner.do_request(cancel_request).await?;
    Ok(())
}
/// Sends an in-dialog INVITE (re-INVITE) with optional extra headers and body.
///
/// Returns `Ok(None)` without sending anything when the dialog is not
/// confirmed. When the response status is `200 OK`, the dialog is
/// transitioned to `DialogState::Updated` carrying the sent request.
/// The result of `do_request` is returned unchanged in every other respect.
///
/// # Errors
/// Propagates errors from `make_request`, `do_request` and the state
/// transition.
pub async fn reinvite(
    &self,
    headers: Option<Vec<rsip::Header>>,
    body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
    if !self.inner.is_confirmed() {
        return Ok(None);
    }
    info!(id=%self.id(),"sending re-invite request, body:\n{:?}", body);

    let reinvite = self
        .inner
        .make_request(rsip::Method::Invite, None, None, None, headers, body)?;
    // Keep the original request: it is reported in the `Updated` state below.
    let outcome = self.inner.do_request(reinvite.clone()).await;

    if let Ok(Some(answer)) = &outcome {
        if answer.status_code == StatusCode::OK {
            let updated = DialogState::Updated(self.id(), reinvite);
            self.inner.transition(updated)?;
        }
    }
    outcome
}
/// Sends an in-dialog UPDATE request with optional extra headers and body.
///
/// Returns `Ok(None)` without sending anything when the dialog is not
/// confirmed; otherwise returns whatever `do_request` yields.
///
/// # Errors
/// Propagates errors from `make_request` and `do_request`.
pub async fn update(
    &self,
    headers: Option<Vec<rsip::Header>>,
    body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
    if !self.inner.is_confirmed() {
        return Ok(None);
    }
    info!(id=%self.id(),"sending update request, body:\n{:?}", body);
    let request =
        self.inner
            .make_request(rsip::Method::Update, None, None, None, headers, body)?;
    // The request is not used after sending, so move it instead of cloning
    // its headers and body.
    self.inner.do_request(request).await
}
/// Sends an in-dialog INFO request with optional extra headers and body.
///
/// Returns `Ok(None)` without sending anything when the dialog is not
/// confirmed; otherwise returns whatever `do_request` yields.
///
/// # Errors
/// Propagates errors from `make_request` and `do_request`.
pub async fn info(
    &self,
    headers: Option<Vec<rsip::Header>>,
    body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
    if !self.inner.is_confirmed() {
        return Ok(None);
    }
    info!(id=%self.id(),"sending info request, body:\n{:?}", body);
    let request =
        self.inner
            .make_request(rsip::Method::Info, None, None, None, headers, body)?;
    // The request is not used after sending, so move it instead of cloning
    // its headers and body.
    self.inner.do_request(request).await
}
/// Sends an in-dialog OPTIONS request with optional extra headers and body.
///
/// Returns `Ok(None)` without sending anything when the dialog is not
/// confirmed; otherwise returns whatever `do_request` yields.
///
/// # Errors
/// Propagates errors from `make_request` and `do_request`.
pub async fn options(
    &self,
    headers: Option<Vec<rsip::Header>>,
    body: Option<Vec<u8>>,
) -> Result<Option<rsip::Response>> {
    if !self.inner.is_confirmed() {
        return Ok(None);
    }
    info!(id=%self.id(),"sending option request, body:\n{:?}", body);
    let request =
        self.inner
            .make_request(rsip::Method::Options, None, None, None, headers, body)?;
    // The request is not used after sending, so move it instead of cloning
    // its headers and body.
    self.inner.do_request(request).await
}
pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
trace!(
id=%self.id(),
"handle request: {:?} state:{}",
tx.original,
self.inner.state.lock().unwrap()
);
let cseq = tx.original.cseq_header()?.seq()?;
let remote_seq = self.inner.remote_seq.load(Ordering::Relaxed);
if remote_seq > 0 && cseq < remote_seq {
info!(id=%self.id(),"received old request remote_seq: {} > {}", remote_seq, cseq);
tx.reply(rsip::StatusCode::ServerInternalError).await?;
return Ok(());
}
self.inner
.remote_seq
.compare_exchange(remote_seq, cseq, Ordering::Relaxed, Ordering::Relaxed)
.ok();
if self.inner.is_confirmed() {
match tx.original.method {
rsip::Method::Invite => {}
rsip::Method::Bye => return self.handle_bye(tx).await,
rsip::Method::Info => return self.handle_info(tx).await,
rsip::Method::Options => return self.handle_options(tx).await,
rsip::Method::Update => return self.handle_update(tx).await,
_ => {
info!(id=%self.id(), "invalid request method: {:?}", tx.original.method);
tx.reply(rsip::StatusCode::MethodNotAllowed).await?;
return Err(crate::Error::DialogError(
"invalid request".to_string(),
self.id(),
rsip::StatusCode::MethodNotAllowed,
));
}
}
} else {
info!(id=%self.id(),
"received request before confirmed: {:?}",
tx.original.method
);
}
Ok(())
}
/// Handles an incoming BYE: transitions the dialog to
/// `Terminated(.., TerminatedReason::UasBye)` and then replies `200 OK`.
async fn handle_bye(&mut self, tx: &mut Transaction) -> Result<()> {
    info!(id=%self.id(), "received bye {}", tx.original.uri);
    let terminated = DialogState::Terminated(self.id(), TerminatedReason::UasBye);
    self.inner.transition(terminated)?;
    tx.reply(rsip::StatusCode::OK).await?;
    Ok(())
}
/// Handles an incoming INFO: publishes the request through a
/// `DialogState::Info` transition and then replies `200 OK`.
async fn handle_info(&mut self, tx: &mut Transaction) -> Result<()> {
    info!(id=%self.id(),"received info {}", tx.original.uri);
    let info_state = DialogState::Info(self.id(), tx.original.clone());
    self.inner.transition(info_state)?;
    tx.reply(rsip::StatusCode::OK).await?;
    Ok(())
}
/// Handles an incoming OPTIONS: publishes the request through a
/// `DialogState::Options` transition and then replies `200 OK`.
async fn handle_options(&mut self, tx: &mut Transaction) -> Result<()> {
    info!(id=%self.id(),"received options {}", tx.original.uri);
    let options_state = DialogState::Options(self.id(), tx.original.clone());
    self.inner.transition(options_state)?;
    tx.reply(rsip::StatusCode::OK).await?;
    Ok(())
}
/// Handles an incoming UPDATE: publishes the request through a
/// `DialogState::Updated` transition and then replies `200 OK`.
async fn handle_update(&mut self, tx: &mut Transaction) -> Result<()> {
    info!(id=%self.id(),"received update {}", tx.original.uri);
    let updated_state = DialogState::Updated(self.id(), tx.original.clone());
    self.inner.transition(updated_state)?;
    tx.reply(rsip::StatusCode::OK).await?;
    Ok(())
}
pub async fn process_invite(
&self,
mut tx: Transaction,
) -> Result<(DialogId, Option<Response>)> {
self.inner.transition(DialogState::Calling(self.id()))?;
let mut auth_sent = false;
tx.send().await?;
let mut dialog_id = self.id();
let mut final_response = None;
while let Some(msg) = tx.receive().await {
match msg {
SipMessage::Request(_) => {}
SipMessage::Response(resp) => {
match resp.status_code {
StatusCode::Trying => {
self.inner.transition(DialogState::Trying(self.id()))?;
continue;
}
StatusCode::Ringing | StatusCode::SessionProgress => {
match resp.to_header()?.tag() {
Ok(Some(tag)) => self.inner.update_remote_tag(tag.value())?,
_ => {}
}
self.inner.transition(DialogState::Early(self.id(), resp))?;
continue;
}
StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
if auth_sent {
final_response = Some(resp.clone());
info!(id=%self.id(),"received {} response after auth sent", resp.status_code);
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::ProxyAuthRequired,
))?;
break;
}
auth_sent = true;
if let Some(credential) = &self.inner.credential {
tx = handle_client_authenticate(
self.inner.increment_local_seq(),
tx,
resp,
credential,
)
.await?;
tx.send().await?;
self.inner.update_remote_tag("").ok();
continue;
} else {
info!(id=%self.id(),"received 407 response without auth option");
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::ProxyAuthRequired,
))?;
}
continue;
}
_ => {}
};
final_response = Some(resp.clone());
match resp.to_header()?.tag()? {
Some(tag) => self.inner.update_remote_tag(tag.value())?,
None => {}
}
if let Ok(id) = DialogId::try_from(&resp) {
dialog_id = id;
}
match resp.status_code {
StatusCode::OK => {
let contact = resp.contact_header()?;
self.inner
.remote_contact
.lock()
.unwrap()
.replace(contact.clone());
let uri = if let Ok(typed_contact) = contact.typed() {
typed_contact.uri
} else {
let mut uri = extract_uri_from_contact(contact.value())?;
uri.headers.clear();
uri
};
*self.inner.remote_uri.lock().unwrap() = uri;
let mut route_set = Vec::new();
if self.inner.endpoint_inner.option.follow_record_route {
for header in resp.headers.iter() {
if let Header::RecordRoute(record_route) = header {
route_set.push(Route::from(record_route.value()));
}
}
}
*self.inner.route_set.lock().unwrap() = route_set;
self.inner
.transition(DialogState::Confirmed(dialog_id.clone(), resp))?;
DialogInner::serve_keepalive_options(self.inner.clone());
}
_ => {
self.inner.transition(DialogState::Terminated(
self.id(),
TerminatedReason::UasOther(resp.status_code.clone()),
))?;
}
}
break;
}
}
}
Ok((dialog_id, final_response))
}
}