// ezk_sip_core/transaction/server.rs
use super::TsxRegistration;
use super::consts::T1;
use crate::transport::OutgoingResponse;
use crate::{IncomingRequest, Result};
use sip_types::{CodeKind, Method};
use std::time::Instant;
use tokio::time::timeout_at;

9#[derive(Debug)]
16pub struct ServerTsx {
17 registration: TsxRegistration,
18}
19
20impl ServerTsx {
21 pub(crate) fn new(request: &mut IncomingRequest) -> Self {
23 assert!(
24 !matches!(request.line.method, Method::INVITE | Method::ACK),
25 "tried to create server transaction from {} request",
26 request.line.method
27 );
28
29 Self {
30 registration: request.take_tsx_registration(),
31 }
32 }
33
34 pub async fn respond_provisional(&mut self, response: &mut OutgoingResponse) -> Result<()> {
39 assert_eq!(response.msg.line.code.kind(), CodeKind::Provisional);
40
41 self.registration
42 .endpoint
43 .send_outgoing_response(response)
44 .await?;
45
46 Ok(())
47 }
48
49 pub async fn respond(mut self, mut response: OutgoingResponse) -> Result<()> {
55 assert_ne!(
56 response.msg.line.code.kind(),
57 CodeKind::Provisional,
58 "ServerTsx::respond must only be used for final responses, use ServerTsx::respond_provisional instead"
59 );
60
61 self.registration
62 .endpoint
63 .send_outgoing_response(&mut response)
64 .await?;
65
66 if response.parts.transport.reliable() {
67 return Ok(());
68 }
69
70 let abandon = Instant::now() + T1 * 64;
71
72 tokio::spawn(async move {
73 while let Ok(msg) = timeout_at(abandon.into(), self.registration.receive()).await {
74 if msg.line.is_request() {
75 if let Err(e) = self
76 .registration
77 .endpoint
78 .send_outgoing_response(&mut response)
79 .await
80 {
81 log::warn!("Failed to retransmit message, {e}");
82 }
83 }
84 }
85 });
86
87 Ok(())
88 }
89}