ftth_rsipstack/transport/
connection.rs

1use super::{sip_addr::SipAddr, stream::StreamConnection, tcp::TcpConnection, udp::UdpConnection};
2use crate::transport::channel::ChannelConnection;
3use crate::transport::tcp_listener::TcpListenerConnection;
4use crate::Result;
5use get_if_addrs::IfAddr;
6use rsip::{
7    prelude::{HeadersExt, ToTypedHeader},
8    Param, SipMessage,
9};
10use std::net::{IpAddr, Ipv4Addr};
11use std::{fmt, net::SocketAddr};
12use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
13use tokio_util::sync::CancellationToken;
14use tracing::debug;
15
16/// Transport Layer Events
17///
18/// `TransportEvent` represents events that occur at the transport layer,
19/// such as incoming messages, new connections, and connection closures.
20/// These events are used to coordinate between the transport layer and
21/// higher protocol layers.
22///
23/// # Events
24///
25/// * `Incoming` - A SIP message was received from the network
26/// * `New` - A new connection has been established
27/// * `Closed` - An existing connection has been closed
28///
29/// # Examples
30///
31/// ```rust,no_run
32/// use rsipstack::transport::connection::TransportEvent;
33///
34/// # fn handle_event(event: TransportEvent) {
35/// match event {
36///     TransportEvent::Incoming(message, connection, source) => {
37///         // Process incoming SIP message
38///         println!("Received message from {}", source);
39///     },
40///     TransportEvent::New(connection) => {
41///         // Handle new connection
42///         println!("New connection established");
43///     },
44///     TransportEvent::Closed(connection) => {
45///         // Handle connection closure
46///         println!("Connection closed");
47///     }
48/// }
49/// # }
50/// ```
51#[derive(Debug)]
52pub enum TransportEvent {
53    Incoming(SipMessage, SipConnection, SipAddr),
54    New(SipConnection),
55    Closed(SipConnection),
56}
57
58pub type TransportReceiver = UnboundedReceiver<TransportEvent>;
59pub type TransportSender = UnboundedSender<TransportEvent>;
60
61pub const KEEPALIVE_REQUEST: &[u8] = b"\r\n\r\n";
62pub const KEEPALIVE_RESPONSE: &[u8] = b"\r\n";
63
64/// SIP Connection
65///
66/// `SipConnection` is an enum that abstracts different transport protocols
67/// used for SIP communication. It provides a unified interface for sending
68/// SIP messages regardless of the underlying transport mechanism.
69///
70/// # Supported Transports
71///
72/// * `Udp` - UDP transport for connectionless communication
73/// * `Channel` - In-memory channel for testing and local communication
74/// * `Tcp` - TCP transport for reliable connection-oriented communication
75///
76/// # Key Features
77///
78/// * Transport abstraction - uniform interface across protocols
79/// * Reliability detection - distinguishes reliable vs unreliable transports
80/// * Address management - tracks local and remote addresses
81/// * Message sending - handles protocol-specific message transmission
82/// * Via header processing - automatic received parameter handling
83///
84/// # Examples
85///
86/// ```rust,no_run
87/// use rsipstack::transport::{SipConnection, SipAddr};
88/// use rsip::SipMessage;
89///
90/// // Send a message through any connection type
91/// async fn send_message(
92///     connection: &SipConnection,
93///     message: SipMessage,
94///     destination: Option<&SipAddr>
95/// ) -> rsipstack::Result<()> {
96///     connection.send(message, destination).await?;
97///     Ok(())
98/// }
99///
100/// # fn example(connection: &SipConnection) {
101/// // Check if transport is reliable
102/// let is_reliable = connection.is_reliable();
103/// if is_reliable {
104///     println!("Using reliable transport");
105/// } else {
106///     println!("Using unreliable transport - retransmissions may be needed");
107/// }
108/// # }
109/// ```
110///
111/// # Transport Characteristics
112///
113/// ## UDP
114/// * Connectionless and unreliable
115/// * Requires retransmission handling
116/// * Lower overhead
117/// * Default SIP transport
118///
119/// ## TCP
120/// * Connection-oriented and reliable
121/// * No retransmission needed
122/// * Higher overhead
123/// * Better for large messages
124///
125/// # Via Header Processing
126///
127/// SipConnection automatically handles Via header processing for incoming
128/// messages, adding 'received' and 'rport' parameters as needed per RFC 3261.
129#[derive(Clone, Debug)]
130pub enum SipConnection {
131    Channel(ChannelConnection),
132    Udp(UdpConnection),
133    Tcp(TcpConnection),
134    TcpListener(TcpListenerConnection),
135}
136
137impl SipConnection {
138    pub fn is_reliable(&self) -> bool {
139        match self {
140            SipConnection::Udp(_) => false,
141            _ => true,
142        }
143    }
144
145    pub fn cancel_token(&self) -> Option<CancellationToken> {
146        match self {
147            SipConnection::Channel(transport) => transport.cancel_token(),
148            SipConnection::Udp(transport) => transport.cancel_token(),
149            SipConnection::Tcp(transport) => transport.cancel_token(),
150            SipConnection::TcpListener(_) => None,
151        }
152    }
153    pub fn get_addr(&self) -> &SipAddr {
154        match self {
155            SipConnection::Channel(transport) => transport.get_addr(),
156            SipConnection::Udp(transport) => transport.get_addr(),
157            SipConnection::Tcp(transport) => transport.get_addr(),
158            SipConnection::TcpListener(transport) => transport.get_addr(),
159        }
160    }
161    pub async fn send(&self, msg: rsip::SipMessage, destination: Option<&SipAddr>) -> Result<()> {
162        match self {
163            SipConnection::Channel(transport) => transport.send(msg).await,
164            SipConnection::Udp(transport) => transport.send(msg, destination).await,
165            SipConnection::Tcp(transport) => transport.send_message(msg).await,
166            SipConnection::TcpListener(_) => {
167                debug!("SipConnection::send: TcpListener cannot send messages");
168                Ok(())
169            }
170        }
171    }
172    pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
173        match self {
174            SipConnection::Channel(transport) => transport.serve_loop(sender).await,
175            SipConnection::Udp(transport) => transport.serve_loop(sender).await,
176            SipConnection::Tcp(transport) => transport.serve_loop(sender).await,
177            SipConnection::TcpListener(_) => {
178                debug!("SipConnection::serve_loop: TcpListener does not have serve_loop");
179                Ok(())
180            }
181        }
182    }
183
184    pub async fn close(&self) -> Result<()> {
185        match self {
186            SipConnection::Channel(transport) => transport.close().await,
187            SipConnection::Udp(_) => Ok(()), // UDP has no connection state
188            SipConnection::Tcp(transport) => transport.close().await,
189            SipConnection::TcpListener(transport) => transport.close().await,
190        }
191    }
192}
193
194impl SipConnection {
195    pub fn update_msg_received(
196        msg: SipMessage,
197        addr: SocketAddr,
198        transport: rsip::transport::Transport,
199    ) -> Result<SipMessage> {
200        match msg {
201            SipMessage::Request(mut req) => {
202                let via = req.via_header_mut()?;
203                Self::build_via_received(via, addr, transport)?;
204                Ok(req.into())
205            }
206            SipMessage::Response(_) => Ok(msg),
207        }
208    }
209
210    pub fn resolve_bind_address(addr: SocketAddr) -> SocketAddr {
211        let ip = addr.ip();
212        if ip.is_unspecified() {
213            // 0.0.0.0 or ::
214            let interfaces = match get_if_addrs::get_if_addrs() {
215                Ok(interfaces) => interfaces,
216                Err(_) => return addr,
217            };
218            for interface in interfaces {
219                if interface.is_loopback() {
220                    continue;
221                }
222                match interface.addr {
223                    IfAddr::V4(v4addr) => {
224                        return SocketAddr::new(IpAddr::V4(v4addr.ip), addr.port());
225                    }
226                    //TODO: don't support ipv6 for now
227                    _ => continue,
228                }
229            }
230            // fallback to loopback
231            return SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port());
232        }
233        addr
234    }
235    pub fn build_via_received(
236        via: &mut rsip::headers::Via,
237        addr: SocketAddr,
238        transport: rsip::transport::Transport,
239    ) -> Result<()> {
240        let received = addr.into();
241        let mut typed_via = via.typed()?;
242
243        typed_via.params.retain(|param| {
244            if let Param::Other(key, _) = param {
245                !key.value().eq_ignore_ascii_case("rport")
246            } else if matches!(param, Param::Received(_)) {
247                false
248            } else {
249                true
250            }
251        });
252
253        // Only add received parameter if the source address differs from Via header
254        if typed_via.uri.host_with_port == received {
255            return Ok(());
256        }
257
258        // For reliable transports (such as TCP), we need to be more careful about received parameter
259        let should_add_received = match transport {
260            rsip::transport::Transport::Udp => true,
261            _ => {
262                // For connection-oriented protocols, only add if explicitly different
263                typed_via.uri.host_with_port.host != received.host
264            }
265        };
266
267        if !should_add_received {
268            return Ok(());
269        }
270
271        if transport != rsip::transport::Transport::Udp && typed_via.transport != transport {
272            typed_via.params.push(Param::Transport(transport));
273        }
274
275        *via = typed_via
276            .with_param(Param::Received(rsip::param::Received::new(
277                received.host.to_string(),
278            )))
279            .with_param(Param::Other(
280                rsip::param::OtherParam::new("rport"),
281                Some(rsip::param::OtherParamValue::new(addr.port().to_string())),
282            ))
283            .into();
284        Ok(())
285    }
286
287    pub fn parse_target_from_via(
288        via: &rsip::headers::untyped::Via,
289    ) -> Result<(rsip::Transport, rsip::HostWithPort)> {
290        let mut host_with_port = via.uri()?.host_with_port;
291        let mut transport = via.trasnport().unwrap_or(rsip::Transport::Udp);
292        if let Ok(params) = via.params().as_ref() {
293            for param in params {
294                match param {
295                    Param::Received(v) => {
296                        if let Ok(addr) = v.parse() {
297                            host_with_port.host = addr.into();
298                        }
299                    }
300                    Param::Transport(t) => {
301                        transport = t.clone();
302                    }
303                    Param::Other(key, Some(value)) if key.value().eq_ignore_ascii_case("rport") => {
304                        if let Ok(port) = value.value().try_into() {
305                            host_with_port.port = Some(port);
306                        }
307                    }
308                    _ => {}
309                }
310            }
311        }
312        Ok((transport, host_with_port))
313    }
314
315    pub fn get_destination(msg: &rsip::SipMessage) -> Result<SocketAddr> {
316        let host_with_port = match msg {
317            rsip::SipMessage::Request(req) => req.uri().host_with_port.clone(),
318            rsip::SipMessage::Response(res) => Self::parse_target_from_via(res.via_header()?)?.1,
319        };
320        host_with_port.try_into().map_err(Into::into)
321    }
322}
323
324impl fmt::Display for SipConnection {
325    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326        match self {
327            SipConnection::Channel(t) => write!(f, "{}", t),
328            SipConnection::Udp(t) => write!(f, "UDP {}", t),
329            SipConnection::Tcp(t) => write!(f, "TCP {}", t),
330            SipConnection::TcpListener(t) => write!(f, "TCP LISTEN {}", t),
331        }
332    }
333}
334
335impl From<ChannelConnection> for SipConnection {
336    fn from(connection: ChannelConnection) -> Self {
337        SipConnection::Channel(connection)
338    }
339}
340
341impl From<UdpConnection> for SipConnection {
342    fn from(connection: UdpConnection) -> Self {
343        SipConnection::Udp(connection)
344    }
345}
346
347impl From<TcpConnection> for SipConnection {
348    fn from(connection: TcpConnection) -> Self {
349        SipConnection::Tcp(connection)
350    }
351}
352
353impl From<TcpListenerConnection> for SipConnection {
354    fn from(connection: TcpListenerConnection) -> Self {
355        SipConnection::TcpListener(connection)
356    }
357}
358
359impl From<SipAddr> for rsip::HostWithPort {
360    fn from(val: SipAddr) -> Self {
361        val.addr
362    }
363}
364
365impl From<SipAddr> for rsip::Uri {
366    fn from(val: SipAddr) -> Self {
367        let scheme = match val.r#type {
368            Some(rsip::transport::Transport::Wss) | Some(rsip::transport::Transport::Tls) => {
369                rsip::Scheme::Sips
370            }
371            _ => rsip::Scheme::Sip,
372        };
373        rsip::Uri {
374            scheme: Some(scheme),
375            host_with_port: val.addr,
376            ..Default::default()
377        }
378    }
379}