fips_core/transport/ethernet/
socket.rs1use crate::transport::TransportError;
8
9pub const ETHERNET_BROADCAST: [u8; 6] = [0xff; 6];
11
12#[cfg(target_os = "linux")]
14#[path = "socket_linux.rs"]
15mod platform;
16
17#[cfg(target_os = "macos")]
18#[path = "socket_macos.rs"]
19mod platform;
20
21#[cfg(any(target_os = "linux", target_os = "macos"))]
22pub use platform::PacketSocket;
23
24#[cfg(target_os = "linux")]
29mod async_impl {
30 use super::PacketSocket;
31 use crate::transport::TransportError;
32 use tokio::io::unix::AsyncFd;
33
34 pub struct AsyncPacketSocket {
35 inner: AsyncFd<PacketSocket>,
36 }
37
38 impl AsyncPacketSocket {
39 pub fn new(socket: PacketSocket) -> Result<Self, TransportError> {
40 let async_fd = AsyncFd::new(socket)
41 .map_err(|e| TransportError::StartFailed(format!("AsyncFd::new failed: {}", e)))?;
42 Ok(Self { inner: async_fd })
43 }
44
45 pub async fn send_to(
46 &self,
47 data: &[u8],
48 dest_mac: &[u8; 6],
49 ) -> Result<usize, TransportError> {
50 loop {
51 let mut guard = self
52 .inner
53 .writable()
54 .await
55 .map_err(|e| TransportError::SendFailed(format!("writable wait: {}", e)))?;
56
57 match guard.try_io(|inner| inner.get_ref().send_to(data, dest_mac)) {
58 Ok(Ok(n)) => return Ok(n),
59 Ok(Err(e)) => return Err(TransportError::SendFailed(format!("{}", e))),
60 Err(_would_block) => continue,
61 }
62 }
63 }
64
65 pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, [u8; 6]), TransportError> {
66 loop {
67 let mut guard = self
68 .inner
69 .readable()
70 .await
71 .map_err(|e| TransportError::RecvFailed(format!("readable wait: {}", e)))?;
72
73 match guard.try_io(|inner| inner.get_ref().recv_from(buf)) {
74 Ok(Ok(result)) => return Ok(result),
75 Ok(Err(e)) => return Err(TransportError::RecvFailed(format!("{}", e))),
76 Err(_would_block) => continue,
77 }
78 }
79 }
80
81 pub fn get_ref(&self) -> &PacketSocket {
82 self.inner.get_ref()
83 }
84
85 pub fn shutdown(&self) {}
90 }
91}
92
93#[cfg(target_os = "macos")]
103mod async_impl {
104 use super::PacketSocket;
105 use crate::transport::TransportError;
106 use std::os::unix::io::AsRawFd;
107 use std::sync::Arc;
108
109 type Frame = (Vec<u8>, [u8; 6]);
111
112 pub struct AsyncPacketSocket {
113 inner: Arc<PacketSocket>,
114 rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Frame>>,
115 reader_thread: Option<std::thread::JoinHandle<()>>,
116 }
117
118 impl AsyncPacketSocket {
119 pub fn new(socket: PacketSocket) -> Result<Self, TransportError> {
120 let (tx, rx) = tokio::sync::mpsc::channel::<Frame>(1024);
123 let inner = Arc::new(socket);
124 let reader_socket = Arc::clone(&inner);
125
126 let reader_thread = std::thread::Builder::new()
127 .name("bpf-reader".into())
128 .spawn(move || {
129 let bpf_fd = reader_socket.as_raw_fd();
130 let shutdown_fd = reader_socket.shutdown_read_fd();
131 let bpf_buflen = reader_socket.bpf_buflen();
132 let mut read_buf = vec![0u8; bpf_buflen];
133 let mut parse_buf = vec![0u8; bpf_buflen];
134 let mut parse_offset: usize = 0;
135 let mut parse_len: usize = 0;
136 let nfds = bpf_fd.max(shutdown_fd) + 1;
137
138 loop {
139 while let Some(result) = super::platform::parse_next_frame(
141 &parse_buf,
142 &mut parse_offset,
143 parse_len,
144 &mut read_buf,
145 ) {
146 match result {
147 Ok((n, mac)) => {
148 let data = read_buf[..n].to_vec();
149 if tx.blocking_send((data, mac)).is_err() {
150 return;
151 }
152 }
153 Err(_) => break,
154 }
155 }
156
157 unsafe {
159 let mut read_fds: libc::fd_set = std::mem::zeroed();
160 libc::FD_ZERO(&mut read_fds);
161 libc::FD_SET(bpf_fd, &mut read_fds);
162 libc::FD_SET(shutdown_fd, &mut read_fds);
163
164 let ret = libc::select(
165 nfds,
166 &mut read_fds,
167 std::ptr::null_mut(),
168 std::ptr::null_mut(),
169 std::ptr::null_mut(),
170 );
171 if ret < 0 {
172 let err = std::io::Error::last_os_error();
173 if err.kind() == std::io::ErrorKind::Interrupted {
174 continue;
175 }
176 break;
177 }
178 if libc::FD_ISSET(shutdown_fd, &read_fds) {
179 break; }
181 }
182
183 let ret = unsafe {
185 libc::read(
186 bpf_fd,
187 parse_buf.as_mut_ptr() as *mut libc::c_void,
188 bpf_buflen,
189 )
190 };
191 if ret <= 0 {
192 if ret < 0 {
193 let err = std::io::Error::last_os_error();
194 if err.raw_os_error() == Some(libc::EBADF) {
195 break;
196 }
197 }
198 parse_len = 0;
199 parse_offset = 0;
200 continue;
201 }
202 parse_len = ret as usize;
203 parse_offset = 0;
204 }
205 })
206 .map_err(|e| TransportError::StartFailed(format!("reader thread: {}", e)))?;
207
208 Ok(Self {
209 inner,
210 rx: tokio::sync::Mutex::new(rx),
211 reader_thread: Some(reader_thread),
212 })
213 }
214
215 pub async fn send_to(
216 &self,
217 data: &[u8],
218 dest_mac: &[u8; 6],
219 ) -> Result<usize, TransportError> {
220 let socket = Arc::clone(&self.inner);
221 let data = data.to_vec();
222 let dest = *dest_mac;
223 tokio::task::spawn_blocking(move || {
224 socket
225 .send_to(&data, &dest)
226 .map_err(|e| TransportError::SendFailed(format!("{}", e)))
227 })
228 .await
229 .map_err(|e| TransportError::SendFailed(format!("spawn_blocking: {}", e)))?
230 }
231
232 pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, [u8; 6]), TransportError> {
233 let mut rx = self.rx.lock().await;
234 match rx.recv().await {
235 Some((data, mac)) => {
236 let n = data.len().min(buf.len());
237 buf[..n].copy_from_slice(&data[..n]);
238 Ok((n, mac))
239 }
240 None => Err(TransportError::RecvFailed("reader thread stopped".into())),
241 }
242 }
243
244 pub fn get_ref(&self) -> &PacketSocket {
245 &self.inner
246 }
247
248 pub fn shutdown(&self) {
253 self.inner.request_shutdown();
254 }
255 }
256
257 impl Drop for AsyncPacketSocket {
258 fn drop(&mut self) {
259 self.inner.request_shutdown();
260 if let Some(handle) = self.reader_thread.take() {
261 let _ = handle.join();
262 }
263 }
264 }
265}
266
267#[cfg(any(target_os = "linux", target_os = "macos"))]
268pub use async_impl::AsyncPacketSocket;
269
270#[cfg(any(target_os = "linux", target_os = "macos"))]
271impl PacketSocket {
272 pub fn into_async(self) -> Result<AsyncPacketSocket, TransportError> {
274 AsyncPacketSocket::new(self)
275 }
276}
277
278#[cfg(windows)]
283pub struct PacketSocket;
284
285#[cfg(windows)]
286pub struct AsyncPacketSocket;