1use std::{
2 marker::PhantomData,
3 net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
4};
5
6use tokio::io::{unix::AsyncFd, Interest};
7
8use crate::{
9 control_message::{control_message_space, ControlMessage, MessageQueue},
10 interface::{lookup_phc, InterfaceName},
11 networkaddress::{sealed::PrivateToken, EthernetAddress, MacAddress, NetworkAddress},
12 raw_socket::RawSocket,
13 socket::select_timestamp,
14};
15
16use super::{InterfaceTimestampMode, Open, Socket, Timestamp};
17
18const SOF_TIMESTAMPING_BIND_PHC: libc::c_uint = 1 << 15;
19
20impl<A: NetworkAddress, S> Socket<A, S> {
21 pub(super) async fn fetch_send_timestamp(
22 &self,
23 expected_counter: u32,
24 ) -> std::io::Result<Option<Timestamp>> {
25 use std::time::Duration;
26
27 const TIMEOUT: Duration = Duration::from_millis(200);
28
29 match tokio::time::timeout(TIMEOUT, self.fetch_send_timestamp_loop(expected_counter)).await
30 {
31 Ok(res_opt_timestamp) => res_opt_timestamp,
32 Err(_timeout_elapsed) => Ok(None),
33 }
34 }
35
36 pub(super) async fn fetch_send_timestamp_loop(
37 &self,
38 expected_counter: u32,
39 ) -> std::io::Result<Option<Timestamp>> {
40 let try_read = |_: &RawSocket| self.fetch_send_timestamp_try_read(expected_counter);
41
42 loop {
43 match self.socket.async_io(Interest::ERROR, try_read).await? {
45 Some(timestamp) => break Ok(Some(timestamp)),
46 None => continue,
47 }
48 }
49 }
50
51 pub(super) fn fetch_send_timestamp_try_read(
52 &self,
53 expected_counter: u32,
54 ) -> std::io::Result<Option<Timestamp>> {
55 const CONTROL_SIZE: usize = control_message_space::<[libc::timespec; 3]>()
56 + control_message_space::<(libc::sock_extended_err, libc::sockaddr_storage)>();
57
58 let mut control_buf = [0; CONTROL_SIZE];
59
60 let (_, control_messages, _) = self.socket.get_ref().receive_message(
62 &mut [],
63 &mut control_buf,
64 MessageQueue::Error,
65 )?;
66
67 let mut send_ts = None;
68 for msg in control_messages {
69 match msg {
70 ControlMessage::Timestamping { software, hardware } => {
71 send_ts = select_timestamp(self.timestamp_mode, software, hardware);
72 }
73
74 ControlMessage::ReceiveError(error) => {
75 if error.ee_errno as libc::c_int != libc::ENOMSG {
78 tracing::warn!(
79 expected_counter,
80 error.ee_data,
81 "error message on the MSG_ERRQUEUE"
82 );
83 }
84
85 if error.ee_data != expected_counter {
87 tracing::debug!(
88 error.ee_data,
89 expected_counter,
90 "Timestamp for unrelated packet"
91 );
92 return Ok(None);
93 }
94 }
95
96 ControlMessage::Other(msg) => {
97 tracing::warn!(
98 msg.cmsg_level,
99 msg.cmsg_type,
100 "unexpected message on the MSG_ERRQUEUE",
101 );
102 }
103 }
104 }
105
106 Ok(send_ts)
107 }
108}
109
110pub(super) fn configure_timestamping(
111 socket: &RawSocket,
112 interface: Option<InterfaceName>,
113 mode: InterfaceTimestampMode,
114 mut bind_phc: Option<u32>,
115) -> std::io::Result<()> {
116 if let Some(interface) = interface {
118 if lookup_phc(interface) == bind_phc {
119 bind_phc = None
120 }
121 }
122
123 let options = match mode {
124 InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwarePTPAll => {
125 libc::SOF_TIMESTAMPING_RAW_HARDWARE
126 | libc::SOF_TIMESTAMPING_TX_SOFTWARE
127 | libc::SOF_TIMESTAMPING_RX_HARDWARE
128 | libc::SOF_TIMESTAMPING_TX_HARDWARE
129 | libc::SOF_TIMESTAMPING_OPT_TSONLY
130 | libc::SOF_TIMESTAMPING_OPT_ID
131 | bind_phc
132 .map(|_| SOF_TIMESTAMPING_BIND_PHC)
133 .unwrap_or_default()
134 }
135 InterfaceTimestampMode::HardwareRecv | InterfaceTimestampMode::HardwarePTPRecv => {
136 libc::SOF_TIMESTAMPING_RAW_HARDWARE
137 | libc::SOF_TIMESTAMPING_RX_HARDWARE
138 | bind_phc
139 .map(|_| SOF_TIMESTAMPING_BIND_PHC)
140 .unwrap_or_default()
141 }
142 InterfaceTimestampMode::SoftwareAll => {
143 libc::SOF_TIMESTAMPING_SOFTWARE
144 | libc::SOF_TIMESTAMPING_RX_SOFTWARE
145 | libc::SOF_TIMESTAMPING_TX_SOFTWARE
146 | libc::SOF_TIMESTAMPING_OPT_TSONLY
147 | libc::SOF_TIMESTAMPING_OPT_ID
148 }
149 InterfaceTimestampMode::SoftwareRecv => {
150 libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_RX_SOFTWARE
151 }
152 InterfaceTimestampMode::None => return Ok(()),
153 };
154
155 socket.so_timestamping(options, bind_phc.unwrap_or_default())
156}
157
158pub fn open_interface_udp(
159 interface: InterfaceName,
160 port: u16,
161 timestamping: InterfaceTimestampMode,
162 bind_phc: Option<u32>,
163) -> std::io::Result<Socket<SocketAddr, Open>> {
164 let socket = RawSocket::open(libc::PF_INET6, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
166 socket.reuse_addr()?;
167 socket.ipv6_v6only(false)?;
168 socket.bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0).to_sockaddr(PrivateToken))?;
169 socket.bind_to_device(interface)?;
170 socket.ipv6_multicast_if(interface)?;
171 socket.ipv6_multicast_loop(false)?;
172 configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
173 match timestamping {
174 InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
175 socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
176 }
177 InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
178 .driver_enable_hardware_timestamping(
179 interface,
180 libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
181 )?,
182 InterfaceTimestampMode::None
183 | InterfaceTimestampMode::SoftwareAll
184 | InterfaceTimestampMode::SoftwareRecv => {}
185 }
186 socket.set_nonblocking(true)?;
187
188 Ok(Socket {
189 timestamp_mode: timestamping,
190 socket: AsyncFd::new(socket)?,
191 send_counter: 0,
192 _addr: PhantomData,
193 _state: PhantomData,
194 })
195}
196
197pub fn open_interface_udp4(
198 interface: InterfaceName,
199 port: u16,
200 timestamping: InterfaceTimestampMode,
201 bind_phc: Option<u32>,
202) -> std::io::Result<Socket<SocketAddrV4, Open>> {
203 let socket = RawSocket::open(libc::PF_INET, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
205 socket.reuse_addr()?;
206 socket.bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).to_sockaddr(PrivateToken))?;
207 socket.bind_to_device(interface)?;
208 socket.ip_multicast_if(interface)?;
209 socket.ip_multicast_loop(false)?;
210 configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
211 match timestamping {
212 InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
213 socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
214 }
215 InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
216 .driver_enable_hardware_timestamping(
217 interface,
218 libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
219 )?,
220 InterfaceTimestampMode::None
221 | InterfaceTimestampMode::SoftwareAll
222 | InterfaceTimestampMode::SoftwareRecv => {}
223 }
224 socket.set_nonblocking(true)?;
225
226 Ok(Socket {
227 timestamp_mode: timestamping,
228 socket: AsyncFd::new(socket)?,
229 send_counter: 0,
230 _addr: PhantomData,
231 _state: PhantomData,
232 })
233}
234
235pub fn open_interface_udp6(
236 interface: InterfaceName,
237 port: u16,
238 timestamping: InterfaceTimestampMode,
239 bind_phc: Option<u32>,
240) -> std::io::Result<Socket<SocketAddrV6, Open>> {
241 let socket = RawSocket::open(libc::PF_INET6, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
243 socket.reuse_addr()?;
244 socket.ipv6_v6only(true)?;
245 socket.bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0).to_sockaddr(PrivateToken))?;
246 socket.bind_to_device(interface)?;
247 socket.ipv6_multicast_if(interface)?;
248 socket.ipv6_multicast_loop(false)?;
249 configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
250 match timestamping {
251 InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
252 socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
253 }
254 InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
255 .driver_enable_hardware_timestamping(
256 interface,
257 libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
258 )?,
259 InterfaceTimestampMode::None
260 | InterfaceTimestampMode::SoftwareAll
261 | InterfaceTimestampMode::SoftwareRecv => {}
262 }
263 socket.set_nonblocking(true)?;
264
265 Ok(Socket {
266 timestamp_mode: timestamping,
267 socket: AsyncFd::new(socket)?,
268 send_counter: 0,
269 _addr: PhantomData,
270 _state: PhantomData,
271 })
272}
273
274pub fn open_interface_ethernet(
275 interface: InterfaceName,
276 protocol: u16,
277 timestamping: InterfaceTimestampMode,
278 bind_phc: Option<u32>,
279) -> std::io::Result<Socket<EthernetAddress, Open>> {
280 let socket = RawSocket::open(
281 libc::AF_PACKET,
282 libc::SOCK_DGRAM,
283 u16::from_ne_bytes(protocol.to_be_bytes()) as _,
284 )?;
285 socket.bind(
286 EthernetAddress::new(
287 u16::from_ne_bytes(protocol.to_le_bytes()),
288 MacAddress::new([0; 6]),
289 interface
290 .get_index()
291 .ok_or(std::io::ErrorKind::InvalidInput)? as _,
292 )
293 .to_sockaddr(PrivateToken),
294 )?;
295 configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
296 match timestamping {
297 InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
298 socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
299 }
300 InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
301 .driver_enable_hardware_timestamping(
302 interface,
303 libc::HWTSTAMP_FILTER_PTP_V2_L2_EVENT as _,
304 )?,
305 InterfaceTimestampMode::None
306 | InterfaceTimestampMode::SoftwareAll
307 | InterfaceTimestampMode::SoftwareRecv => {}
308 }
309 socket.set_nonblocking(true)?;
310
311 Ok(Socket {
312 timestamp_mode: timestamping,
313 socket: AsyncFd::new(socket)?,
314 send_counter: 0,
315 _addr: PhantomData,
316 _state: PhantomData,
317 })
318}
319
#[cfg(test)]
mod tests {
    use std::net::IpAddr;

    use crate::socket::{connect_address, open_ip, GeneralTimestampMode};

    use super::*;

    /// Round-trips a datagram between an interface-bound IPv6 socket on `lo` and a
    /// connected peer, in both directions.
    #[tokio::test]
    async fn test_open_udp6() {
        use std::str::FromStr;
        let mut a = open_interface_udp6(
            InterfaceName::from_str("lo").unwrap(),
            5123,
            super::InterfaceTimestampMode::None,
            None,
        )
        .unwrap();
        let mut b = connect_address(
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5123),
            GeneralTimestampMode::None,
        )
        .unwrap();
        assert!(b.send(&[1, 2, 3]).await.is_ok());
        let mut buf = [0; 4];
        let recv_result = a.recv(&mut buf).await.unwrap();
        assert_eq!(recv_result.bytes_read, 3);
        assert_eq!(&buf[0..3], &[1, 2, 3]);
        assert!(a.send_to(&[4, 5, 6], recv_result.remote_addr).await.is_ok());
        let recv_result = b.recv(&mut buf).await.unwrap();
        assert_eq!(recv_result.bytes_read, 3);
        assert_eq!(&buf[0..3], &[4, 5, 6]);
    }

    /// Round-trips a datagram between an interface-bound IPv4 socket on `lo` and a
    /// connected peer, in both directions.
    #[tokio::test]
    async fn test_open_udp4() {
        use std::str::FromStr;
        let mut a = open_interface_udp4(
            InterfaceName::from_str("lo").unwrap(),
            5124,
            super::InterfaceTimestampMode::None,
            None,
        )
        .unwrap();
        let mut b = connect_address(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5124),
            GeneralTimestampMode::None,
        )
        .unwrap();
        assert!(b.send(&[1, 2, 3]).await.is_ok());
        let mut buf = [0; 4];
        let recv_result = a.recv(&mut buf).await.unwrap();
        assert_eq!(recv_result.bytes_read, 3);
        assert_eq!(&buf[0..3], &[1, 2, 3]);
        assert!(a.send_to(&[4, 5, 6], recv_result.remote_addr).await.is_ok());
        let recv_result = b.recv(&mut buf).await.unwrap();
        assert_eq!(recv_result.bytes_read, 3);
        assert_eq!(&buf[0..3], &[4, 5, 6]);
    }

    /// Checks that software send and receive timestamps are produced, are close to
    /// wall-clock time, and are close to each other.
    #[tokio::test]
    async fn test_software_timestamping() {
        use std::time::SystemTime;

        let a = open_ip(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5126),
            GeneralTimestampMode::SoftwareAll,
        )
        .unwrap();
        let mut b = connect_address(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5126),
            GeneralTimestampMode::SoftwareAll,
        )
        .unwrap();

        let before = SystemTime::now();
        let send_ts = b.send(&[1, 2, 3]).await.unwrap().unwrap();
        let after = SystemTime::now();

        let mut buf = [0; 4];
        let recv_result = a.recv(&mut buf).await.unwrap();
        let recv_ts = recv_result.timestamp.unwrap();

        let before = before
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let after = after
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        assert!((send_ts.seconds - (before as i64)).abs() < 2);
        assert!((send_ts.seconds - (after as i64)).abs() < 2);

        let send_nanos = send_ts.seconds * 1_000_000_000 + (send_ts.nanos as i64);
        let recv_nanos = recv_ts.seconds * 1_000_000_000 + (recv_ts.nanos as i64);
        // Compare the absolute difference: the send timestamp normally precedes the
        // receive timestamp, so a signed comparison would pass for any receive delay.
        assert!((send_nanos - recv_nanos).abs() < 1_000_000 * 10);
    }
}