ftth_rsipstack/transport/
connection.rs

1use super::{sip_addr::SipAddr, stream::StreamConnection, tcp::TcpConnection, udp::UdpConnection};
2use crate::rsip;
3use crate::transport::channel::ChannelConnection;
4use crate::transport::tcp_listener::TcpListenerConnection;
5use crate::Result;
6use get_if_addrs::IfAddr;
7use rsip::{
8    prelude::{HeadersExt, ToTypedHeader},
9    Param, SipMessage,
10};
11use std::net::{IpAddr, Ipv4Addr};
12use std::{fmt, net::SocketAddr};
13use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
14use tokio_util::sync::CancellationToken;
15use tracing::debug;
16
/// Transport Layer Events
///
/// `TransportEvent` represents events that occur at the transport layer,
/// such as incoming messages, new connections, and connection closures.
/// These events are used to coordinate between the transport layer and
/// higher protocol layers.
///
/// # Events
///
/// * `Incoming` - A SIP message was received from the network
/// * `New` - A new connection has been established
/// * `Closed` - An existing connection has been closed
///
/// # Examples
///
/// ```rust,no_run
/// use ftth_rsipstack::transport::connection::TransportEvent;
///
/// # fn handle_event(event: TransportEvent) {
/// match event {
///     TransportEvent::Incoming(message, connection, source) => {
///         // Process incoming SIP message
///         println!("Received message from {}", source);
///     },
///     TransportEvent::New(connection) => {
///         // Handle new connection
///         println!("New connection established");
///     },
///     TransportEvent::Closed(connection) => {
///         // Handle connection closure
///         println!("Connection closed");
///     }
/// }
/// # }
/// ```
#[derive(Debug)]
pub enum TransportEvent {
    /// A SIP message was received: the message itself, the connection
    /// associated with it, and the address it came from.
    Incoming(SipMessage, SipConnection, SipAddr),
    /// A new connection has been established.
    New(SipConnection),
    /// An existing connection has been closed.
    Closed(SipConnection),
}
58
/// Receiving half of the unbounded tokio mpsc channel that carries
/// [`TransportEvent`]s.
pub type TransportReceiver = UnboundedReceiver<TransportEvent>;
/// Sending half of the unbounded tokio mpsc channel that carries
/// [`TransportEvent`]s.
pub type TransportSender = UnboundedSender<TransportEvent>;
61
/// Keep-alive request payload: two consecutive CRLF sequences (`\r\n\r\n`).
///
/// NOTE(review): this matches the double-CRLF "ping" of the RFC 5626
/// keep-alive scheme; where it is sent/recognised is not visible in this file.
pub const KEEPALIVE_REQUEST: &[u8] = b"\r\n\r\n";
/// Keep-alive response payload: a single CRLF (`\r\n`).
///
/// NOTE(review): this matches the single-CRLF "pong" of the RFC 5626
/// keep-alive scheme; where it is sent/recognised is not visible in this file.
pub const KEEPALIVE_RESPONSE: &[u8] = b"\r\n";
64
/// SIP Connection
///
/// `SipConnection` is an enum that abstracts different transport protocols
/// used for SIP communication. It provides a unified interface for sending
/// SIP messages regardless of the underlying transport mechanism.
///
/// # Supported Transports
///
/// * `Udp` - UDP transport for connectionless communication
/// * `Channel` - In-memory channel for testing and local communication
/// * `Tcp` - TCP transport for reliable connection-oriented communication
/// * `TcpListener` - Listening side of the TCP transport; it cannot send
///   messages and has no serve loop of its own
///
/// # Key Features
///
/// * Transport abstraction - uniform interface across protocols
/// * Reliability detection - distinguishes reliable vs unreliable transports
///   (every variant except `Udp` reports itself as reliable)
/// * Address management - exposes the address of the underlying transport
/// * Message sending - handles protocol-specific message transmission
/// * Via header processing - helpers for the `received` / `rport` parameters
///
/// # Examples
///
/// ```rust,no_run
/// use ftth_rsipstack::transport::{SipConnection, SipAddr};
/// use ftth_rsipstack::rsip;
/// use rsip::SipMessage;
///
/// // Send a message through any connection type
/// async fn send_message(
///     connection: &SipConnection,
///     message: SipMessage,
///     destination: Option<&SipAddr>
/// ) -> ftth_rsipstack::Result<()> {
///     connection.send(message, destination).await?;
///     Ok(())
/// }
///
/// # fn example(connection: &SipConnection) {
/// // Check if transport is reliable
/// let is_reliable = connection.is_reliable();
/// if is_reliable {
///     println!("Using reliable transport");
/// } else {
///     println!("Using unreliable transport - retransmissions may be needed");
/// }
/// # }
/// ```
///
/// # Transport Characteristics
///
/// ## UDP
/// * Connectionless and unreliable
/// * Requires retransmission handling
/// * Lower overhead
/// * Default SIP transport
///
/// ## TCP
/// * Connection-oriented and reliable
/// * No retransmission needed
/// * Higher overhead
/// * Better for large messages
///
/// # Via Header Processing
///
/// `SipConnection::update_msg_received` and
/// `SipConnection::build_via_received` rewrite the top Via header of incoming
/// requests, adding the `received` parameter (RFC 3261) and the `rport`
/// parameter (RFC 3581) when the observed source address differs from the
/// address advertised in the Via header.
#[derive(Clone, Debug)]
pub enum SipConnection {
    /// In-memory channel transport.
    Channel(ChannelConnection),
    /// UDP transport; the only variant reported as unreliable.
    Udp(UdpConnection),
    /// Established TCP connection.
    Tcp(TcpConnection),
    /// TCP listener; sending and serving are no-ops on this variant.
    TcpListener(TcpListenerConnection),
}
138
139impl SipConnection {
140    pub fn is_reliable(&self) -> bool {
141        match self {
142            SipConnection::Udp(_) => false,
143            _ => true,
144        }
145    }
146
147    pub fn cancel_token(&self) -> Option<CancellationToken> {
148        match self {
149            SipConnection::Channel(transport) => transport.cancel_token(),
150            SipConnection::Udp(transport) => transport.cancel_token(),
151            SipConnection::Tcp(transport) => transport.cancel_token(),
152            SipConnection::TcpListener(_) => None,
153        }
154    }
155    pub fn get_addr(&self) -> &SipAddr {
156        match self {
157            SipConnection::Channel(transport) => transport.get_addr(),
158            SipConnection::Udp(transport) => transport.get_addr(),
159            SipConnection::Tcp(transport) => transport.get_addr(),
160            SipConnection::TcpListener(transport) => transport.get_addr(),
161        }
162    }
163    pub async fn send(&self, msg: rsip::SipMessage, destination: Option<&SipAddr>) -> Result<()> {
164        match self {
165            SipConnection::Channel(transport) => transport.send(msg).await,
166            SipConnection::Udp(transport) => transport.send(msg, destination).await,
167            SipConnection::Tcp(transport) => transport.send_message(msg).await,
168            SipConnection::TcpListener(_) => {
169                debug!("SipConnection::send: TcpListener cannot send messages");
170                Ok(())
171            }
172        }
173    }
174    pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
175        match self {
176            SipConnection::Channel(transport) => transport.serve_loop(sender).await,
177            SipConnection::Udp(transport) => transport.serve_loop(sender).await,
178            SipConnection::Tcp(transport) => transport.serve_loop(sender).await,
179            SipConnection::TcpListener(_) => {
180                debug!("SipConnection::serve_loop: TcpListener does not have serve_loop");
181                Ok(())
182            }
183        }
184    }
185
186    pub async fn close(&self) -> Result<()> {
187        match self {
188            SipConnection::Channel(transport) => transport.close().await,
189            SipConnection::Udp(_) => Ok(()), // UDP has no connection state
190            SipConnection::Tcp(transport) => transport.close().await,
191            SipConnection::TcpListener(transport) => transport.close().await,
192        }
193    }
194}
195
196impl SipConnection {
197    pub fn update_msg_received(
198        msg: SipMessage,
199        addr: SocketAddr,
200        transport: rsip::transport::Transport,
201    ) -> Result<SipMessage> {
202        match msg {
203            SipMessage::Request(mut req) => {
204                let via = req.via_header_mut()?;
205                Self::build_via_received(via, addr, transport)?;
206                Ok(req.into())
207            }
208            SipMessage::Response(_) => Ok(msg),
209        }
210    }
211
212    pub fn resolve_bind_address(addr: SocketAddr) -> SocketAddr {
213        let ip = addr.ip();
214        if ip.is_unspecified() {
215            // 0.0.0.0 or ::
216            let interfaces = match get_if_addrs::get_if_addrs() {
217                Ok(interfaces) => interfaces,
218                Err(_) => return addr,
219            };
220            for interface in interfaces {
221                if interface.is_loopback() {
222                    continue;
223                }
224                match interface.addr {
225                    IfAddr::V4(v4addr) => {
226                        return SocketAddr::new(IpAddr::V4(v4addr.ip), addr.port());
227                    }
228                    //TODO: don't support ipv6 for now
229                    _ => continue,
230                }
231            }
232            // fallback to loopback
233            return SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port());
234        }
235        addr
236    }
237    pub fn build_via_received(
238        via: &mut rsip::headers::Via,
239        addr: SocketAddr,
240        transport: rsip::transport::Transport,
241    ) -> Result<()> {
242        let received = addr.into();
243        let mut typed_via = via.typed()?;
244
245        typed_via.params.retain(|param| {
246            if let Param::Other(key, _) = param {
247                !key.value().eq_ignore_ascii_case("rport")
248            } else if matches!(param, Param::Received(_)) {
249                false
250            } else {
251                true
252            }
253        });
254
255        // Only add received parameter if the source address differs from Via header
256        if typed_via.uri.host_with_port == received {
257            return Ok(());
258        }
259
260        // For reliable transports (such as TCP), we need to be more careful about received parameter
261        let should_add_received = match transport {
262            rsip::transport::Transport::Udp => true,
263            _ => {
264                // For connection-oriented protocols, only add if explicitly different
265                typed_via.uri.host_with_port.host != received.host
266            }
267        };
268
269        if !should_add_received {
270            return Ok(());
271        }
272
273        if transport != rsip::transport::Transport::Udp && typed_via.transport != transport {
274            typed_via.params.push(Param::Transport(transport));
275        }
276
277        *via = typed_via
278            .with_param(Param::Received(rsip::param::Received::new(
279                received.host.to_string(),
280            )))
281            .with_param(Param::Other(
282                rsip::param::OtherParam::new("rport"),
283                Some(rsip::param::OtherParamValue::new(addr.port().to_string())),
284            ))
285            .into();
286        Ok(())
287    }
288
289    pub fn parse_target_from_via(
290        via: &rsip::headers::untyped::Via,
291    ) -> Result<(rsip::Transport, rsip::HostWithPort)> {
292        let mut host_with_port = via.uri()?.host_with_port;
293        let mut transport = via.trasnport().unwrap_or(rsip::Transport::Udp);
294        if let Ok(params) = via.params().as_ref() {
295            for param in params {
296                match param {
297                    Param::Received(v) => {
298                        if let Ok(addr) = v.parse() {
299                            host_with_port.host = addr.into();
300                        }
301                    }
302                    Param::Transport(t) => {
303                        transport = t.clone();
304                    }
305                    Param::Other(key, Some(value)) if key.value().eq_ignore_ascii_case("rport") => {
306                        if let Ok(port) = value.value().try_into() {
307                            host_with_port.port = Some(port);
308                        }
309                    }
310                    _ => {}
311                }
312            }
313        }
314        Ok((transport, host_with_port))
315    }
316
317    pub fn get_destination(msg: &rsip::SipMessage) -> Result<SocketAddr> {
318        let host_with_port = match msg {
319            rsip::SipMessage::Request(req) => req.uri().host_with_port.clone(),
320            rsip::SipMessage::Response(res) => Self::parse_target_from_via(res.via_header()?)?.1,
321        };
322        host_with_port.try_into().map_err(Into::into)
323    }
324}
325
326impl fmt::Display for SipConnection {
327    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328        match self {
329            SipConnection::Channel(t) => write!(f, "{}", t),
330            SipConnection::Udp(t) => write!(f, "UDP {}", t),
331            SipConnection::Tcp(t) => write!(f, "TCP {}", t),
332            SipConnection::TcpListener(t) => write!(f, "TCP LISTEN {}", t),
333        }
334    }
335}
336
337impl From<ChannelConnection> for SipConnection {
338    fn from(connection: ChannelConnection) -> Self {
339        SipConnection::Channel(connection)
340    }
341}
342
343impl From<UdpConnection> for SipConnection {
344    fn from(connection: UdpConnection) -> Self {
345        SipConnection::Udp(connection)
346    }
347}
348
349impl From<TcpConnection> for SipConnection {
350    fn from(connection: TcpConnection) -> Self {
351        SipConnection::Tcp(connection)
352    }
353}
354
355impl From<TcpListenerConnection> for SipConnection {
356    fn from(connection: TcpListenerConnection) -> Self {
357        SipConnection::TcpListener(connection)
358    }
359}
360
361impl From<SipAddr> for rsip::HostWithPort {
362    fn from(val: SipAddr) -> Self {
363        val.addr
364    }
365}
366
367impl From<SipAddr> for rsip::Uri {
368    fn from(val: SipAddr) -> Self {
369        let scheme = match val.r#type {
370            Some(rsip::transport::Transport::Wss) | Some(rsip::transport::Transport::Tls) => {
371                rsip::Scheme::Sips
372            }
373            _ => rsip::Scheme::Sip,
374        };
375        rsip::Uri {
376            scheme: Some(scheme),
377            host_with_port: val.addr,
378            ..Default::default()
379        }
380    }
381}