use super::endpoint::EndpointInnerRef;
use super::key::TransactionKey;
use super::{SipConnection, TransactionState, TransactionTimer, TransactionType};
use crate::dialog::DialogId;
use crate::sip::{
ContentLength, HasHeaders, Header, HeadersExt, Method, Request, Response, SipMessage,
StatusCode, StatusCodeKind,
};
use crate::transaction::key::TransactionRole;
use crate::transaction::make_tag;
use crate::transport::SipAddr;
use crate::{Error, Result};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::{debug, trace};
pub type TransactionEventReceiver = UnboundedReceiver<TransactionEvent>;
pub type TransactionEventSender = UnboundedSender<TransactionEvent>;
pub enum TransactionEvent {
Received(SipMessage, Option<SipConnection>),
Timer(TransactionTimer),
Respond(Response),
Terminate(TransactionKey),
}
pub fn transaction_event_sender_noop() -> TransactionEventSender {
let (tx, mut rx) = unbounded_channel::<TransactionEvent>();
tokio::spawn(async move {
while let Some(_ev) = rx.recv().await {
}
});
tx
}
pub struct Transaction {
pub transaction_type: TransactionType,
pub key: TransactionKey,
pub original: Request,
pub destination: Option<SipAddr>,
pub state: TransactionState,
pub endpoint_inner: EndpointInnerRef,
pub connection: Option<SipConnection>,
pub last_response: Option<Response>,
pub last_ack: Option<Request>,
pub tu_receiver: TransactionEventReceiver,
pub tu_sender: TransactionEventSender,
pub timer_a: Option<u64>,
pub timer_b: Option<u64>,
pub timer_c: Option<u64>,
pub timer_d: Option<u64>,
pub timer_k: Option<u64>, pub timer_g: Option<u64>, is_cleaned_up: bool,
}
impl Transaction {
fn new(
transaction_type: TransactionType,
key: TransactionKey,
original: Request,
connection: Option<SipConnection>,
endpoint_inner: EndpointInnerRef,
) -> Self {
let (tu_sender, tu_receiver) = unbounded_channel();
let state = if matches!(
transaction_type,
TransactionType::ServerInvite | TransactionType::ServerNonInvite
) {
TransactionState::Trying
} else {
TransactionState::Nothing
};
trace!(%key, %state, "transaction created");
let tx = Self {
transaction_type,
endpoint_inner,
connection,
key,
original,
destination: None,
state,
last_response: None,
last_ack: None,
timer_a: None,
timer_b: None,
timer_c: None,
timer_d: None,
timer_k: None,
timer_g: None,
tu_receiver,
tu_sender,
is_cleaned_up: false,
};
tx.endpoint_inner
.attach_transaction(&tx.key, tx.tu_sender.clone());
tx
}
pub fn new_client(
key: TransactionKey,
original: Request,
endpoint_inner: EndpointInnerRef,
connection: Option<SipConnection>,
) -> Self {
let tx_type = match original.method {
Method::Invite => TransactionType::ClientInvite,
_ => TransactionType::ClientNonInvite,
};
Transaction::new(tx_type, key, original, connection, endpoint_inner)
}
pub fn new_server(
key: TransactionKey,
original: Request,
endpoint_inner: EndpointInnerRef,
connection: Option<SipConnection>,
) -> Self {
let tx_type = match original.method {
Method::Invite | Method::Ack => TransactionType::ServerInvite,
_ => TransactionType::ServerNonInvite,
};
Transaction::new(tx_type, key, original, connection, endpoint_inner)
}
pub async fn send(&mut self) -> Result<()> {
match self.transaction_type {
TransactionType::ClientInvite | TransactionType::ClientNonInvite => {}
_ => {
return Err(Error::TransactionError(
"send is only valid for client transactions".to_string(),
self.key.clone(),
));
}
}
if self.connection.is_none() {
let target_uri = match &self.destination {
Some(addr) => addr,
None => {
if let Some(locator) = self.endpoint_inner.locator.as_ref() {
&locator.locate(&self.original.uri).await?
} else {
&SipAddr::try_from(&self.original.uri)?
}
}
};
let (connection, resolved_addr) = self
.endpoint_inner
.transport_layer
.lookup(target_uri, Some(&self.key))
.await?;
if !connection.is_reliable() {
self.destination.replace(resolved_addr);
}
self.connection.replace(connection);
}
let connection = self.connection.as_ref().ok_or(Error::TransactionError(
"no connection found".to_string(),
self.key.clone(),
))?;
let content_length_header =
Header::ContentLength(ContentLength::from(self.original.body().len() as u32));
self.original
.headers_mut()
.unique_push(content_length_header);
let message = if let Some(ref inspector) = self.endpoint_inner.message_inspector {
inspector.before_send(self.original.to_owned().into(), self.destination.as_ref())
} else {
self.original.to_owned().into()
};
connection.send(message, self.destination.as_ref()).await?;
self.transition(TransactionState::Calling).map(|_| ())
}
pub async fn reply_with(
&mut self,
status_code: StatusCode,
headers: Vec<Header>,
body: Option<Vec<u8>>,
) -> Result<()> {
match status_code.kind() {
StatusCodeKind::Provisional => {}
_ => {
let to = self.original.to_header()?;
if to.tag()?.is_none() {
self.original
.headers
.unique_push(to.clone().with_tag(make_tag()).into());
}
}
}
let mut resp = self
.endpoint_inner
.make_response(&self.original, status_code, body);
resp.headers.extend(headers);
self.respond(resp).await
}
pub async fn reply(&mut self, status_code: StatusCode) -> Result<()> {
self.reply_with(status_code, vec![], None).await
}
pub async fn respond(&mut self, response: Response) -> Result<()> {
match self.transaction_type {
TransactionType::ServerInvite | TransactionType::ServerNonInvite => {}
_ => {
return Err(Error::TransactionError(
"respond is only valid for server transactions".to_string(),
self.key.clone(),
));
}
}
let new_state = match response.status_code.kind() {
StatusCodeKind::Provisional => match response.status_code {
StatusCode::Trying => TransactionState::Trying,
_ => TransactionState::Proceeding,
},
_ => match self.transaction_type {
TransactionType::ServerInvite => TransactionState::Completed,
_ => TransactionState::Terminated,
},
};
self.can_transition(&new_state)?;
let connection = self.connection.as_ref().ok_or(Error::TransactionError(
"no connection found".to_string(),
self.key.clone(),
))?;
let response = if let Some(ref inspector) = self.endpoint_inner.message_inspector {
inspector.before_send(
response.clone().to_owned().into(),
self.destination.as_ref(),
)
} else {
response.to_owned().into()
};
trace!(key = %self.key, response = %response, "responding");
match response.clone() {
SipMessage::Response(resp) => self.last_response.replace(resp),
_ => None,
};
connection.send(response, self.destination.as_ref()).await?;
self.transition(new_state).map(|_| ())
}
fn can_transition(&self, target: &TransactionState) -> Result<()> {
match (&self.state, target) {
(&TransactionState::Nothing, &TransactionState::Calling)
| (&TransactionState::Nothing, &TransactionState::Trying)
| (&TransactionState::Nothing, &TransactionState::Proceeding)
| (&TransactionState::Nothing, &TransactionState::Terminated)
| (&TransactionState::Calling, &TransactionState::Calling)
| (&TransactionState::Calling, &TransactionState::Trying)
| (&TransactionState::Calling, &TransactionState::Proceeding)
| (&TransactionState::Calling, &TransactionState::Completed)
| (&TransactionState::Calling, &TransactionState::Terminated)
| (&TransactionState::Trying, &TransactionState::Trying) | (&TransactionState::Trying, &TransactionState::Proceeding)
| (&TransactionState::Trying, &TransactionState::Completed)
| (&TransactionState::Trying, &TransactionState::Confirmed)
| (&TransactionState::Trying, &TransactionState::Terminated)
| (&TransactionState::Proceeding, &TransactionState::Proceeding)
| (&TransactionState::Proceeding, &TransactionState::Completed)
| (&TransactionState::Proceeding, &TransactionState::Confirmed)
| (&TransactionState::Proceeding, &TransactionState::Terminated)
| (&TransactionState::Completed, &TransactionState::Confirmed)
| (&TransactionState::Completed, &TransactionState::Terminated)
| (&TransactionState::Confirmed, &TransactionState::Terminated) => Ok(()),
_ => {
Err(Error::TransactionError(
format!(
"invalid state transition from {} to {}",
self.state, target
),
self.key.clone(),
))
}
}
}
pub async fn send_cancel(&mut self, cancel: Request) -> Result<()> {
if self.transaction_type != TransactionType::ClientInvite {
return Err(Error::TransactionError(
"send_cancel is only valid for client invite transactions".to_string(),
self.key.clone(),
));
}
match self.state {
TransactionState::Calling | TransactionState::Trying | TransactionState::Proceeding => {
if let Some(connection) = &self.connection {
let cancel = if let Some(ref inspector) = self.endpoint_inner.message_inspector
{
inspector.before_send(cancel.to_owned().into(), self.destination.as_ref())
} else {
cancel.to_owned().into()
};
connection.send(cancel, self.destination.as_ref()).await?;
}
Ok(())
}
_ => Err(Error::TransactionError(
format!("invalid state for sending CANCEL {:?}", self.state),
self.key.clone(),
)),
}
}
pub async fn send_ack(&mut self, connection: Option<SipConnection>) -> Result<()> {
if self.transaction_type != TransactionType::ClientInvite {
return Err(Error::TransactionError(
"send_ack is only valid for client invite transactions".to_string(),
self.key.clone(),
));
}
match self.state {
TransactionState::Completed => {} _ => {
return Err(Error::TransactionError(
format!("invalid state for sending ACK {:?}", self.state),
self.key.clone(),
));
}
}
let ack = match self.last_ack.clone() {
Some(ack) => ack,
None => match self.last_response {
Some(ref resp) => self.endpoint_inner.make_ack(&self.original, resp)?,
None => {
return Err(Error::TransactionError(
"no last response found to send ACK".to_string(),
self.key.clone(),
));
}
},
};
let mut connection = connection;
if let Some(resp) = self.last_response.as_ref() {
if resp.status_code.kind() == StatusCodeKind::Successful {
let target = {
let target = ack.destination();
if let Some(locator) = self.endpoint_inner.locator.as_ref() {
Some(locator.locate(&target).await?)
} else {
(&target).try_into().ok()
}
};
if let Some(addr) = target {
let (via_connection, resolved_addr) = self
.endpoint_inner
.transport_layer
.lookup(&addr, Some(&self.key))
.await?;
if !via_connection.is_reliable() {
self.destination.replace(resolved_addr);
}
connection = Some(via_connection);
}
}
}
let ack = if let Some(ref inspector) = self.endpoint_inner.message_inspector {
inspector.before_send(ack.to_owned().into(), self.destination.as_ref())
} else {
ack.to_owned().into()
};
match ack.clone() {
SipMessage::Request(ack) => self.last_ack.replace(ack),
_ => None,
};
if let Some(conn) = connection {
conn.send(ack, self.destination.as_ref()).await?;
}
self.transition(TransactionState::Terminated).map(|_| ())
}
pub async fn receive(&mut self) -> Option<SipMessage> {
while let Some(event) = self.tu_receiver.recv().await {
match event {
TransactionEvent::Received(msg, connection) => {
if let Some(msg) = match msg {
SipMessage::Request(req) => self.on_received_request(req, connection).await,
SipMessage::Response(resp) => {
self.on_received_response(resp, connection).await
}
} {
return Some(msg);
}
}
TransactionEvent::Timer(t) => {
self.on_timer(t).await.ok();
}
TransactionEvent::Respond(response) => {
self.respond(response).await.ok();
}
TransactionEvent::Terminate(key) => {
debug!(%key, "received terminate event");
return None;
}
}
}
None
}
pub async fn send_trying(&mut self) -> Result<()> {
let response = self
.endpoint_inner
.make_response(&self.original, StatusCode::Trying, None);
self.respond(response).await
}
pub fn is_terminated(&self) -> bool {
self.state == TransactionState::Terminated
}
}
impl Transaction {
fn inform_tu_response(&mut self, response: Response) -> Result<()> {
self.tu_sender
.send(TransactionEvent::Received(
SipMessage::Response(response),
None,
))
.map_err(|e| Error::TransactionError(e.to_string(), self.key.clone()))
}
async fn on_received_request(
&mut self,
req: Request,
connection: Option<SipConnection>,
) -> Option<SipMessage> {
match self.transaction_type {
TransactionType::ClientInvite | TransactionType::ClientNonInvite => return None,
_ => {}
}
if self.connection.is_none() && connection.is_some() {
self.connection = connection;
}
if req.method == Method::Cancel {
match self.state {
TransactionState::Proceeding
| TransactionState::Trying
| TransactionState::Completed => {
if let Some(connection) = &self.connection {
let resp = self
.endpoint_inner
.make_response(&req, StatusCode::OK, None);
let resp =
if let Some(ref inspector) = self.endpoint_inner.message_inspector {
inspector.before_send(resp.into(), self.destination.as_ref())
} else {
resp.into()
};
connection.send(resp, self.destination.as_ref()).await.ok();
}
return Some(req.into()); }
_ => {
if let Some(connection) = &self.connection {
let resp = self.endpoint_inner.make_response(
&req,
StatusCode::CallTransactionDoesNotExist,
None,
);
let resp =
if let Some(ref inspector) = self.endpoint_inner.message_inspector {
inspector.before_send(resp.into(), self.destination.as_ref())
} else {
resp.into()
};
connection.send(resp, self.destination.as_ref()).await.ok();
}
}
};
return None;
}
match self.state {
TransactionState::Trying | TransactionState::Proceeding => {
if let Some(last_response) = &self.last_response {
self.respond(last_response.to_owned()).await.ok();
}
}
TransactionState::Completed | TransactionState::Confirmed => {
if req.method == Method::Ack {
self.transition(TransactionState::Confirmed).ok();
return Some(req.into());
}
}
_ => {}
}
None
}
async fn on_received_response(
&mut self,
resp: Response,
connection: Option<SipConnection>,
) -> Option<SipMessage> {
match self.transaction_type {
TransactionType::ServerInvite | TransactionType::ServerNonInvite => return None,
_ => {}
}
let new_state = match resp.status_code.kind() {
StatusCodeKind::Provisional => {
if resp.status_code == StatusCode::Trying {
TransactionState::Trying
} else {
TransactionState::Proceeding
}
}
_ => {
if self.transaction_type == TransactionType::ClientInvite {
TransactionState::Completed
} else {
TransactionState::Terminated
}
}
};
self.can_transition(&new_state).ok()?;
if self.state == new_state {
if let Some(last) = self.last_response.as_ref() {
if last.status_code == resp.status_code && last.body == resp.body {
return None;
}
}
}
self.last_response.replace(resp.clone());
let is_completed_client_invite = self.transaction_type == TransactionType::ClientInvite
&& new_state == TransactionState::Completed;
self.transition(new_state).ok();
if is_completed_client_invite {
self.send_ack(connection).await.ok();
}
Some(SipMessage::Response(resp))
}
async fn on_timer(&mut self, timer: TransactionTimer) -> Result<()> {
match self.state {
TransactionState::Calling | TransactionState::Trying => {
if matches!(
self.transaction_type,
TransactionType::ClientInvite | TransactionType::ClientNonInvite
) {
if let TransactionTimer::TimerA(key, duration) = timer {
if let Some(connection) = &self.connection {
let retry_message = if let Some(ref inspector) =
self.endpoint_inner.message_inspector
{
inspector.before_send(
self.original.to_owned().into(),
self.destination.as_ref(),
)
} else {
self.original.to_owned().into()
};
connection
.send(retry_message, self.destination.as_ref())
.await?;
}
let duration = (duration * 2).min(self.endpoint_inner.option.t1x64);
let timer_a = self
.endpoint_inner
.timers
.timeout(duration, TransactionTimer::TimerA(key, duration));
self.timer_a.replace(timer_a);
} else if let TransactionTimer::TimerB(_) = timer {
let timeout_response = self.endpoint_inner.make_response(
&self.original,
StatusCode::RequestTimeout,
None,
);
self.inform_tu_response(timeout_response)?;
}
}
}
TransactionState::Proceeding => {
if let TransactionTimer::TimerC(_) = timer {
let timeout_response = self.endpoint_inner.make_response(
&self.original,
StatusCode::RequestTimeout,
None,
);
self.inform_tu_response(timeout_response)?;
}
}
TransactionState::Completed => {
if let TransactionTimer::TimerG(key, duration) = timer {
if let Some(last_response) = &self.last_response {
if let Some(connection) = &self.connection {
let last_response = if let Some(ref inspector) =
self.endpoint_inner.message_inspector
{
inspector.before_send(
last_response.to_owned().into(),
self.destination.as_ref(),
)
} else {
last_response.to_owned().into()
};
connection
.send(last_response, self.destination.as_ref())
.await?;
}
}
let duration = (duration * 2).min(self.endpoint_inner.option.t1x64);
let timer_g = self
.endpoint_inner
.timers
.timeout(duration, TransactionTimer::TimerG(key, duration));
self.timer_g.replace(timer_g);
} else if let TransactionTimer::TimerD(_) = timer {
self.transition(TransactionState::Terminated)?;
} else if let TransactionTimer::TimerK(_) = timer {
self.transition(TransactionState::Terminated)?;
}
}
TransactionState::Confirmed => {
if let TransactionTimer::TimerK(_) = timer {
self.transition(TransactionState::Terminated)?;
}
}
_ => {}
}
Ok(())
}
fn transition(&mut self, state: TransactionState) -> Result<TransactionState> {
if self.state == state {
return Ok(self.state.clone());
}
match state {
TransactionState::Nothing => {}
TransactionState::Calling => {
let connection = self.connection.as_ref().ok_or(Error::TransactionError(
"no connection found".to_string(),
self.key.clone(),
))?;
if matches!(
self.transaction_type,
TransactionType::ClientInvite | TransactionType::ClientNonInvite
) {
if !connection.is_reliable() {
let timer_a = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t1,
TransactionTimer::TimerA(
self.key.clone(),
self.endpoint_inner.option.t1,
),
);
self.timer_a.replace(timer_a);
}
self.timer_b.replace(self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t1x64,
TransactionTimer::TimerB(self.key.clone()),
));
}
}
TransactionState::Trying | TransactionState::Proceeding => {
self.timer_a
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
if matches!(self.transaction_type, TransactionType::ClientInvite) {
self.timer_b
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
if self.timer_c.is_none() {
let timer_c = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.timerc,
TransactionTimer::TimerC(self.key.clone()),
);
self.timer_c.replace(timer_c);
}
}
}
TransactionState::Completed => {
self.timer_a
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_b
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_c
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
if self.transaction_type == TransactionType::ServerInvite {
let connection = self.connection.as_ref().ok_or(Error::TransactionError(
"no connection found".to_string(),
self.key.clone(),
))?;
if !connection.is_reliable() {
let timer_g = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t1,
TransactionTimer::TimerG(
self.key.clone(),
self.endpoint_inner.option.t1,
),
);
self.timer_g.replace(timer_g);
}
debug!(key=%self.key, last = self.last_response.is_none(), "entered confirmed state, waiting for ACK");
if let Some(ref resp) = self.last_response {
let dialog_id = DialogId::try_from((resp, TransactionRole::Server))?;
self.endpoint_inner
.waiting_ack
.insert(dialog_id, self.key.clone());
}
let timer_k = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t4,
TransactionTimer::TimerK(self.key.clone()),
);
self.timer_k.replace(timer_k);
}
let timer_d = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t1x64,
TransactionTimer::TimerD(self.key.clone()),
);
self.timer_d.replace(timer_d);
}
TransactionState::Confirmed => {
self.cleanup_timer();
let timer_k = self.endpoint_inner.timers.timeout(
self.endpoint_inner.option.t4,
TransactionTimer::TimerK(self.key.clone()),
);
self.timer_k.replace(timer_k);
}
TransactionState::Terminated => {
self.cleanup();
self.tu_sender
.send(TransactionEvent::Terminate(self.key.clone()))
.ok(); }
}
debug!(
key = %self.key,
from = %self.state,
to = %state,
"transition"
);
self.state = state;
Ok(self.state.clone())
}
fn cleanup_timer(&mut self) {
self.timer_a
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_b
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_c
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_d
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_k
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
self.timer_g
.take()
.map(|id| self.endpoint_inner.timers.cancel(id));
}
pub fn role(&self) -> TransactionRole {
match self.transaction_type {
crate::transaction::TransactionType::ClientInvite
| crate::transaction::TransactionType::ClientNonInvite => TransactionRole::Client,
crate::transaction::TransactionType::ServerInvite
| crate::transaction::TransactionType::ServerNonInvite => TransactionRole::Server,
}
}
fn cleanup(&mut self) {
if self.is_cleaned_up {
return;
}
self.is_cleaned_up = true;
self.cleanup_timer();
match self.last_response {
Some(ref resp) => match DialogId::try_from((resp, self.role())) {
Ok(dialog_id) => self
.endpoint_inner
.waiting_ack
.remove(&dialog_id)
.map(|_| ()),
Err(_) => None,
},
_ => None,
};
let last_message = {
match self.transaction_type {
TransactionType::ClientInvite => {
if matches!(
self.state,
TransactionState::Proceeding | TransactionState::Trying
) && self.last_ack.is_none()
{
if let Some(ref resp) = self.last_response {
if let Ok(ack) = self.endpoint_inner.make_ack(&self.original, resp) {
self.last_ack.replace(ack);
}
}
}
self.last_ack.take().map(SipMessage::Request)
}
TransactionType::ServerNonInvite => {
self.last_response.take().map(SipMessage::Response)
}
_ => None,
}
};
self.endpoint_inner
.detach_transaction(&self.key, last_message);
}
}
impl Drop for Transaction {
fn drop(&mut self) {
self.cleanup();
trace!(key=%self.key, state=%self.state, "transaction dropped");
}
}