use super::authenticate::Credential;
use super::dialog::{DialogSnapshot, DialogStateSender};
use super::publication::{ClientPublicationDialog, ServerPublicationDialog};
use super::subscription::{ClientSubscriptionDialog, ServerSubscriptionDialog};
use super::{dialog::Dialog, server_dialog::ServerInviteDialog, DialogId};
use crate::dialog::client_dialog::ClientInviteDialog;
use crate::dialog::dialog::{DialogInner, DialogStateReceiver};
use crate::sip::prelude::HeadersExt;
use crate::transaction::key::TransactionRole;
use crate::transaction::make_tag;
use crate::transaction::transaction::transaction_event_sender_noop;
use crate::transaction::{endpoint::EndpointInnerRef, transaction::Transaction};
use crate::Result;
use dashmap::DashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tracing::debug;
pub struct DialogLayerInner {
pub(super) last_seq: AtomicU32,
pub(super) dialogs: DashMap<String, Dialog>,
}
pub type DialogLayerInnerRef = Arc<DialogLayerInner>;
pub struct DialogLayer {
pub endpoint: EndpointInnerRef,
pub inner: DialogLayerInnerRef,
}
impl DialogLayer {
pub fn new(endpoint: EndpointInnerRef) -> Self {
Self {
endpoint,
inner: Arc::new(DialogLayerInner {
last_seq: AtomicU32::new(0),
dialogs: DashMap::new(),
}),
}
}
pub fn get_or_create_server_invite(
&self,
tx: &Transaction,
state_sender: DialogStateSender,
credential: Option<Credential>,
local_contact: Option<crate::sip::Uri>,
) -> Result<ServerInviteDialog> {
let mut id = DialogId::try_from(tx)?;
if !id.local_tag.is_empty() {
let dlg = self.inner.dialogs.get(&id.to_string()).map(|d| d.clone());
match dlg {
Some(Dialog::ServerInvite(dlg)) => return Ok(dlg),
_ => {
return Err(crate::Error::DialogError(
"the dialog not found".to_string(),
id,
crate::sip::StatusCode::CallTransactionDoesNotExist,
));
}
}
}
id.local_tag = make_tag().to_string();
let mut local_contact = local_contact;
if local_contact.is_none() {
local_contact = self
.build_local_contact(credential.as_ref().map(|cred| cred.username.clone()), None)
.ok();
}
let dlg_inner = DialogInner::new(
TransactionRole::Server,
id.clone(),
tx.original.clone(),
self.endpoint.clone(),
state_sender,
credential,
local_contact,
tx.tu_sender.clone(),
)?;
*dlg_inner.remote_contact.lock() = tx.original.contact_header().ok().cloned();
let dialog = ServerInviteDialog {
inner: Arc::new(dlg_inner),
};
self.inner
.dialogs
.insert(id.to_string(), Dialog::ServerInvite(dialog.clone()));
debug!(%id, "server invite dialog created");
Ok(dialog)
}
pub fn get_or_create_server_subscription(
&self,
tx: &Transaction,
state_sender: DialogStateSender,
credential: Option<Credential>,
local_contact: Option<crate::sip::Uri>,
) -> Result<ServerSubscriptionDialog> {
let mut id = DialogId::try_from(tx)?;
if !id.local_tag.is_empty() {
let dlg = self.inner.dialogs.get(&id.to_string()).map(|d| d.clone());
match dlg {
Some(Dialog::ServerSubscription(dlg)) => return Ok(dlg),
_ => {
return Err(crate::Error::DialogError(
"the dialog not found".to_string(),
id,
crate::sip::StatusCode::CallTransactionDoesNotExist,
));
}
}
}
id.local_tag = make_tag().to_string();
let mut local_contact = local_contact;
if local_contact.is_none() {
local_contact = self
.build_local_contact(credential.as_ref().map(|cred| cred.username.clone()), None)
.ok();
}
let dlg_inner = DialogInner::new(
TransactionRole::Server,
id.clone(),
tx.original.clone(),
self.endpoint.clone(),
state_sender,
credential,
local_contact,
tx.tu_sender.clone(),
)?;
*dlg_inner.remote_contact.lock() = tx.original.contact_header().ok().cloned();
let dialog = ServerSubscriptionDialog {
inner: Arc::new(dlg_inner),
};
self.inner
.dialogs
.insert(id.to_string(), Dialog::ServerSubscription(dialog.clone()));
debug!(%id, "server subscription dialog created");
Ok(dialog)
}
pub fn get_or_create_server_publication(
&self,
tx: &Transaction,
state_sender: DialogStateSender,
credential: Option<Credential>,
local_contact: Option<crate::sip::Uri>,
) -> Result<ServerPublicationDialog> {
let mut id = DialogId::try_from(tx)?;
if !id.local_tag.is_empty() {
let dlg = self.inner.dialogs.get(&id.to_string()).map(|d| d.clone());
match dlg {
Some(Dialog::ServerPublication(dlg)) => return Ok(dlg),
_ => {
return Err(crate::Error::DialogError(
"the dialog not found".to_string(),
id,
crate::sip::StatusCode::CallTransactionDoesNotExist,
));
}
}
}
id.local_tag = make_tag().to_string();
let mut local_contact = local_contact;
if local_contact.is_none() {
local_contact = self
.build_local_contact(credential.as_ref().map(|cred| cred.username.clone()), None)
.ok();
}
let dlg_inner = DialogInner::new(
TransactionRole::Server,
id.clone(),
tx.original.clone(),
self.endpoint.clone(),
state_sender,
credential,
local_contact,
tx.tu_sender.clone(),
)?;
*dlg_inner.remote_contact.lock() = tx.original.contact_header().ok().cloned();
let dialog = ServerPublicationDialog::new(Arc::new(dlg_inner));
self.inner
.dialogs
.insert(id.to_string(), Dialog::ServerPublication(dialog.clone()));
debug!(%id, "server publication dialog created");
Ok(dialog)
}
pub fn get_or_create_client_publication(
&self,
call_id: String,
from_tag: String,
to_tag: String,
initial_request: crate::sip::Request,
state_sender: DialogStateSender,
credential: Option<Credential>,
local_contact: Option<crate::sip::Uri>,
) -> Result<ClientPublicationDialog> {
let id = DialogId {
call_id,
local_tag: from_tag,
remote_tag: to_tag,
};
if let Some(Dialog::ClientPublication(dlg)) = self.get_dialog(&id) {
return Ok(dlg);
}
let mut local_contact = local_contact;
if local_contact.is_none() {
local_contact = self
.build_local_contact(credential.as_ref().map(|cred| cred.username.clone()), None)
.ok();
}
let dlg_inner = DialogInner::new(
TransactionRole::Client,
id.clone(),
initial_request,
self.endpoint.clone(),
state_sender,
credential,
local_contact,
{
let (tx, _) = tokio::sync::mpsc::unbounded_channel();
tx
},
)?;
let dialog = ClientPublicationDialog::new(Arc::new(dlg_inner));
self.inner
.dialogs
.insert(id.to_string(), Dialog::ClientPublication(dialog.clone()));
Ok(dialog)
}
pub fn get_or_create_client_subscription(
&self,
call_id: String,
from_tag: String,
to_tag: String,
initial_request: crate::sip::Request,
state_sender: DialogStateSender,
credential: Option<Credential>,
local_contact: Option<crate::sip::Uri>,
) -> Result<ClientSubscriptionDialog> {
let id = DialogId {
call_id,
local_tag: from_tag,
remote_tag: to_tag,
};
if let Some(Dialog::ClientSubscription(dlg)) = self.get_dialog(&id) {
return Ok(dlg);
}
let mut local_contact = local_contact;
if local_contact.is_none() {
local_contact = self
.build_local_contact(credential.as_ref().map(|cred| cred.username.clone()), None)
.ok();
}
let dlg_inner = DialogInner::new(
TransactionRole::Client,
id.clone(),
initial_request,
self.endpoint.clone(),
state_sender,
credential,
local_contact,
{
let (tx, _) = tokio::sync::mpsc::unbounded_channel();
tx
},
)?;
let dialog = ClientSubscriptionDialog {
inner: Arc::new(dlg_inner),
};
self.inner
.dialogs
.insert(id.to_string(), Dialog::ClientSubscription(dialog.clone()));
Ok(dialog)
}
pub fn increment_last_seq(&self) -> u32 {
self.inner.last_seq.fetch_add(1, Ordering::Relaxed);
self.inner.last_seq.load(Ordering::Relaxed)
}
pub fn len(&self) -> usize {
self.inner.dialogs.len()
}
pub fn is_empty(&self) -> bool {
self.inner.dialogs.is_empty()
}
pub fn all_dialog_ids(&self) -> Vec<String> {
self.inner
.dialogs
.iter()
.map(|e| e.key().clone())
.collect::<Vec<_>>()
}
pub fn get_dialog(&self, id: &DialogId) -> Option<Dialog> {
self.get_dialog_with(&id.to_string())
}
pub fn get_dialog_with(&self, id: &String) -> Option<Dialog> {
self.inner.dialogs.get(id).map(|d| d.clone())
}
pub fn get_client_dialog_by_call_id(&self, call_id: &str) -> Vec<ClientInviteDialog> {
self.inner
.dialogs
.iter()
.filter_map(|e| match e.value() {
Dialog::ClientInvite(client_dlg) if client_dlg.id().call_id == call_id => {
Some(client_dlg.clone())
}
_ => None,
})
.collect()
}
pub fn restore_from_snapshot(
&self,
snapshot: DialogSnapshot,
state_sender: DialogStateSender,
) -> crate::Result<bool> {
if self.get_dialog(&snapshot.id).is_some() {
return Ok(false);
}
let tu_sender = transaction_event_sender_noop();
let Some(inner) = DialogInner::try_restore_from_snapshot(
snapshot,
self.endpoint.clone(),
state_sender,
tu_sender,
)?
else {
return Ok(false);
};
let inner = Arc::new(inner);
let dialog = Dialog::from_inner(inner.role, inner.clone());
let key = dialog.id().to_string();
self.inner.dialogs.insert(key, dialog);
Ok(true)
}
pub fn remove_dialog(&self, id: &DialogId) {
debug!(%id, "remove dialog");
if let Some((_, d)) = self.inner.dialogs.remove(&id.to_string()) {
d.on_remove()
}
}
pub fn match_dialog(&self, tx: &Transaction) -> Option<Dialog> {
let id = DialogId::try_from(tx).ok()?;
self.get_dialog(&id)
}
pub fn new_dialog_state_channel(&self) -> (DialogStateSender, DialogStateReceiver) {
tokio::sync::mpsc::unbounded_channel()
}
pub fn build_local_contact(
&self,
username: Option<String>,
params: Option<Vec<crate::sip::Param>>,
) -> Result<crate::sip::Uri> {
let addr = self
.endpoint
.transport_layer
.get_addrs()
.first()
.ok_or(crate::Error::EndpointError("not sipaddrs".to_string()))?
.clone();
let scheme = if matches!(addr.r#type, Some(crate::sip::Transport::Tls)) {
crate::sip::Scheme::Sips
} else {
crate::sip::Scheme::Sip
};
let mut params = params.unwrap_or_default();
if !matches!(addr.r#type, Some(crate::sip::Transport::Udp) | None) {
if let Some(t) = addr.r#type {
params.push(crate::sip::Param::Transport(t))
}
}
let auth = username.map(|user| crate::sip::Auth {
user,
password: None,
});
Ok(crate::sip::Uri {
scheme: Some(scheme),
auth,
host_with_port: addr.addr.clone(),
params,
..Default::default()
})
}
}