ezk_sip_core/transaction/
client.rs

1use super::consts::{T1, T2};
2use super::key::TsxKey;
3use super::{TsxRegistration, TsxResponse};
4use crate::error::Error;
5use crate::transaction::consts::T4;
6use crate::transport::{OutgoingRequest, TargetTransportInfo};
7use crate::{Endpoint, Request, Result};
8use sip_types::{CodeKind, Method};
9use std::time::Instant;
10use tokio::time::{timeout, timeout_at};
11
/// Client non-INVITE transaction. Used to receive responses to a sent request.
///
/// Dropping it prematurely may result in an invalid transaction and it cannot be guaranteed
/// that the peer has received the request, as the transaction is also responsible
/// for retransmitting the original request until a response is received or the
/// timeout is triggered.
#[must_use]
#[derive(Debug)]
pub struct ClientTsx {
    /// Registration used to receive the responses belonging to this transaction.
    ///
    /// `Some` from creation until a final response has been handled, at which
    /// point it is taken out of the transaction and `None` is left behind.
    registration: Option<TsxRegistration>,
    /// The request that was sent when the transaction was created; it is sent
    /// again whenever the transaction retransmits.
    request: OutgoingRequest,
    /// Deadline of the transaction (`64 * T1` after the initial send). Waiting
    /// for a response past this instant fails with `Error::RequestTimedOut`.
    timeout: Instant,
    /// Current state of the transaction state machine.
    state: State,
}
26
/// States of the client non-INVITE transaction.
#[derive(Debug)]
enum State {
    /// Initial state: the request has been sent, no response was handled yet.
    Init,
    /// A provisional response was handled, the final response is still pending.
    Proceeding,
    /// A final response was handled on an unreliable transport.
    Completed,
    /// A final response was handled on a reliable transport.
    Terminated,
}
34
35impl ClientTsx {
36    /// Internal: Used by [Endpoint::send_request]
37    pub(crate) async fn send(
38        endpoint: Endpoint,
39        request: Request,
40        target: &mut TargetTransportInfo,
41    ) -> Result<Self> {
42        let method = request.line.method.clone();
43
44        assert!(
45            !matches!(method, Method::INVITE | Method::ACK),
46            "tried to create client transaction from {} request",
47            method
48        );
49
50        let mut request = endpoint.create_outgoing(request, target).await?;
51
52        let registration = TsxRegistration::create(endpoint, TsxKey::client(&method));
53
54        let via = registration.endpoint.create_via(
55            &request.parts.transport,
56            &registration.tsx_key,
57            target.via_host_port.clone(),
58        );
59
60        request.msg.headers.insert_named_front(&via);
61        registration
62            .endpoint
63            .send_outgoing_request(&mut request)
64            .await?;
65
66        let timeout = Instant::now() + T1 * 64;
67
68        Ok(Self {
69            registration: Some(registration),
70            request,
71            timeout,
72            state: State::Init,
73        })
74    }
75
76    /// Returns the request the transaction was created from
77    pub fn request(&self) -> &OutgoingRequest {
78        &self.request
79    }
80
81    /// Receive one or more responses
82    ///
83    /// Must be called until a final response or error is returned.
84    ///
85    /// # Panics
86    /// After receiving the final response this function will panic if called again.
87    /// This is due to it needing to move out some internal state to a new task.
88    pub async fn receive(&mut self) -> Result<TsxResponse> {
89        let registration = if let Some(registration) = &mut self.registration {
90            registration
91        } else {
92            // TODO: This is not a nice API :/
93            panic!("transaction already received a final response");
94        };
95
96        match self.state {
97            State::Init if !self.request.parts.transport.reliable() => {
98                loop {
99                    let receive = timeout(T2, registration.receive_response());
100
101                    match timeout_at(self.timeout.into(), receive).await {
102                        Ok(Ok(msg)) => return self.handle_msg(msg),
103                        Ok(Err(_)) => {
104                            // retransmit
105                            registration
106                                .endpoint
107                                .send_outgoing_request(&mut self.request)
108                                .await?;
109                        }
110                        Err(_) => return Err(Error::RequestTimedOut),
111                    }
112                }
113            }
114            State::Init | State::Proceeding => {
115                match timeout_at(self.timeout.into(), registration.receive_response()).await {
116                    Ok(msg) => self.handle_msg(msg),
117                    Err(_) => Err(Error::RequestTimedOut),
118                }
119            }
120            State::Completed | State::Terminated => {
121                panic!("transaction already received a final response");
122            }
123        }
124    }
125
126    /// Calls [`ClientTsx::receive`] and discards all provisional responses
127    /// until it receives the final one, returning it.
128    pub async fn receive_final(&mut self) -> Result<TsxResponse> {
129        loop {
130            let response = self.receive().await?;
131
132            if let CodeKind::Provisional = response.line.code.kind() {
133                // ignore
134                continue;
135            }
136
137            return Ok(response);
138        }
139    }
140
141    fn handle_msg(&mut self, response: TsxResponse) -> Result<TsxResponse> {
142        match response.line.code.kind() {
143            CodeKind::Provisional => {
144                self.state = State::Proceeding;
145            }
146            _ => {
147                let mut registration = self.registration.take().expect("already checked");
148
149                if self.request.parts.transport.reliable() {
150                    self.state = State::Terminated;
151                } else {
152                    self.state = State::Completed;
153
154                    // TODO can this be handled via tsx-registration instead of spawning a new task
155                    tokio::spawn(async move {
156                        let timeout = Instant::now() + T4;
157
158                        while timeout_at(timeout.into(), registration.receive())
159                            .await
160                            .is_ok()
161                        {
162                            // toss incoming messages, just keep registration alive
163                        }
164                    });
165                }
166            }
167        }
168
169        Ok(response)
170    }
171}