1pub mod discovery;
9pub mod socket;
10pub mod stats;
11
12use super::{
13 DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr, TransportError,
14 TransportId, TransportState, TransportType,
15};
16use crate::config::EthernetConfig;
17use discovery::{
18 DiscoveryBuffer, FRAME_TYPE_BEACON, FRAME_TYPE_DATA, build_scoped_beacon, parse_beacon_record,
19};
20use socket::{AsyncPacketSocket, ETHERNET_BROADCAST, PacketSocket};
21use stats::EthernetStats;
22
23use secp256k1::XOnlyPublicKey;
24use std::sync::Arc;
25use tokio::task::JoinHandle;
26use tracing::{debug, info, trace, warn};
27
/// Transport that exchanges frames over a raw packet socket bound to a single
/// network interface.
///
/// Instances are created in the `Configured` state by `new()`. The socket and the
/// background tasks only exist between `start_async()` and `stop_async()`.
pub struct EthernetTransport {
    /// Identifier returned by `Transport::transport_id` and attached to log events.
    transport_id: TransportId,
    /// Optional human-readable name; included in the startup log line when set.
    name: Option<String>,
    /// Configuration supplied at construction (interface, ethertype, MTU,
    /// buffer sizes, discovery/announce settings).
    config: EthernetConfig,
    /// Current lifecycle state; updated by `start_async()` and `stop_async()`.
    state: TransportState,
    /// Async packet socket, `Some` only while started. Clones of this handle are
    /// given to the receive task and the beacon task.
    socket: Option<Arc<AsyncPacketSocket>>,
    /// Channel on which the receive loop delivers the payload of data frames.
    packet_tx: PacketTx,
    /// Handle of the spawned receive loop, `Some` only while started.
    recv_task: Option<JoinHandle<()>>,
    /// Handle of the spawned beacon sender; only set when announcing is enabled
    /// and a local pubkey was provided before start.
    beacon_task: Option<JoinHandle<()>>,
    /// MAC address reported by the socket at start; cleared again on stop.
    local_mac: Option<[u8; 6]>,
    /// Copy of `config.interface` taken at construction, used by
    /// `interface_name()` and log output.
    interface: String,
    /// Largest payload accepted by `send_async()`. Starts at 1497 and is
    /// recomputed from the interface MTU (minus the 3-byte frame header) on start.
    effective_mtu: u16,
    /// Buffer of discovered peers: filled by the receive loop, read by `discover()`.
    discovery_buffer: Arc<DiscoveryBuffer>,
    /// Counters shared with the receive and beacon tasks.
    stats: Arc<EthernetStats>,
    /// Public key placed in outgoing beacons; beacons stay disabled while `None`.
    local_pubkey: Option<XOnlyPublicKey>,
}
63
64impl EthernetTransport {
65 pub fn new(
67 transport_id: TransportId,
68 name: Option<String>,
69 config: EthernetConfig,
70 packet_tx: PacketTx,
71 ) -> Self {
72 let interface = config.interface.clone();
73 let discovery_buffer = Arc::new(DiscoveryBuffer::new(
74 transport_id,
75 config.discovery_scope().map(str::to_string),
76 ));
77 let stats = Arc::new(EthernetStats::new());
78
79 Self {
80 transport_id,
81 name,
82 config,
83 state: TransportState::Configured,
84 socket: None,
85 packet_tx,
86 recv_task: None,
87 beacon_task: None,
88 local_mac: None,
89 interface,
90 effective_mtu: 1497, discovery_buffer,
92 stats,
93 local_pubkey: None,
94 }
95 }
96
    /// Returns the optional human-readable name given at construction.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Returns the interface name copied from the configuration at construction.
    pub fn interface_name(&self) -> &str {
        &self.interface
    }

    /// Returns the local MAC address, or `None` when the transport has not been
    /// started (the value is set by `start_async()` and cleared by `stop_async()`).
    pub fn local_mac(&self) -> Option<[u8; 6]> {
        self.local_mac
    }

    /// Sets the public key to advertise in beacons.
    ///
    /// `start_async()` reads this value once when it decides whether to spawn the
    /// beacon task, so it has to be set before starting to have any effect.
    pub fn set_local_pubkey(&mut self, pubkey: XOnlyPublicKey) {
        self.local_pubkey = Some(pubkey);
    }

    /// Returns the shared statistics handle for this transport.
    pub fn stats(&self) -> &Arc<EthernetStats> {
        &self.stats
    }
123
124 pub async fn start_async(&mut self) -> Result<(), TransportError> {
129 if !self.state.can_start() {
130 return Err(TransportError::AlreadyStarted);
131 }
132
133 self.state = TransportState::Starting;
134
135 let raw_socket = PacketSocket::open(&self.config.interface, self.config.ethertype())?;
137
138 let local_mac = raw_socket.local_mac()?;
140 let if_mtu = raw_socket.interface_mtu()?;
141
142 let effective_mtu = if let Some(configured_mtu) = self.config.mtu {
145 configured_mtu.min(if_mtu.saturating_sub(3))
147 } else {
148 if_mtu.saturating_sub(3)
149 };
150 self.effective_mtu = effective_mtu;
151 self.local_mac = Some(local_mac);
152
153 raw_socket.set_recv_buffer_size(self.config.recv_buf_size())?;
155 raw_socket.set_send_buffer_size(self.config.send_buf_size())?;
156
157 let async_socket = raw_socket.into_async()?;
159 let socket = Arc::new(async_socket);
160 self.socket = Some(socket.clone());
161
162 let transport_id = self.transport_id;
164 let packet_tx = self.packet_tx.clone();
165 let mtu = self.effective_mtu;
166 let discovery_enabled = self.config.discovery();
167 let discovery_buffer = self.discovery_buffer.clone();
168 let stats = self.stats.clone();
169 let recv_socket = socket.clone();
170 let recv_local_mac = local_mac;
171
172 let recv_task = tokio::spawn(async move {
173 ethernet_receive_loop(
174 recv_socket,
175 transport_id,
176 packet_tx,
177 mtu,
178 discovery_enabled,
179 discovery_buffer,
180 stats,
181 recv_local_mac,
182 )
183 .await;
184 });
185 self.recv_task = Some(recv_task);
186
187 if self.config.announce() {
189 if let Some(pubkey) = self.local_pubkey {
190 let beacon_socket = socket.clone();
191 let interval_secs = self.config.beacon_interval_secs();
192 let discovery_scope = self.config.discovery_scope().map(str::to_string);
193 let beacon_stats = self.stats.clone();
194 let beacon_transport_id = self.transport_id;
195
196 let beacon_interface = self.config.interface.clone();
197 let beacon_ethertype = self.config.ethertype();
198
199 let beacon_task = tokio::spawn(async move {
200 beacon_sender_loop(
201 beacon_socket,
202 pubkey,
203 discovery_scope,
204 interval_secs,
205 beacon_stats,
206 beacon_transport_id,
207 beacon_interface,
208 beacon_ethertype,
209 )
210 .await;
211 });
212 self.beacon_task = Some(beacon_task);
213 } else {
214 warn!(
215 transport_id = %self.transport_id,
216 "Announce enabled but no local pubkey set; beacons disabled"
217 );
218 }
219 }
220
221 self.state = TransportState::Up;
222
223 if let Some(ref name) = self.name {
224 info!(
225 name = %name,
226 interface = %self.interface,
227 mac = %format_mac(&local_mac),
228 mtu = effective_mtu,
229 if_mtu = if_mtu,
230 "Ethernet transport started"
231 );
232 } else {
233 info!(
234 interface = %self.interface,
235 mac = %format_mac(&local_mac),
236 mtu = effective_mtu,
237 if_mtu = if_mtu,
238 "Ethernet transport started"
239 );
240 }
241
242 Ok(())
243 }
244
245 pub async fn stop_async(&mut self) -> Result<(), TransportError> {
247 if !self.state.is_operational() {
248 return Err(TransportError::NotStarted);
249 }
250
251 if let Some(ref socket) = self.socket {
255 socket.shutdown();
256 }
257
258 if let Some(task) = self.beacon_task.take() {
263 task.abort();
264 #[cfg(not(target_os = "macos"))]
265 {
266 let _ = task.await;
267 }
268 }
269 if let Some(task) = self.recv_task.take() {
270 task.abort();
271 #[cfg(not(target_os = "macos"))]
272 {
273 let _ = task.await;
274 }
275 }
276
277 self.socket.take();
279 self.local_mac = None;
280
281 self.state = TransportState::Down;
282
283 info!(
284 transport_id = %self.transport_id,
285 interface = %self.interface,
286 "Ethernet transport stopped"
287 );
288
289 Ok(())
290 }
291
292 pub async fn send_async(
297 &self,
298 addr: &TransportAddr,
299 data: &[u8],
300 ) -> Result<usize, TransportError> {
301 if !self.state.is_operational() {
302 return Err(TransportError::NotStarted);
303 }
304
305 if data.len() > self.effective_mtu as usize {
306 return Err(TransportError::MtuExceeded {
307 packet_size: data.len(),
308 mtu: self.effective_mtu,
309 });
310 }
311
312 let dest_mac = parse_mac_addr(addr)?;
313 let socket = self.socket.as_ref().ok_or(TransportError::NotStarted)?;
314
315 let mut frame = Vec::with_capacity(3 + data.len());
320 frame.push(FRAME_TYPE_DATA);
321 frame.extend_from_slice(&(data.len() as u16).to_le_bytes());
322 frame.extend_from_slice(data);
323
324 let bytes_sent = socket.send_to(&frame, &dest_mac).await?;
325 self.stats.record_send(bytes_sent);
326
327 trace!(
328 transport_id = %self.transport_id,
329 remote_mac = %format_mac(&dest_mac),
330 bytes = bytes_sent,
331 "Ethernet frame sent"
332 );
333
334 Ok(bytes_sent.saturating_sub(3))
336 }
337}
338
/// `Transport` trait implementation.
///
/// The synchronous lifecycle and send entry points are intentionally
/// unsupported; each one returns `NotSupported` naming the async method to use.
impl Transport for EthernetTransport {
    /// Returns the identifier assigned at construction.
    fn transport_id(&self) -> TransportId {
        self.transport_id
    }

    /// Always reports `TransportType::ETHERNET`.
    fn transport_type(&self) -> &TransportType {
        &TransportType::ETHERNET
    }

    /// Returns the current lifecycle state.
    fn state(&self) -> TransportState {
        self.state
    }

    /// Returns the effective payload MTU (frame header already subtracted).
    fn mtu(&self) -> u16 {
        self.effective_mtu
    }

    /// Not supported; callers must use `start_async()`.
    fn start(&mut self) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use start_async() for Ethernet transport".into(),
        ))
    }

    /// Not supported; callers must use `stop_async()`.
    fn stop(&mut self) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use stop_async() for Ethernet transport".into(),
        ))
    }

    /// Not supported; callers must use `send_async()`.
    fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
        Err(TransportError::NotSupported(
            "use send_async() for Ethernet transport".into(),
        ))
    }

    /// Returns whatever `DiscoveryBuffer::take` yields; never fails.
    // NOTE(review): `take` presumably drains the buffer so each peer is reported
    // once; confirm against `DiscoveryBuffer`.
    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
        Ok(self.discovery_buffer.take())
    }

    /// Forwards the `auto_connect` setting from the configuration.
    fn auto_connect(&self) -> bool {
        self.config.auto_connect()
    }

    /// Forwards the `accept_connections` setting from the configuration.
    fn accept_connections(&self) -> bool {
        self.config.accept_connections()
    }
}
386
387#[allow(clippy::too_many_arguments)]
393async fn ethernet_receive_loop(
394 socket: Arc<AsyncPacketSocket>,
395 transport_id: TransportId,
396 packet_tx: PacketTx,
397 mtu: u16,
398 discovery_enabled: bool,
399 discovery_buffer: Arc<DiscoveryBuffer>,
400 stats: Arc<EthernetStats>,
401 local_mac: [u8; 6],
402) {
403 let mut buf = vec![0u8; mtu as usize + 100];
405
406 debug!(transport_id = %transport_id, "Ethernet receive loop starting");
407
408 loop {
409 match socket.recv_from(&mut buf).await {
410 Ok((len, src_mac)) => {
411 if len == 0 {
412 continue;
413 }
414 if src_mac == local_mac {
415 trace!(
416 transport_id = %transport_id,
417 local_mac = %format_mac(&local_mac),
418 "Ignoring self-echoed Ethernet frame"
419 );
420 continue;
421 }
422
423 stats.record_recv(len);
424
425 let frame_type = buf[0];
426 match frame_type {
427 FRAME_TYPE_DATA => {
428 if len < 3 {
431 trace!("Data frame too short ({len} bytes), ignoring");
432 continue;
433 }
434 let payload_len = u16::from_le_bytes([buf[1], buf[2]]) as usize;
435 if payload_len > len - 3 {
436 trace!(
437 "Data frame length field ({payload_len}) exceeds \
438 available bytes ({}), ignoring",
439 len - 3
440 );
441 continue;
442 }
443 let data = buf[3..3 + payload_len].to_vec();
444 let addr = TransportAddr::from_bytes(&src_mac);
445 let packet = ReceivedPacket::new(transport_id, addr, data);
446
447 trace!(
448 transport_id = %transport_id,
449 remote_mac = %format_mac(&src_mac),
450 bytes = payload_len,
451 "Ethernet data frame received"
452 );
453
454 if packet_tx.send(packet).is_err() {
455 debug!(
456 transport_id = %transport_id,
457 "Packet channel closed, stopping receive loop"
458 );
459 break;
460 }
461 }
462 FRAME_TYPE_BEACON => {
463 stats.record_beacon_recv();
464
465 if discovery_enabled && let Some(beacon) = parse_beacon_record(&buf[..len])
466 {
467 discovery_buffer.add_peer(src_mac, beacon);
468 trace!(
469 transport_id = %transport_id,
470 remote_mac = %format_mac(&src_mac),
471 "Discovery beacon received"
472 );
473 }
474 }
475 _ => {
476 trace!(
478 transport_id = %transport_id,
479 frame_type = frame_type,
480 "Unknown frame type, dropping"
481 );
482 }
483 }
484 }
485 Err(e) => {
486 stats.record_recv_error();
487 warn!(
488 transport_id = %transport_id,
489 error = %e,
490 "Ethernet receive error"
491 );
492 }
493 }
494 }
495
496 debug!(transport_id = %transport_id, "Ethernet receive loop stopped");
497}
498
499#[allow(clippy::too_many_arguments)]
510async fn beacon_sender_loop(
511 mut socket: Arc<AsyncPacketSocket>,
512 pubkey: XOnlyPublicKey,
513 discovery_scope: Option<String>,
514 interval_secs: u64,
515 stats: Arc<EthernetStats>,
516 transport_id: TransportId,
517 interface: String,
518 ethertype: u16,
519) {
520 const REOPEN_THRESHOLD: u32 = 3;
522
523 let beacon = build_scoped_beacon(&pubkey, discovery_scope.as_deref());
524 let interval = tokio::time::Duration::from_secs(interval_secs);
525
526 debug!(
527 transport_id = %transport_id,
528 interval_secs,
529 "Beacon sender starting"
530 );
531
532 if let Err(e) = socket.send_to(&beacon, ÐERNET_BROADCAST).await {
534 warn!(
535 transport_id = %transport_id,
536 error = %e,
537 "Failed to send initial beacon"
538 );
539 } else {
540 stats.record_beacon_sent();
541 }
542
543 let mut interval_timer = tokio::time::interval(interval);
544 interval_timer.tick().await; let mut consecutive_errors: u32 = 0;
546
547 loop {
548 interval_timer.tick().await;
549
550 match socket.send_to(&beacon, ÐERNET_BROADCAST).await {
551 Ok(_) => {
552 if consecutive_errors > 0 {
553 debug!(
554 transport_id = %transport_id,
555 "Beacon send recovered after {} errors", consecutive_errors,
556 );
557 }
558 consecutive_errors = 0;
559 stats.record_beacon_sent();
560 trace!(
561 transport_id = %transport_id,
562 "Beacon sent"
563 );
564 }
565 Err(e) => {
566 consecutive_errors += 1;
567 stats.record_send_error();
568
569 let is_enxio = format!("{e}").contains("os error 6");
570
571 if consecutive_errors == 1 {
573 warn!(
574 transport_id = %transport_id,
575 error = %e,
576 "Failed to send beacon"
577 );
578 }
579
580 if is_enxio && consecutive_errors >= REOPEN_THRESHOLD {
581 info!(
582 transport_id = %transport_id,
583 consecutive_errors,
584 interface = %interface,
585 "Stale veth detected (ENXIO), attempting socket reopen"
586 );
587 match reopen_beacon_socket(&interface, ethertype) {
588 Ok(new_socket) => {
589 socket = Arc::new(new_socket);
590 consecutive_errors = 0;
591 info!(
592 transport_id = %transport_id,
593 interface = %interface,
594 "Beacon socket reopened successfully"
595 );
596 }
597 Err(e) => {
598 warn!(
599 transport_id = %transport_id,
600 error = %e,
601 interface = %interface,
602 "Failed to reopen beacon socket, will retry"
603 );
604 }
605 }
606 }
607 }
608 }
609 }
610}
611
612fn reopen_beacon_socket(
617 interface: &str,
618 ethertype: u16,
619) -> Result<AsyncPacketSocket, TransportError> {
620 let raw_socket = PacketSocket::open(interface, ethertype)?;
621 raw_socket.into_async()
622}
623
624fn parse_mac_addr(addr: &TransportAddr) -> Result<[u8; 6], TransportError> {
630 let bytes = addr.as_bytes();
631 if bytes.len() != 6 {
632 return Err(TransportError::InvalidAddress(format!(
633 "expected 6-byte MAC, got {} bytes",
634 bytes.len()
635 )));
636 }
637 if bytes == [0, 0, 0, 0, 0, 0] {
638 return Err(TransportError::InvalidAddress(
639 "destination MAC is all zeros".into(),
640 ));
641 }
642 let mut mac = [0u8; 6];
643 mac.copy_from_slice(bytes);
644 Ok(mac)
645}
646
/// Formats a MAC address as six lowercase, zero-padded hex octets separated by
/// colons, e.g. `aa:bb:cc:dd:ee:ff`.
pub fn format_mac(mac: &[u8; 6]) -> String {
    mac.iter()
        .map(|octet| format!("{:02x}", octet))
        .collect::<Vec<_>>()
        .join(":")
}
654
655pub fn parse_mac_string(s: &str) -> Result<[u8; 6], TransportError> {
657 let parts: Vec<&str> = s.split(':').collect();
658 if parts.len() != 6 {
659 return Err(TransportError::InvalidAddress(format!(
660 "invalid MAC format: expected 6 colon-separated hex bytes, got '{}'",
661 s
662 )));
663 }
664 let mut mac = [0u8; 6];
665 for (i, part) in parts.iter().enumerate() {
666 mac[i] = u8::from_str_radix(part, 16).map_err(|_| {
667 TransportError::InvalidAddress(format!("invalid hex byte '{}' in MAC address", part))
668 })?;
669 }
670 Ok(mac)
671}
672
// Unit tests for the MAC helpers and the data-frame layout. None of them open a
// socket; the two frame tests rebuild the framing inline instead of calling
// `send_async` or the receive loop.
#[cfg(test)]
mod tests {
    use super::*;

    // A 6-byte address is returned unchanged.
    #[test]
    fn test_parse_mac_addr_valid() {
        let addr = TransportAddr::from_bytes(&[0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
        let mac = parse_mac_addr(&addr).unwrap();
        assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
    }

    // Addresses that are not exactly 6 bytes long are rejected, whether built
    // from raw bytes or from a string-form address.
    #[test]
    fn test_parse_mac_addr_wrong_length() {
        let addr = TransportAddr::from_bytes(&[0xaa, 0xbb, 0xcc]);
        assert!(parse_mac_addr(&addr).is_err());

        let addr = TransportAddr::from_string("192.168.1.1:2121");
        assert!(parse_mac_addr(&addr).is_err());
    }

    // The all-zero MAC is refused as a destination.
    #[test]
    fn test_parse_mac_addr_all_zeros() {
        let addr = TransportAddr::from_bytes(&[0, 0, 0, 0, 0, 0]);
        assert!(parse_mac_addr(&addr).is_err());
    }

    // Output is lowercase hex separated by colons.
    #[test]
    fn test_format_mac() {
        let mac = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff];
        assert_eq!(format_mac(&mac), "aa:bb:cc:dd:ee:ff");
    }

    // Each octet is zero-padded to two digits.
    #[test]
    fn test_format_mac_leading_zeros() {
        let mac = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
        assert_eq!(format_mac(&mac), "01:02:03:04:05:06");
    }

    // Lowercase input parses to the expected bytes.
    #[test]
    fn test_parse_mac_string_valid() {
        let mac = parse_mac_string("aa:bb:cc:dd:ee:ff").unwrap();
        assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
    }

    // Uppercase hex digits are accepted as well.
    #[test]
    fn test_parse_mac_string_uppercase() {
        let mac = parse_mac_string("AA:BB:CC:DD:EE:FF").unwrap();
        assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
    }

    // Too few parts, non-hex parts, the empty string and dash separators all fail.
    #[test]
    fn test_parse_mac_string_invalid() {
        assert!(parse_mac_string("aa:bb:cc").is_err());
        assert!(parse_mac_string("not:a:mac:at:all:x").is_err());
        assert!(parse_mac_string("").is_err());
        assert!(parse_mac_string("aa-bb-cc-dd-ee-ff").is_err());
    }

    // Data-frame layout: type byte, little-endian u16 length, then the payload.
    #[test]
    fn test_frame_type_data_prefix() {
        let data = vec![1, 2, 3, 4];
        let mut frame = Vec::with_capacity(3 + data.len());
        frame.push(FRAME_TYPE_DATA);
        frame.extend_from_slice(&(data.len() as u16).to_le_bytes());
        frame.extend_from_slice(&data);

        // Pins the on-wire value of FRAME_TYPE_DATA to 0x00.
        assert_eq!(frame[0], 0x00);
        assert_eq!(u16::from_le_bytes([frame[1], frame[2]]), 4);
        assert_eq!(&frame[3..], &[1, 2, 3, 4]);
    }

    // A frame padded out to 46 bytes still yields only the payload once the
    // length field is honoured.
    #[test]
    fn test_data_frame_padding_trimmed() {
        let payload = vec![0xAA, 0xBB, 0xCC, 0xDD];
        let payload_len = payload.len() as u16;

        let mut frame = Vec::with_capacity(3 + payload.len());
        frame.push(FRAME_TYPE_DATA);
        frame.extend_from_slice(&payload_len.to_le_bytes());
        frame.extend_from_slice(&payload);

        // Zero-pad the frame to 46 bytes.
        frame.resize(46, 0x00);

        // Same extraction arithmetic as the receive loop's data-frame branch.
        let recv_len = u16::from_le_bytes([frame[1], frame[2]]) as usize;
        let extracted = &frame[3..3 + recv_len];
        assert_eq!(extracted, &[0xAA, 0xBB, 0xCC, 0xDD]);
    }

    // Guards the beacon wire size against accidental change.
    #[test]
    fn test_beacon_size() {
        assert_eq!(discovery::BEACON_SIZE, 34);
    }
}