ftth_rsipstack/transport/
connection.rs1use super::{sip_addr::SipAddr, stream::StreamConnection, tcp::TcpConnection, udp::UdpConnection};
2use crate::rsip;
3use crate::transport::channel::ChannelConnection;
4use crate::transport::tcp_listener::TcpListenerConnection;
5use crate::Result;
6use get_if_addrs::IfAddr;
7use rsip::{
8 prelude::{HeadersExt, ToTypedHeader},
9 Param, SipMessage,
10};
11use std::net::{IpAddr, Ipv4Addr};
12use std::{fmt, net::SocketAddr};
13use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
14use tokio_util::sync::CancellationToken;
15use tracing::debug;
16
17#[derive(Debug)]
53pub enum TransportEvent {
54 Incoming(SipMessage, SipConnection, SipAddr),
55 New(SipConnection),
56 Closed(SipConnection),
57}
58
59pub type TransportReceiver = UnboundedReceiver<TransportEvent>;
60pub type TransportSender = UnboundedSender<TransportEvent>;
61
62pub const KEEPALIVE_REQUEST: &[u8] = b"\r\n\r\n";
63pub const KEEPALIVE_RESPONSE: &[u8] = b"\r\n";
64
65#[derive(Clone, Debug)]
132pub enum SipConnection {
133 Channel(ChannelConnection),
134 Udp(UdpConnection),
135 Tcp(TcpConnection),
136 TcpListener(TcpListenerConnection),
137}
138
139impl SipConnection {
140 pub fn is_reliable(&self) -> bool {
141 match self {
142 SipConnection::Udp(_) => false,
143 _ => true,
144 }
145 }
146
147 pub fn cancel_token(&self) -> Option<CancellationToken> {
148 match self {
149 SipConnection::Channel(transport) => transport.cancel_token(),
150 SipConnection::Udp(transport) => transport.cancel_token(),
151 SipConnection::Tcp(transport) => transport.cancel_token(),
152 SipConnection::TcpListener(_) => None,
153 }
154 }
155 pub fn get_addr(&self) -> &SipAddr {
156 match self {
157 SipConnection::Channel(transport) => transport.get_addr(),
158 SipConnection::Udp(transport) => transport.get_addr(),
159 SipConnection::Tcp(transport) => transport.get_addr(),
160 SipConnection::TcpListener(transport) => transport.get_addr(),
161 }
162 }
163 pub async fn send(&self, msg: rsip::SipMessage, destination: Option<&SipAddr>) -> Result<()> {
164 match self {
165 SipConnection::Channel(transport) => transport.send(msg).await,
166 SipConnection::Udp(transport) => transport.send(msg, destination).await,
167 SipConnection::Tcp(transport) => transport.send_message(msg).await,
168 SipConnection::TcpListener(_) => {
169 debug!("SipConnection::send: TcpListener cannot send messages");
170 Ok(())
171 }
172 }
173 }
174 pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
175 match self {
176 SipConnection::Channel(transport) => transport.serve_loop(sender).await,
177 SipConnection::Udp(transport) => transport.serve_loop(sender).await,
178 SipConnection::Tcp(transport) => transport.serve_loop(sender).await,
179 SipConnection::TcpListener(_) => {
180 debug!("SipConnection::serve_loop: TcpListener does not have serve_loop");
181 Ok(())
182 }
183 }
184 }
185
186 pub async fn close(&self) -> Result<()> {
187 match self {
188 SipConnection::Channel(transport) => transport.close().await,
189 SipConnection::Udp(_) => Ok(()), SipConnection::Tcp(transport) => transport.close().await,
191 SipConnection::TcpListener(transport) => transport.close().await,
192 }
193 }
194}
195
196impl SipConnection {
197 pub fn update_msg_received(
198 msg: SipMessage,
199 addr: SocketAddr,
200 transport: rsip::transport::Transport,
201 ) -> Result<SipMessage> {
202 match msg {
203 SipMessage::Request(mut req) => {
204 let via = req.via_header_mut()?;
205 Self::build_via_received(via, addr, transport)?;
206 Ok(req.into())
207 }
208 SipMessage::Response(_) => Ok(msg),
209 }
210 }
211
212 pub fn resolve_bind_address(addr: SocketAddr) -> SocketAddr {
213 let ip = addr.ip();
214 if ip.is_unspecified() {
215 let interfaces = match get_if_addrs::get_if_addrs() {
217 Ok(interfaces) => interfaces,
218 Err(_) => return addr,
219 };
220 for interface in interfaces {
221 if interface.is_loopback() {
222 continue;
223 }
224 match interface.addr {
225 IfAddr::V4(v4addr) => {
226 return SocketAddr::new(IpAddr::V4(v4addr.ip), addr.port());
227 }
228 _ => continue,
230 }
231 }
232 return SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port());
234 }
235 addr
236 }
237 pub fn build_via_received(
238 via: &mut rsip::headers::Via,
239 addr: SocketAddr,
240 transport: rsip::transport::Transport,
241 ) -> Result<()> {
242 let received = addr.into();
243 let mut typed_via = via.typed()?;
244
245 typed_via.params.retain(|param| {
246 if let Param::Other(key, _) = param {
247 !key.value().eq_ignore_ascii_case("rport")
248 } else if matches!(param, Param::Received(_)) {
249 false
250 } else {
251 true
252 }
253 });
254
255 if typed_via.uri.host_with_port == received {
257 return Ok(());
258 }
259
260 let should_add_received = match transport {
262 rsip::transport::Transport::Udp => true,
263 _ => {
264 typed_via.uri.host_with_port.host != received.host
266 }
267 };
268
269 if !should_add_received {
270 return Ok(());
271 }
272
273 if transport != rsip::transport::Transport::Udp && typed_via.transport != transport {
274 typed_via.params.push(Param::Transport(transport));
275 }
276
277 *via = typed_via
278 .with_param(Param::Received(rsip::param::Received::new(
279 received.host.to_string(),
280 )))
281 .with_param(Param::Other(
282 rsip::param::OtherParam::new("rport"),
283 Some(rsip::param::OtherParamValue::new(addr.port().to_string())),
284 ))
285 .into();
286 Ok(())
287 }
288
289 pub fn parse_target_from_via(
290 via: &rsip::headers::untyped::Via,
291 ) -> Result<(rsip::Transport, rsip::HostWithPort)> {
292 let mut host_with_port = via.uri()?.host_with_port;
293 let mut transport = via.trasnport().unwrap_or(rsip::Transport::Udp);
294 if let Ok(params) = via.params().as_ref() {
295 for param in params {
296 match param {
297 Param::Received(v) => {
298 if let Ok(addr) = v.parse() {
299 host_with_port.host = addr.into();
300 }
301 }
302 Param::Transport(t) => {
303 transport = t.clone();
304 }
305 Param::Other(key, Some(value)) if key.value().eq_ignore_ascii_case("rport") => {
306 if let Ok(port) = value.value().try_into() {
307 host_with_port.port = Some(port);
308 }
309 }
310 _ => {}
311 }
312 }
313 }
314 Ok((transport, host_with_port))
315 }
316
317 pub fn get_destination(msg: &rsip::SipMessage) -> Result<SocketAddr> {
318 let host_with_port = match msg {
319 rsip::SipMessage::Request(req) => req.uri().host_with_port.clone(),
320 rsip::SipMessage::Response(res) => Self::parse_target_from_via(res.via_header()?)?.1,
321 };
322 host_with_port.try_into().map_err(Into::into)
323 }
324}
325
326impl fmt::Display for SipConnection {
327 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328 match self {
329 SipConnection::Channel(t) => write!(f, "{}", t),
330 SipConnection::Udp(t) => write!(f, "UDP {}", t),
331 SipConnection::Tcp(t) => write!(f, "TCP {}", t),
332 SipConnection::TcpListener(t) => write!(f, "TCP LISTEN {}", t),
333 }
334 }
335}
336
337impl From<ChannelConnection> for SipConnection {
338 fn from(connection: ChannelConnection) -> Self {
339 SipConnection::Channel(connection)
340 }
341}
342
343impl From<UdpConnection> for SipConnection {
344 fn from(connection: UdpConnection) -> Self {
345 SipConnection::Udp(connection)
346 }
347}
348
349impl From<TcpConnection> for SipConnection {
350 fn from(connection: TcpConnection) -> Self {
351 SipConnection::Tcp(connection)
352 }
353}
354
355impl From<TcpListenerConnection> for SipConnection {
356 fn from(connection: TcpListenerConnection) -> Self {
357 SipConnection::TcpListener(connection)
358 }
359}
360
361impl From<SipAddr> for rsip::HostWithPort {
362 fn from(val: SipAddr) -> Self {
363 val.addr
364 }
365}
366
367impl From<SipAddr> for rsip::Uri {
368 fn from(val: SipAddr) -> Self {
369 let scheme = match val.r#type {
370 Some(rsip::transport::Transport::Wss) | Some(rsip::transport::Transport::Tls) => {
371 rsip::Scheme::Sips
372 }
373 _ => rsip::Scheme::Sip,
374 };
375 rsip::Uri {
376 scheme: Some(scheme),
377 host_with_port: val.addr,
378 ..Default::default()
379 }
380 }
381}