timestamped_socket/socket/
linux.rs

1use std::{
2    marker::PhantomData,
3    net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
4};
5
6use tokio::io::{unix::AsyncFd, Interest};
7
8use crate::{
9    control_message::{control_message_space, ControlMessage, MessageQueue},
10    interface::{lookup_phc, InterfaceName},
11    networkaddress::{sealed::PrivateToken, EthernetAddress, MacAddress, NetworkAddress},
12    raw_socket::RawSocket,
13    socket::select_timestamp,
14};
15
16use super::{InterfaceTimestampMode, Open, Socket, Timestamp};
17
18const SOF_TIMESTAMPING_BIND_PHC: libc::c_uint = 1 << 15;
19
20impl<A: NetworkAddress, S> Socket<A, S> {
21    pub(super) async fn fetch_send_timestamp(
22        &self,
23        expected_counter: u32,
24    ) -> std::io::Result<Option<Timestamp>> {
25        use std::time::Duration;
26
27        const TIMEOUT: Duration = Duration::from_millis(200);
28
29        match tokio::time::timeout(TIMEOUT, self.fetch_send_timestamp_loop(expected_counter)).await
30        {
31            Ok(res_opt_timestamp) => res_opt_timestamp,
32            Err(_timeout_elapsed) => Ok(None),
33        }
34    }
35
36    pub(super) async fn fetch_send_timestamp_loop(
37        &self,
38        expected_counter: u32,
39    ) -> std::io::Result<Option<Timestamp>> {
40        let try_read = |_: &RawSocket| self.fetch_send_timestamp_try_read(expected_counter);
41
42        loop {
43            // the timestamp being available triggers the error interest
44            match self.socket.async_io(Interest::ERROR, try_read).await? {
45                Some(timestamp) => break Ok(Some(timestamp)),
46                None => continue,
47            }
48        }
49    }
50
51    pub(super) fn fetch_send_timestamp_try_read(
52        &self,
53        expected_counter: u32,
54    ) -> std::io::Result<Option<Timestamp>> {
55        const CONTROL_SIZE: usize = control_message_space::<[libc::timespec; 3]>()
56            + control_message_space::<(libc::sock_extended_err, libc::sockaddr_storage)>();
57
58        let mut control_buf = [0; CONTROL_SIZE];
59
60        // NOTE: this read could block!
61        let (_, control_messages, _) = self.socket.get_ref().receive_message(
62            &mut [],
63            &mut control_buf,
64            MessageQueue::Error,
65        )?;
66
67        let mut send_ts = None;
68        for msg in control_messages {
69            match msg {
70                ControlMessage::Timestamping { software, hardware } => {
71                    send_ts = select_timestamp(self.timestamp_mode, software, hardware);
72                }
73
74                ControlMessage::ReceiveError(error) => {
75                    // the timestamping does not set a message; if there is a message, that means
76                    // something else is wrong, and we want to know about it.
77                    if error.ee_errno as libc::c_int != libc::ENOMSG {
78                        tracing::warn!(
79                            expected_counter,
80                            error.ee_data,
81                            "error message on the MSG_ERRQUEUE"
82                        );
83                    }
84
85                    // Check that this message belongs to the send we are interested in
86                    if error.ee_data != expected_counter {
87                        tracing::debug!(
88                            error.ee_data,
89                            expected_counter,
90                            "Timestamp for unrelated packet"
91                        );
92                        return Ok(None);
93                    }
94                }
95
96                ControlMessage::Other(msg) => {
97                    tracing::warn!(
98                        msg.cmsg_level,
99                        msg.cmsg_type,
100                        "unexpected message on the MSG_ERRQUEUE",
101                    );
102                }
103            }
104        }
105
106        Ok(send_ts)
107    }
108}
109
110pub(super) fn configure_timestamping(
111    socket: &RawSocket,
112    interface: Option<InterfaceName>,
113    mode: InterfaceTimestampMode,
114    mut bind_phc: Option<u32>,
115) -> std::io::Result<()> {
116    // Check if the phc is not the interface-native phc.
117    if let Some(interface) = interface {
118        if lookup_phc(interface) == bind_phc {
119            bind_phc = None
120        }
121    }
122
123    let options = match mode {
124        InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwarePTPAll => {
125            libc::SOF_TIMESTAMPING_RAW_HARDWARE
126                | libc::SOF_TIMESTAMPING_TX_SOFTWARE
127                | libc::SOF_TIMESTAMPING_RX_HARDWARE
128                | libc::SOF_TIMESTAMPING_TX_HARDWARE
129                | libc::SOF_TIMESTAMPING_OPT_TSONLY
130                | libc::SOF_TIMESTAMPING_OPT_ID
131                | bind_phc
132                    .map(|_| SOF_TIMESTAMPING_BIND_PHC)
133                    .unwrap_or_default()
134        }
135        InterfaceTimestampMode::HardwareRecv | InterfaceTimestampMode::HardwarePTPRecv => {
136            libc::SOF_TIMESTAMPING_RAW_HARDWARE
137                | libc::SOF_TIMESTAMPING_RX_HARDWARE
138                | bind_phc
139                    .map(|_| SOF_TIMESTAMPING_BIND_PHC)
140                    .unwrap_or_default()
141        }
142        InterfaceTimestampMode::SoftwareAll => {
143            libc::SOF_TIMESTAMPING_SOFTWARE
144                | libc::SOF_TIMESTAMPING_RX_SOFTWARE
145                | libc::SOF_TIMESTAMPING_TX_SOFTWARE
146                | libc::SOF_TIMESTAMPING_OPT_TSONLY
147                | libc::SOF_TIMESTAMPING_OPT_ID
148        }
149        InterfaceTimestampMode::SoftwareRecv => {
150            libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_RX_SOFTWARE
151        }
152        InterfaceTimestampMode::None => return Ok(()),
153    };
154
155    socket.so_timestamping(options, bind_phc.unwrap_or_default())
156}
157
158pub fn open_interface_udp(
159    interface: InterfaceName,
160    port: u16,
161    timestamping: InterfaceTimestampMode,
162    bind_phc: Option<u32>,
163) -> std::io::Result<Socket<SocketAddr, Open>> {
164    // Setup the socket
165    let socket = RawSocket::open(libc::PF_INET6, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
166    socket.reuse_addr()?;
167    socket.ipv6_v6only(false)?;
168    socket.bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0).to_sockaddr(PrivateToken))?;
169    socket.bind_to_device(interface)?;
170    socket.ipv6_multicast_if(interface)?;
171    socket.ipv6_multicast_loop(false)?;
172    configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
173    match timestamping {
174        InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
175            socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
176        }
177        InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
178            .driver_enable_hardware_timestamping(
179                interface,
180                libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
181            )?,
182        InterfaceTimestampMode::None
183        | InterfaceTimestampMode::SoftwareAll
184        | InterfaceTimestampMode::SoftwareRecv => {}
185    }
186    socket.set_nonblocking(true)?;
187
188    Ok(Socket {
189        timestamp_mode: timestamping,
190        socket: AsyncFd::new(socket)?,
191        send_counter: 0,
192        _addr: PhantomData,
193        _state: PhantomData,
194    })
195}
196
197pub fn open_interface_udp4(
198    interface: InterfaceName,
199    port: u16,
200    timestamping: InterfaceTimestampMode,
201    bind_phc: Option<u32>,
202) -> std::io::Result<Socket<SocketAddrV4, Open>> {
203    // Setup the socket
204    let socket = RawSocket::open(libc::PF_INET, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
205    socket.reuse_addr()?;
206    socket.bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).to_sockaddr(PrivateToken))?;
207    socket.bind_to_device(interface)?;
208    socket.ip_multicast_if(interface)?;
209    socket.ip_multicast_loop(false)?;
210    configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
211    match timestamping {
212        InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
213            socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
214        }
215        InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
216            .driver_enable_hardware_timestamping(
217                interface,
218                libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
219            )?,
220        InterfaceTimestampMode::None
221        | InterfaceTimestampMode::SoftwareAll
222        | InterfaceTimestampMode::SoftwareRecv => {}
223    }
224    socket.set_nonblocking(true)?;
225
226    Ok(Socket {
227        timestamp_mode: timestamping,
228        socket: AsyncFd::new(socket)?,
229        send_counter: 0,
230        _addr: PhantomData,
231        _state: PhantomData,
232    })
233}
234
235pub fn open_interface_udp6(
236    interface: InterfaceName,
237    port: u16,
238    timestamping: InterfaceTimestampMode,
239    bind_phc: Option<u32>,
240) -> std::io::Result<Socket<SocketAddrV6, Open>> {
241    // Setup the socket
242    let socket = RawSocket::open(libc::PF_INET6, libc::SOCK_DGRAM, libc::IPPROTO_UDP)?;
243    socket.reuse_addr()?;
244    socket.ipv6_v6only(true)?;
245    socket.bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0).to_sockaddr(PrivateToken))?;
246    socket.bind_to_device(interface)?;
247    socket.ipv6_multicast_if(interface)?;
248    socket.ipv6_multicast_loop(false)?;
249    configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
250    match timestamping {
251        InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
252            socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
253        }
254        InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
255            .driver_enable_hardware_timestamping(
256                interface,
257                libc::HWTSTAMP_FILTER_PTP_V2_L4_EVENT as _,
258            )?,
259        InterfaceTimestampMode::None
260        | InterfaceTimestampMode::SoftwareAll
261        | InterfaceTimestampMode::SoftwareRecv => {}
262    }
263    socket.set_nonblocking(true)?;
264
265    Ok(Socket {
266        timestamp_mode: timestamping,
267        socket: AsyncFd::new(socket)?,
268        send_counter: 0,
269        _addr: PhantomData,
270        _state: PhantomData,
271    })
272}
273
274pub fn open_interface_ethernet(
275    interface: InterfaceName,
276    protocol: u16,
277    timestamping: InterfaceTimestampMode,
278    bind_phc: Option<u32>,
279) -> std::io::Result<Socket<EthernetAddress, Open>> {
280    let socket = RawSocket::open(
281        libc::AF_PACKET,
282        libc::SOCK_DGRAM,
283        u16::from_ne_bytes(protocol.to_be_bytes()) as _,
284    )?;
285    socket.bind(
286        EthernetAddress::new(
287            u16::from_ne_bytes(protocol.to_le_bytes()),
288            MacAddress::new([0; 6]),
289            interface
290                .get_index()
291                .ok_or(std::io::ErrorKind::InvalidInput)? as _,
292        )
293        .to_sockaddr(PrivateToken),
294    )?;
295    configure_timestamping(&socket, Some(interface), timestamping, bind_phc)?;
296    match timestamping {
297        InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwareRecv => {
298            socket.driver_enable_hardware_timestamping(interface, libc::HWTSTAMP_FILTER_ALL as _)?
299        }
300        InterfaceTimestampMode::HardwarePTPAll | InterfaceTimestampMode::HardwarePTPRecv => socket
301            .driver_enable_hardware_timestamping(
302                interface,
303                libc::HWTSTAMP_FILTER_PTP_V2_L2_EVENT as _,
304            )?,
305        InterfaceTimestampMode::None
306        | InterfaceTimestampMode::SoftwareAll
307        | InterfaceTimestampMode::SoftwareRecv => {}
308    }
309    socket.set_nonblocking(true)?;
310
311    Ok(Socket {
312        timestamp_mode: timestamping,
313        socket: AsyncFd::new(socket)?,
314        send_counter: 0,
315        _addr: PhantomData,
316        _state: PhantomData,
317    })
318}
319
320#[cfg(test)]
321mod tests {
322    use std::net::IpAddr;
323
324    use crate::socket::{connect_address, open_ip, GeneralTimestampMode};
325
326    use super::*;
327
328    #[tokio::test]
329    async fn test_open_udp6() {
330        use std::str::FromStr;
331        let mut a = open_interface_udp6(
332            InterfaceName::from_str("lo").unwrap(),
333            5123,
334            super::InterfaceTimestampMode::None,
335            None,
336        )
337        .unwrap();
338        let mut b = connect_address(
339            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5123),
340            GeneralTimestampMode::None,
341        )
342        .unwrap();
343        assert!(b.send(&[1, 2, 3]).await.is_ok());
344        let mut buf = [0; 4];
345        let recv_result = a.recv(&mut buf).await.unwrap();
346        assert_eq!(recv_result.bytes_read, 3);
347        assert_eq!(&buf[0..3], &[1, 2, 3]);
348        assert!(a.send_to(&[4, 5, 6], recv_result.remote_addr).await.is_ok());
349        let recv_result = b.recv(&mut buf).await.unwrap();
350        assert_eq!(recv_result.bytes_read, 3);
351        assert_eq!(&buf[0..3], &[4, 5, 6]);
352    }
353
354    #[tokio::test]
355    async fn test_open_udp4() {
356        use std::str::FromStr;
357        let mut a = open_interface_udp4(
358            InterfaceName::from_str("lo").unwrap(),
359            5124,
360            super::InterfaceTimestampMode::None,
361            None,
362        )
363        .unwrap();
364        let mut b = connect_address(
365            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5124),
366            GeneralTimestampMode::None,
367        )
368        .unwrap();
369        assert!(b.send(&[1, 2, 3]).await.is_ok());
370        let mut buf = [0; 4];
371        let recv_result = a.recv(&mut buf).await.unwrap();
372        assert_eq!(recv_result.bytes_read, 3);
373        assert_eq!(&buf[0..3], &[1, 2, 3]);
374        assert!(a.send_to(&[4, 5, 6], recv_result.remote_addr).await.is_ok());
375        let recv_result = b.recv(&mut buf).await.unwrap();
376        assert_eq!(recv_result.bytes_read, 3);
377        assert_eq!(&buf[0..3], &[4, 5, 6]);
378    }
379
380    #[tokio::test]
381    async fn test_software_timestamping() {
382        use std::time::SystemTime;
383
384        let a = open_ip(
385            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5126),
386            GeneralTimestampMode::SoftwareAll,
387        )
388        .unwrap();
389        let mut b = connect_address(
390            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5126),
391            GeneralTimestampMode::SoftwareAll,
392        )
393        .unwrap();
394
395        let before = SystemTime::now();
396        let send_ts = b.send(&[1, 2, 3]).await.unwrap().unwrap();
397        let after = SystemTime::now();
398
399        let mut buf = [0; 4];
400        let recv_result = a.recv(&mut buf).await.unwrap();
401        let recv_ts = recv_result.timestamp.unwrap();
402
403        let before = before
404            .duration_since(SystemTime::UNIX_EPOCH)
405            .unwrap()
406            .as_secs();
407        let after = after
408            .duration_since(SystemTime::UNIX_EPOCH)
409            .unwrap()
410            .as_secs();
411        assert!((send_ts.seconds - (before as i64)).abs() < 2);
412        assert!((send_ts.seconds - (after as i64)).abs() < 2);
413
414        let send_nanos = send_ts.seconds * 1_000_000_000 + (send_ts.nanos as i64);
415        let recv_nanos = recv_ts.seconds * 1_000_000_000 + (recv_ts.nanos as i64);
416        assert!((send_nanos - recv_nanos) < 1_000_000 * 10);
417    }
418}