ftth_rsipstack/transport/
udp.rs

1use super::{connection::TransportSender, SipAddr, SipConnection};
2use crate::{
3    transport::{
4        connection::{KEEPALIVE_REQUEST, KEEPALIVE_RESPONSE},
5        TransportEvent,
6    },
7    Result,
8};
9use std::{net::SocketAddr, sync::Arc};
10use tokio::net::UdpSocket;
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, info, warn};
13pub struct UdpInner {
14    pub conn: UdpSocket,
15    pub addr: SipAddr,
16}
17
18#[derive(Clone)]
19pub struct UdpConnection {
20    pub external: Option<SipAddr>,
21    cancel_token: Option<CancellationToken>,
22    inner: Arc<UdpInner>,
23}
24
25impl UdpConnection {
26    pub async fn attach(
27        inner: UdpInner,
28        external: Option<SocketAddr>,
29        cancel_token: Option<CancellationToken>,
30    ) -> Self {
31        UdpConnection {
32            external: external.map(|addr| SipAddr {
33                r#type: Some(rsip::transport::Transport::Udp),
34                addr: SipConnection::resolve_bind_address(addr).into(),
35            }),
36            inner: Arc::new(inner),
37            cancel_token,
38        }
39    }
40
41    pub async fn create_connection(
42        local: SocketAddr,
43        external: Option<SocketAddr>,
44        cancel_token: Option<CancellationToken>,
45    ) -> Result<Self> {
46        let conn = UdpSocket::bind(local).await?;
47
48        let addr = SipAddr {
49            r#type: Some(rsip::transport::Transport::Udp),
50            addr: SipConnection::resolve_bind_address(conn.local_addr()?).into(),
51        };
52
53        let t = UdpConnection {
54            external: external.map(|addr| SipAddr {
55                r#type: Some(rsip::transport::Transport::Udp),
56                addr: addr.into(),
57            }),
58            inner: Arc::new(UdpInner { addr, conn }),
59            cancel_token,
60        };
61        info!("created UDP connection: {} external: {:?}", t, external);
62        Ok(t)
63    }
64
65    pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
66        let mut buf = vec![0u8; 2048];
67        loop {
68            let (len, addr) = tokio::select! {
69                // Check for cancellation on each iteration
70                _ = async {
71                    if let Some(ref cancel_token) = self.cancel_token {
72                        cancel_token.cancelled().await;
73                    } else {
74                        // If no cancel token, wait forever
75                        std::future::pending::<()>().await;
76                    }
77                } => {
78                    debug!("UDP serve_loop cancelled");
79                    return Ok(());
80                }
81                // Receive UDP packets
82                result = self.inner.conn.recv_from(&mut buf) => {
83                    match result {
84                        Ok((len, addr)) => (len, addr),
85                        Err(e) => {
86                            warn!("error receiving UDP packet: {}", e);
87                            continue;
88                        }
89                    }
90                }
91            };
92
93            match &buf[..len] {
94                KEEPALIVE_REQUEST => {
95                    self.inner.conn.send_to(KEEPALIVE_RESPONSE, addr).await.ok();
96                    continue;
97                }
98                KEEPALIVE_RESPONSE => continue,
99                _ => {
100                    if buf.iter().all(|&b| b.is_ascii_whitespace()) {
101                        continue;
102                    }
103                }
104            }
105
106            let undecoded = match std::str::from_utf8(&buf[..len]) {
107                Ok(s) => s,
108                Err(e) => {
109                    info!(
110                        "decoding text from: {} error: {} buf: {:?}",
111                        addr,
112                        e,
113                        &buf[..len]
114                    );
115                    continue;
116                }
117            };
118
119            let msg = match rsip::SipMessage::try_from(undecoded) {
120                Ok(msg) => msg,
121                Err(e) => {
122                    info!(
123                        "error parsing SIP message from: {} error: {} buf: {}",
124                        addr, e, undecoded
125                    );
126                    continue;
127                }
128            };
129
130            let msg = match SipConnection::update_msg_received(
131                msg,
132                addr,
133                rsip::transport::Transport::Udp,
134            ) {
135                Ok(msg) => msg,
136                Err(e) => {
137                    info!(
138                        "error updating SIP via from: {} error: {:?} buf: {}",
139                        addr, e, undecoded
140                    );
141                    continue;
142                }
143            };
144
145            debug!(
146                len, src=%addr,dest=%self.get_addr(), message=undecoded,
147                "udp received"
148            );
149
150            sender.send(TransportEvent::Incoming(
151                msg,
152                SipConnection::Udp(self.clone()),
153                SipAddr {
154                    r#type: Some(rsip::transport::Transport::Udp),
155                    addr: addr.into(),
156                },
157            ))?;
158        }
159    }
160
161    pub async fn send(
162        &self,
163        msg: rsip::SipMessage,
164        destination: Option<&SipAddr>,
165    ) -> crate::Result<()> {
166        let destination = match destination {
167            Some(addr) => addr.get_socketaddr(),
168            None => SipConnection::get_destination(&msg),
169        }?;
170        let buf = msg.to_string();
171
172        debug!(len=buf.len(), src=%self.get_addr(),
173        dest=%destination, message=%buf,
174        "udp send");
175
176        self.inner
177            .conn
178            .send_to(buf.as_bytes(), destination)
179            .await
180            .map_err(|e| {
181                crate::Error::TransportLayerError(e.to_string(), self.get_addr().to_owned())
182            })
183            .map(|_| ())
184    }
185
186    pub async fn send_raw(&self, buf: &[u8], destination: &SipAddr) -> Result<()> {
187        //trace!("send_raw {} -> {}", buf.len(), target);
188        self.inner
189            .conn
190            .send_to(buf, destination.get_socketaddr()?)
191            .await
192            .map_err(|e| {
193                crate::Error::TransportLayerError(e.to_string(), self.get_addr().to_owned())
194            })
195            .map(|_| ())
196    }
197
198    pub async fn recv_raw(&self, buf: &mut [u8]) -> Result<(usize, SipAddr)> {
199        let (len, addr) = self.inner.conn.recv_from(buf).await?;
200        // trace!("received {} -> {}", len, addr);
201        Ok((
202            len,
203            SipAddr {
204                r#type: Some(rsip::transport::Transport::Udp),
205                addr: addr.into(),
206            },
207        ))
208    }
209
210    pub fn get_addr(&self) -> &SipAddr {
211        if let Some(external) = &self.external {
212            external
213        } else {
214            &self.inner.addr
215        }
216    }
217    pub fn cancel_token(&self) -> Option<CancellationToken> {
218        self.cancel_token.clone()
219    }
220}
221
222impl std::fmt::Display for UdpConnection {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        match self.inner.conn.local_addr() {
225            Ok(addr) => write!(f, "{}", addr),
226            Err(_) => write!(f, "*:*"),
227        }
228    }
229}
230
231impl std::fmt::Debug for UdpConnection {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        write!(f, "{}", self.inner.addr)
234    }
235}
236
237impl Drop for UdpInner {
238    fn drop(&mut self) {
239        info!("dropping UDP transport: {}", self.addr);
240    }
241}