ftth_rsipstack/transport/
udp.rs1use super::{connection::TransportSender, SipAddr, SipConnection};
2use crate::{
3 transport::{
4 connection::{KEEPALIVE_REQUEST, KEEPALIVE_RESPONSE},
5 TransportEvent,
6 },
7 Result,
8};
9use std::{net::SocketAddr, sync::Arc};
10use tokio::net::UdpSocket;
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, info, warn};
13pub struct UdpInner {
14 pub conn: UdpSocket,
15 pub addr: SipAddr,
16}
17
18#[derive(Clone)]
19pub struct UdpConnection {
20 pub external: Option<SipAddr>,
21 cancel_token: Option<CancellationToken>,
22 inner: Arc<UdpInner>,
23}
24
25impl UdpConnection {
26 pub async fn attach(
27 inner: UdpInner,
28 external: Option<SocketAddr>,
29 cancel_token: Option<CancellationToken>,
30 ) -> Self {
31 UdpConnection {
32 external: external.map(|addr| SipAddr {
33 r#type: Some(rsip::transport::Transport::Udp),
34 addr: SipConnection::resolve_bind_address(addr).into(),
35 }),
36 inner: Arc::new(inner),
37 cancel_token,
38 }
39 }
40
41 pub async fn create_connection(
42 local: SocketAddr,
43 external: Option<SocketAddr>,
44 cancel_token: Option<CancellationToken>,
45 ) -> Result<Self> {
46 let conn = UdpSocket::bind(local).await?;
47
48 let addr = SipAddr {
49 r#type: Some(rsip::transport::Transport::Udp),
50 addr: SipConnection::resolve_bind_address(conn.local_addr()?).into(),
51 };
52
53 let t = UdpConnection {
54 external: external.map(|addr| SipAddr {
55 r#type: Some(rsip::transport::Transport::Udp),
56 addr: addr.into(),
57 }),
58 inner: Arc::new(UdpInner { addr, conn }),
59 cancel_token,
60 };
61 info!("created UDP connection: {} external: {:?}", t, external);
62 Ok(t)
63 }
64
65 pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
66 let mut buf = vec![0u8; 2048];
67 loop {
68 let (len, addr) = tokio::select! {
69 _ = async {
71 if let Some(ref cancel_token) = self.cancel_token {
72 cancel_token.cancelled().await;
73 } else {
74 std::future::pending::<()>().await;
76 }
77 } => {
78 debug!("UDP serve_loop cancelled");
79 return Ok(());
80 }
81 result = self.inner.conn.recv_from(&mut buf) => {
83 match result {
84 Ok((len, addr)) => (len, addr),
85 Err(e) => {
86 warn!("error receiving UDP packet: {}", e);
87 continue;
88 }
89 }
90 }
91 };
92
93 match &buf[..len] {
94 KEEPALIVE_REQUEST => {
95 self.inner.conn.send_to(KEEPALIVE_RESPONSE, addr).await.ok();
96 continue;
97 }
98 KEEPALIVE_RESPONSE => continue,
99 _ => {
100 if buf.iter().all(|&b| b.is_ascii_whitespace()) {
101 continue;
102 }
103 }
104 }
105
106 let undecoded = match std::str::from_utf8(&buf[..len]) {
107 Ok(s) => s,
108 Err(e) => {
109 info!(
110 "decoding text from: {} error: {} buf: {:?}",
111 addr,
112 e,
113 &buf[..len]
114 );
115 continue;
116 }
117 };
118
119 let msg = match rsip::SipMessage::try_from(undecoded) {
120 Ok(msg) => msg,
121 Err(e) => {
122 info!(
123 "error parsing SIP message from: {} error: {} buf: {}",
124 addr, e, undecoded
125 );
126 continue;
127 }
128 };
129
130 let msg = match SipConnection::update_msg_received(
131 msg,
132 addr,
133 rsip::transport::Transport::Udp,
134 ) {
135 Ok(msg) => msg,
136 Err(e) => {
137 info!(
138 "error updating SIP via from: {} error: {:?} buf: {}",
139 addr, e, undecoded
140 );
141 continue;
142 }
143 };
144
145 debug!(
146 len, src=%addr,dest=%self.get_addr(), message=undecoded,
147 "udp received"
148 );
149
150 sender.send(TransportEvent::Incoming(
151 msg,
152 SipConnection::Udp(self.clone()),
153 SipAddr {
154 r#type: Some(rsip::transport::Transport::Udp),
155 addr: addr.into(),
156 },
157 ))?;
158 }
159 }
160
161 pub async fn send(
162 &self,
163 msg: rsip::SipMessage,
164 destination: Option<&SipAddr>,
165 ) -> crate::Result<()> {
166 let destination = match destination {
167 Some(addr) => addr.get_socketaddr(),
168 None => SipConnection::get_destination(&msg),
169 }?;
170 let buf = msg.to_string();
171
172 debug!(len=buf.len(), src=%self.get_addr(),
173 dest=%destination, message=%buf,
174 "udp send");
175
176 self.inner
177 .conn
178 .send_to(buf.as_bytes(), destination)
179 .await
180 .map_err(|e| {
181 crate::Error::TransportLayerError(e.to_string(), self.get_addr().to_owned())
182 })
183 .map(|_| ())
184 }
185
186 pub async fn send_raw(&self, buf: &[u8], destination: &SipAddr) -> Result<()> {
187 self.inner
189 .conn
190 .send_to(buf, destination.get_socketaddr()?)
191 .await
192 .map_err(|e| {
193 crate::Error::TransportLayerError(e.to_string(), self.get_addr().to_owned())
194 })
195 .map(|_| ())
196 }
197
198 pub async fn recv_raw(&self, buf: &mut [u8]) -> Result<(usize, SipAddr)> {
199 let (len, addr) = self.inner.conn.recv_from(buf).await?;
200 Ok((
202 len,
203 SipAddr {
204 r#type: Some(rsip::transport::Transport::Udp),
205 addr: addr.into(),
206 },
207 ))
208 }
209
210 pub fn get_addr(&self) -> &SipAddr {
211 if let Some(external) = &self.external {
212 external
213 } else {
214 &self.inner.addr
215 }
216 }
217 pub fn cancel_token(&self) -> Option<CancellationToken> {
218 self.cancel_token.clone()
219 }
220}
221
222impl std::fmt::Display for UdpConnection {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 match self.inner.conn.local_addr() {
225 Ok(addr) => write!(f, "{}", addr),
226 Err(_) => write!(f, "*:*"),
227 }
228 }
229}
230
231impl std::fmt::Debug for UdpConnection {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 write!(f, "{}", self.inner.addr)
234 }
235}
236
237impl Drop for UdpInner {
238 fn drop(&mut self) {
239 info!("dropping UDP transport: {}", self.addr);
240 }
241}