ezk_sip_core/transaction/
client.rs1use super::consts::{T1, T2};
2use super::key::TsxKey;
3use super::{TsxRegistration, TsxResponse};
4use crate::error::Error;
5use crate::transaction::consts::T4;
6use crate::transport::{OutgoingRequest, TargetTransportInfo};
7use crate::{Endpoint, Request, Result};
8use sip_types::{CodeKind, Method};
9use std::time::Instant;
10use tokio::time::{timeout, timeout_at};
11
12#[must_use]
19#[derive(Debug)]
20pub struct ClientTsx {
21 registration: Option<TsxRegistration>,
22 request: OutgoingRequest,
23 timeout: Instant,
24 state: State,
25}
26
/// States of the non-INVITE client transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// The request has been sent and no response has been handled yet.
    Init,
    /// At least one provisional response has been handled.
    Proceeding,
    /// A final response has been handled on an unreliable transport.
    Completed,
    /// A final response has been handled on a reliable transport.
    Terminated,
}
34
35impl ClientTsx {
36 pub(crate) async fn send(
38 endpoint: Endpoint,
39 request: Request,
40 target: &mut TargetTransportInfo,
41 ) -> Result<Self> {
42 let method = request.line.method.clone();
43
44 assert!(
45 !matches!(method, Method::INVITE | Method::ACK),
46 "tried to create client transaction from {} request",
47 method
48 );
49
50 let mut request = endpoint.create_outgoing(request, target).await?;
51
52 let registration = TsxRegistration::create(endpoint, TsxKey::client(&method));
53
54 let via = registration.endpoint.create_via(
55 &request.parts.transport,
56 ®istration.tsx_key,
57 target.via_host_port.clone(),
58 );
59
60 request.msg.headers.insert_named_front(&via);
61 registration
62 .endpoint
63 .send_outgoing_request(&mut request)
64 .await?;
65
66 let timeout = Instant::now() + T1 * 64;
67
68 Ok(Self {
69 registration: Some(registration),
70 request,
71 timeout,
72 state: State::Init,
73 })
74 }
75
76 pub fn request(&self) -> &OutgoingRequest {
78 &self.request
79 }
80
81 pub async fn receive(&mut self) -> Result<TsxResponse> {
89 let registration = if let Some(registration) = &mut self.registration {
90 registration
91 } else {
92 panic!("transaction already received a final response");
94 };
95
96 match self.state {
97 State::Init if !self.request.parts.transport.reliable() => {
98 loop {
99 let receive = timeout(T2, registration.receive_response());
100
101 match timeout_at(self.timeout.into(), receive).await {
102 Ok(Ok(msg)) => return self.handle_msg(msg),
103 Ok(Err(_)) => {
104 registration
106 .endpoint
107 .send_outgoing_request(&mut self.request)
108 .await?;
109 }
110 Err(_) => return Err(Error::RequestTimedOut),
111 }
112 }
113 }
114 State::Init | State::Proceeding => {
115 match timeout_at(self.timeout.into(), registration.receive_response()).await {
116 Ok(msg) => self.handle_msg(msg),
117 Err(_) => Err(Error::RequestTimedOut),
118 }
119 }
120 State::Completed | State::Terminated => {
121 panic!("transaction already received a final response");
122 }
123 }
124 }
125
126 pub async fn receive_final(&mut self) -> Result<TsxResponse> {
129 loop {
130 let response = self.receive().await?;
131
132 if let CodeKind::Provisional = response.line.code.kind() {
133 continue;
135 }
136
137 return Ok(response);
138 }
139 }
140
141 fn handle_msg(&mut self, response: TsxResponse) -> Result<TsxResponse> {
142 match response.line.code.kind() {
143 CodeKind::Provisional => {
144 self.state = State::Proceeding;
145 }
146 _ => {
147 let mut registration = self.registration.take().expect("already checked");
148
149 if self.request.parts.transport.reliable() {
150 self.state = State::Terminated;
151 } else {
152 self.state = State::Completed;
153
154 tokio::spawn(async move {
156 let timeout = Instant::now() + T4;
157
158 while timeout_at(timeout.into(), registration.receive())
159 .await
160 .is_ok()
161 {
162 }
164 });
165 }
166 }
167 }
168
169 Ok(response)
170 }
171}