Skip to main content

fips_core/transport/ethernet/
socket.rs

1//! Raw Ethernet socket abstraction.
2//!
3//! Platform-specific implementations live in `socket_linux.rs` (AF_PACKET)
4//! and `socket_macos.rs` (BPF). This module re-exports `PacketSocket` and
5//! provides `AsyncPacketSocket`.
6
7use crate::transport::TransportError;
8
/// The Ethernet broadcast address: six bytes with every bit set (`ff:ff:ff:ff:ff:ff`).
pub const ETHERNET_BROADCAST: [u8; 6] = [u8::MAX; 6];
11
12// Platform-specific PacketSocket implementation.
13#[cfg(target_os = "linux")]
14#[path = "socket_linux.rs"]
15mod platform;
16
17#[cfg(target_os = "macos")]
18#[path = "socket_macos.rs"]
19mod platform;
20
21#[cfg(any(target_os = "linux", target_os = "macos"))]
22pub use platform::PacketSocket;
23
24// =============================================================================
25// Linux: AsyncFd-based async wrapper
26// =============================================================================
27
28#[cfg(target_os = "linux")]
29mod async_impl {
30    use super::PacketSocket;
31    use crate::transport::TransportError;
32    use tokio::io::unix::AsyncFd;
33
34    pub struct AsyncPacketSocket {
35        inner: AsyncFd<PacketSocket>,
36    }
37
38    impl AsyncPacketSocket {
39        pub fn new(socket: PacketSocket) -> Result<Self, TransportError> {
40            let async_fd = AsyncFd::new(socket)
41                .map_err(|e| TransportError::StartFailed(format!("AsyncFd::new failed: {}", e)))?;
42            Ok(Self { inner: async_fd })
43        }
44
45        pub async fn send_to(
46            &self,
47            data: &[u8],
48            dest_mac: &[u8; 6],
49        ) -> Result<usize, TransportError> {
50            loop {
51                let mut guard = self
52                    .inner
53                    .writable()
54                    .await
55                    .map_err(|e| TransportError::SendFailed(format!("writable wait: {}", e)))?;
56
57                match guard.try_io(|inner| inner.get_ref().send_to(data, dest_mac)) {
58                    Ok(Ok(n)) => return Ok(n),
59                    Ok(Err(e)) => return Err(TransportError::SendFailed(format!("{}", e))),
60                    Err(_would_block) => continue,
61                }
62            }
63        }
64
65        pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, [u8; 6]), TransportError> {
66            loop {
67                let mut guard = self
68                    .inner
69                    .readable()
70                    .await
71                    .map_err(|e| TransportError::RecvFailed(format!("readable wait: {}", e)))?;
72
73                match guard.try_io(|inner| inner.get_ref().recv_from(buf)) {
74                    Ok(Ok(result)) => return Ok(result),
75                    Ok(Err(e)) => return Err(TransportError::RecvFailed(format!("{}", e))),
76                    Err(_would_block) => continue,
77                }
78            }
79        }
80
81        pub fn get_ref(&self) -> &PacketSocket {
82            self.inner.get_ref()
83        }
84
85        /// Shut down the socket, unblocking any pending recv.
86        ///
87        /// On Linux this is a no-op — aborting the tokio task suffices
88        /// since AsyncFd is cancellation-aware.
89        pub fn shutdown(&self) {}
90    }
91}
92
93// =============================================================================
94// macOS: dedicated reader thread with async channel
95//
96// BPF fds don't support kqueue, so we can't use AsyncFd. Instead of
97// spawn_blocking per packet (which was the bottleneck causing 84 Mbps),
98// we spawn a single dedicated reader thread that loops on blocking
99// read() and feeds frames through a tokio mpsc channel.
100// =============================================================================
101
#[cfg(target_os = "macos")]
mod async_impl {
    use super::PacketSocket;
    use crate::transport::TransportError;
    use std::os::unix::io::AsRawFd;
    use std::sync::Arc;

    /// A received frame: (payload, source_mac).
    type Frame = (Vec<u8>, [u8; 6]);

    /// Async wrapper around a BPF-backed [`PacketSocket`].
    ///
    /// Receiving is served by a dedicated `bpf-reader` thread that pushes
    /// frames into a bounded tokio mpsc channel; sending goes through
    /// `spawn_blocking`.
    pub struct AsyncPacketSocket {
        inner: Arc<PacketSocket>,
        rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Frame>>,
        reader_thread: Option<std::thread::JoinHandle<()>>,
    }

    impl AsyncPacketSocket {
        /// Takes ownership of `socket` and starts the reader thread.
        ///
        /// # Errors
        ///
        /// Returns `TransportError::StartFailed` if either the BPF fd or the
        /// shutdown fd cannot be represented in an `fd_set` (negative or
        /// `>= FD_SETSIZE`), or if the reader thread cannot be spawned.
        pub fn new(socket: PacketSocket) -> Result<Self, TransportError> {
            let bpf_fd = socket.as_raw_fd();
            let shutdown_fd = socket.shutdown_read_fd();

            // `FD_SET` on a descriptor outside `0..FD_SETSIZE` writes out of
            // bounds of the `fd_set`, so refuse to start instead of risking UB
            // in the reader thread.
            let fits_fd_set = |fd: libc::c_int| {
                usize::try_from(fd).map_or(false, |fd| fd < libc::FD_SETSIZE)
            };
            if !fits_fd_set(bpf_fd) || !fits_fd_set(shutdown_fd) {
                return Err(TransportError::StartFailed(format!(
                    "reader thread: fd out of range for select() (bpf={}, shutdown={}, FD_SETSIZE={})",
                    bpf_fd,
                    shutdown_fd,
                    libc::FD_SETSIZE
                )));
            }

            // Channel capacity: buffer up to 1024 frames to decouple
            // the blocking reader from the async consumer.
            let (tx, rx) = tokio::sync::mpsc::channel::<Frame>(1024);
            let inner = Arc::new(socket);
            // The thread owns a clone of the Arc so the fds stay open for as
            // long as it is running.
            let reader_socket = Arc::clone(&inner);

            let reader_thread = std::thread::Builder::new()
                .name("bpf-reader".into())
                .spawn(move || {
                    let bpf_buflen = reader_socket.bpf_buflen();
                    // `parse_buf` holds the raw bytes of the last BPF read;
                    // `read_buf` is scratch space for one extracted frame.
                    let mut read_buf = vec![0u8; bpf_buflen];
                    let mut parse_buf = vec![0u8; bpf_buflen];
                    let mut parse_offset: usize = 0;
                    let mut parse_len: usize = 0;
                    let nfds = bpf_fd.max(shutdown_fd) + 1;

                    loop {
                        // Drain any buffered frames from the previous read.
                        // NOTE(review): `parse_next_frame` presumably copies the
                        // next frame into `read_buf` and advances `parse_offset`;
                        // confirm against the platform module.
                        while let Some(result) = super::platform::parse_next_frame(
                            &parse_buf,
                            &mut parse_offset,
                            parse_len,
                            &mut read_buf,
                        ) {
                            match result {
                                Ok((n, mac)) => {
                                    let data = read_buf[..n].to_vec();
                                    // An error means the receiver is closed or
                                    // dropped: nobody is listening, so exit.
                                    if tx.blocking_send((data, mac)).is_err() {
                                        return;
                                    }
                                }
                                // Parse error: abandon the rest of this buffer.
                                Err(_) => break,
                            }
                        }

                        // Wait for BPF data or shutdown signal via select()
                        // SAFETY: `read_fds` is a plain C bitset for which
                        // all-zero is a valid value, both fds were checked in
                        // `new` to lie in `0..FD_SETSIZE`, and the pointers
                        // passed to `select` are either null or point at the
                        // live local `read_fds`.
                        unsafe {
                            let mut read_fds: libc::fd_set = std::mem::zeroed();
                            libc::FD_ZERO(&mut read_fds);
                            libc::FD_SET(bpf_fd, &mut read_fds);
                            libc::FD_SET(shutdown_fd, &mut read_fds);

                            let ret = libc::select(
                                nfds,
                                &mut read_fds,
                                std::ptr::null_mut(),
                                std::ptr::null_mut(),
                                std::ptr::null_mut(),
                            );
                            if ret < 0 {
                                let err = std::io::Error::last_os_error();
                                if err.kind() == std::io::ErrorKind::Interrupted {
                                    continue;
                                }
                                break;
                            }
                            if libc::FD_ISSET(shutdown_fd, &read_fds) {
                                break; // shutdown signal
                            }
                        }

                        // BPF fd is readable
                        // SAFETY: `parse_buf` is a live allocation of exactly
                        // `bpf_buflen` bytes, so the kernel writes at most that
                        // many bytes into valid memory.
                        let ret = unsafe {
                            libc::read(
                                bpf_fd,
                                parse_buf.as_mut_ptr() as *mut libc::c_void,
                                bpf_buflen,
                            )
                        };
                        if ret <= 0 {
                            if ret < 0 {
                                let err = std::io::Error::last_os_error();
                                // The fd is gone; nothing more can be read.
                                if err.raw_os_error() == Some(libc::EBADF) {
                                    break;
                                }
                            }
                            // Empty read or any other error: discard the buffer
                            // and go back to waiting.
                            parse_len = 0;
                            parse_offset = 0;
                            continue;
                        }
                        // `ret > 0` here, so the cast cannot wrap.
                        parse_len = ret as usize;
                        parse_offset = 0;
                    }
                })
                .map_err(|e| TransportError::StartFailed(format!("reader thread: {}", e)))?;

            Ok(Self {
                inner,
                rx: tokio::sync::Mutex::new(rx),
                reader_thread: Some(reader_thread),
            })
        }

        /// Sends `data` to `dest_mac` on the blocking thread pool.
        ///
        /// `data` is copied so the blocking task can own it. Returns whatever
        /// count the underlying `PacketSocket::send_to` reports.
        ///
        /// # Errors
        ///
        /// Returns `TransportError::SendFailed` if the underlying send fails
        /// or the blocking task cannot be joined.
        pub async fn send_to(
            &self,
            data: &[u8],
            dest_mac: &[u8; 6],
        ) -> Result<usize, TransportError> {
            let socket = Arc::clone(&self.inner);
            let data = data.to_vec();
            let dest = *dest_mac;
            tokio::task::spawn_blocking(move || {
                socket
                    .send_to(&data, &dest)
                    .map_err(|e| TransportError::SendFailed(e.to_string()))
            })
            .await
            .map_err(|e| TransportError::SendFailed(format!("spawn_blocking: {}", e)))?
        }

        /// Receives the next frame queued by the reader thread.
        ///
        /// Copies at most `buf.len()` bytes into `buf`; a longer frame is
        /// truncated. Returns the number of bytes copied and the source MAC.
        ///
        /// # Errors
        ///
        /// Returns `TransportError::RecvFailed` once the reader thread has
        /// stopped and the channel is drained.
        pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, [u8; 6]), TransportError> {
            let mut rx = self.rx.lock().await;
            match rx.recv().await {
                Some((data, mac)) => {
                    let n = data.len().min(buf.len());
                    buf[..n].copy_from_slice(&data[..n]);
                    Ok((n, mac))
                }
                None => Err(TransportError::RecvFailed("reader thread stopped".into())),
            }
        }

        /// Borrows the wrapped [`PacketSocket`].
        pub fn get_ref(&self) -> &PacketSocket {
            &self.inner
        }

        /// Signal the reader thread to stop.
        ///
        /// Calls `request_shutdown` on the socket. The reader thread blocks in
        /// `select()` on both the BPF fd and the socket's shutdown fd and exits
        /// as soon as the shutdown fd is reported readable (presumably what
        /// `request_shutdown` triggers — confirm in the platform module).
        ///
        /// A reader that is currently blocked handing a frame to a full channel
        /// only observes the signal after that frame has been accepted.
        pub fn shutdown(&self) {
            self.inner.request_shutdown();
        }
    }

    impl Drop for AsyncPacketSocket {
        /// Stops the reader thread and waits for it to exit.
        fn drop(&mut self) {
            // Wake the reader if it is parked in `select()`.
            self.inner.request_shutdown();
            // Wake the reader if it is parked in `blocking_send` on a full
            // channel. The receiver field is only dropped after this function
            // returns, so without closing it here the join below would wait
            // forever.
            self.rx.get_mut().close();
            if let Some(handle) = self.reader_thread.take() {
                let _ = handle.join();
            }
        }
    }
}
266
267#[cfg(any(target_os = "linux", target_os = "macos"))]
268pub use async_impl::AsyncPacketSocket;
269
270#[cfg(any(target_os = "linux", target_os = "macos"))]
271impl PacketSocket {
272    /// Wrap this socket in an async wrapper for tokio integration.
273    pub fn into_async(self) -> Result<AsyncPacketSocket, TransportError> {
274        AsyncPacketSocket::new(self)
275    }
276}
277
278// =============================================================================
279// Windows: stub types (Ethernet not supported on Windows)
280// =============================================================================
281
/// Placeholder for the raw Ethernet socket on Windows, where the Ethernet
/// transport is not supported.
#[cfg(windows)]
pub struct PacketSocket;

/// Placeholder for the async socket wrapper on Windows, where the Ethernet
/// transport is not supported.
#[cfg(windows)]
pub struct AsyncPacketSocket;