ftth_rsipstack/transport/
connection.rs1use super::{sip_addr::SipAddr, stream::StreamConnection, tcp::TcpConnection, udp::UdpConnection};
2use crate::transport::channel::ChannelConnection;
3use crate::transport::tcp_listener::TcpListenerConnection;
4use crate::Result;
5use get_if_addrs::IfAddr;
6use rsip::{
7 prelude::{HeadersExt, ToTypedHeader},
8 Param, SipMessage,
9};
10use std::net::{IpAddr, Ipv4Addr};
11use std::{fmt, net::SocketAddr};
12use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
13use tokio_util::sync::CancellationToken;
14use tracing::debug;
15
16#[derive(Debug)]
52pub enum TransportEvent {
53 Incoming(SipMessage, SipConnection, SipAddr),
54 New(SipConnection),
55 Closed(SipConnection),
56}
57
58pub type TransportReceiver = UnboundedReceiver<TransportEvent>;
59pub type TransportSender = UnboundedSender<TransportEvent>;
60
61pub const KEEPALIVE_REQUEST: &[u8] = b"\r\n\r\n";
62pub const KEEPALIVE_RESPONSE: &[u8] = b"\r\n";
63
64#[derive(Clone, Debug)]
130pub enum SipConnection {
131 Channel(ChannelConnection),
132 Udp(UdpConnection),
133 Tcp(TcpConnection),
134 TcpListener(TcpListenerConnection),
135}
136
137impl SipConnection {
138 pub fn is_reliable(&self) -> bool {
139 match self {
140 SipConnection::Udp(_) => false,
141 _ => true,
142 }
143 }
144
145 pub fn cancel_token(&self) -> Option<CancellationToken> {
146 match self {
147 SipConnection::Channel(transport) => transport.cancel_token(),
148 SipConnection::Udp(transport) => transport.cancel_token(),
149 SipConnection::Tcp(transport) => transport.cancel_token(),
150 SipConnection::TcpListener(_) => None,
151 }
152 }
153 pub fn get_addr(&self) -> &SipAddr {
154 match self {
155 SipConnection::Channel(transport) => transport.get_addr(),
156 SipConnection::Udp(transport) => transport.get_addr(),
157 SipConnection::Tcp(transport) => transport.get_addr(),
158 SipConnection::TcpListener(transport) => transport.get_addr(),
159 }
160 }
161 pub async fn send(&self, msg: rsip::SipMessage, destination: Option<&SipAddr>) -> Result<()> {
162 match self {
163 SipConnection::Channel(transport) => transport.send(msg).await,
164 SipConnection::Udp(transport) => transport.send(msg, destination).await,
165 SipConnection::Tcp(transport) => transport.send_message(msg).await,
166 SipConnection::TcpListener(_) => {
167 debug!("SipConnection::send: TcpListener cannot send messages");
168 Ok(())
169 }
170 }
171 }
172 pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
173 match self {
174 SipConnection::Channel(transport) => transport.serve_loop(sender).await,
175 SipConnection::Udp(transport) => transport.serve_loop(sender).await,
176 SipConnection::Tcp(transport) => transport.serve_loop(sender).await,
177 SipConnection::TcpListener(_) => {
178 debug!("SipConnection::serve_loop: TcpListener does not have serve_loop");
179 Ok(())
180 }
181 }
182 }
183
184 pub async fn close(&self) -> Result<()> {
185 match self {
186 SipConnection::Channel(transport) => transport.close().await,
187 SipConnection::Udp(_) => Ok(()), SipConnection::Tcp(transport) => transport.close().await,
189 SipConnection::TcpListener(transport) => transport.close().await,
190 }
191 }
192}
193
194impl SipConnection {
195 pub fn update_msg_received(
196 msg: SipMessage,
197 addr: SocketAddr,
198 transport: rsip::transport::Transport,
199 ) -> Result<SipMessage> {
200 match msg {
201 SipMessage::Request(mut req) => {
202 let via = req.via_header_mut()?;
203 Self::build_via_received(via, addr, transport)?;
204 Ok(req.into())
205 }
206 SipMessage::Response(_) => Ok(msg),
207 }
208 }
209
210 pub fn resolve_bind_address(addr: SocketAddr) -> SocketAddr {
211 let ip = addr.ip();
212 if ip.is_unspecified() {
213 let interfaces = match get_if_addrs::get_if_addrs() {
215 Ok(interfaces) => interfaces,
216 Err(_) => return addr,
217 };
218 for interface in interfaces {
219 if interface.is_loopback() {
220 continue;
221 }
222 match interface.addr {
223 IfAddr::V4(v4addr) => {
224 return SocketAddr::new(IpAddr::V4(v4addr.ip), addr.port());
225 }
226 _ => continue,
228 }
229 }
230 return SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port());
232 }
233 addr
234 }
235 pub fn build_via_received(
236 via: &mut rsip::headers::Via,
237 addr: SocketAddr,
238 transport: rsip::transport::Transport,
239 ) -> Result<()> {
240 let received = addr.into();
241 let mut typed_via = via.typed()?;
242
243 typed_via.params.retain(|param| {
244 if let Param::Other(key, _) = param {
245 !key.value().eq_ignore_ascii_case("rport")
246 } else if matches!(param, Param::Received(_)) {
247 false
248 } else {
249 true
250 }
251 });
252
253 if typed_via.uri.host_with_port == received {
255 return Ok(());
256 }
257
258 let should_add_received = match transport {
260 rsip::transport::Transport::Udp => true,
261 _ => {
262 typed_via.uri.host_with_port.host != received.host
264 }
265 };
266
267 if !should_add_received {
268 return Ok(());
269 }
270
271 if transport != rsip::transport::Transport::Udp && typed_via.transport != transport {
272 typed_via.params.push(Param::Transport(transport));
273 }
274
275 *via = typed_via
276 .with_param(Param::Received(rsip::param::Received::new(
277 received.host.to_string(),
278 )))
279 .with_param(Param::Other(
280 rsip::param::OtherParam::new("rport"),
281 Some(rsip::param::OtherParamValue::new(addr.port().to_string())),
282 ))
283 .into();
284 Ok(())
285 }
286
287 pub fn parse_target_from_via(
288 via: &rsip::headers::untyped::Via,
289 ) -> Result<(rsip::Transport, rsip::HostWithPort)> {
290 let mut host_with_port = via.uri()?.host_with_port;
291 let mut transport = via.trasnport().unwrap_or(rsip::Transport::Udp);
292 if let Ok(params) = via.params().as_ref() {
293 for param in params {
294 match param {
295 Param::Received(v) => {
296 if let Ok(addr) = v.parse() {
297 host_with_port.host = addr.into();
298 }
299 }
300 Param::Transport(t) => {
301 transport = t.clone();
302 }
303 Param::Other(key, Some(value)) if key.value().eq_ignore_ascii_case("rport") => {
304 if let Ok(port) = value.value().try_into() {
305 host_with_port.port = Some(port);
306 }
307 }
308 _ => {}
309 }
310 }
311 }
312 Ok((transport, host_with_port))
313 }
314
315 pub fn get_destination(msg: &rsip::SipMessage) -> Result<SocketAddr> {
316 let host_with_port = match msg {
317 rsip::SipMessage::Request(req) => req.uri().host_with_port.clone(),
318 rsip::SipMessage::Response(res) => Self::parse_target_from_via(res.via_header()?)?.1,
319 };
320 host_with_port.try_into().map_err(Into::into)
321 }
322}
323
324impl fmt::Display for SipConnection {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 match self {
327 SipConnection::Channel(t) => write!(f, "{}", t),
328 SipConnection::Udp(t) => write!(f, "UDP {}", t),
329 SipConnection::Tcp(t) => write!(f, "TCP {}", t),
330 SipConnection::TcpListener(t) => write!(f, "TCP LISTEN {}", t),
331 }
332 }
333}
334
335impl From<ChannelConnection> for SipConnection {
336 fn from(connection: ChannelConnection) -> Self {
337 SipConnection::Channel(connection)
338 }
339}
340
341impl From<UdpConnection> for SipConnection {
342 fn from(connection: UdpConnection) -> Self {
343 SipConnection::Udp(connection)
344 }
345}
346
347impl From<TcpConnection> for SipConnection {
348 fn from(connection: TcpConnection) -> Self {
349 SipConnection::Tcp(connection)
350 }
351}
352
353impl From<TcpListenerConnection> for SipConnection {
354 fn from(connection: TcpListenerConnection) -> Self {
355 SipConnection::TcpListener(connection)
356 }
357}
358
359impl From<SipAddr> for rsip::HostWithPort {
360 fn from(val: SipAddr) -> Self {
361 val.addr
362 }
363}
364
365impl From<SipAddr> for rsip::Uri {
366 fn from(val: SipAddr) -> Self {
367 let scheme = match val.r#type {
368 Some(rsip::transport::Transport::Wss) | Some(rsip::transport::Transport::Tls) => {
369 rsip::Scheme::Sips
370 }
371 _ => rsip::Scheme::Sip,
372 };
373 rsip::Uri {
374 scheme: Some(scheme),
375 host_with_port: val.addr,
376 ..Default::default()
377 }
378 }
379}