1pub mod discovery;
9pub mod socket;
10pub mod stats;
11
12use super::{
13 DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr, TransportError,
14 TransportId, TransportState, TransportType,
15};
16use crate::config::EthernetConfig;
17use discovery::{DiscoveryBuffer, FRAME_TYPE_BEACON, FRAME_TYPE_DATA, build_beacon, parse_beacon};
18use socket::{AsyncPacketSocket, ETHERNET_BROADCAST, PacketSocket};
19use stats::EthernetStats;
20
21use secp256k1::XOnlyPublicKey;
22use std::sync::Arc;
23use tokio::task::JoinHandle;
24use tracing::{debug, info, trace, warn};
25
26pub struct EthernetTransport {
32 transport_id: TransportId,
34 name: Option<String>,
36 config: EthernetConfig,
38 state: TransportState,
40 socket: Option<Arc<AsyncPacketSocket>>,
42 packet_tx: PacketTx,
44 recv_task: Option<JoinHandle<()>>,
46 beacon_task: Option<JoinHandle<()>>,
48 local_mac: Option<[u8; 6]>,
50 interface: String,
52 effective_mtu: u16,
54 discovery_buffer: Arc<DiscoveryBuffer>,
56 stats: Arc<EthernetStats>,
58 local_pubkey: Option<XOnlyPublicKey>,
60}
61
62impl EthernetTransport {
63 pub fn new(
65 transport_id: TransportId,
66 name: Option<String>,
67 config: EthernetConfig,
68 packet_tx: PacketTx,
69 ) -> Self {
70 let interface = config.interface.clone();
71 let discovery_buffer = Arc::new(DiscoveryBuffer::new(transport_id));
72 let stats = Arc::new(EthernetStats::new());
73
74 Self {
75 transport_id,
76 name,
77 config,
78 state: TransportState::Configured,
79 socket: None,
80 packet_tx,
81 recv_task: None,
82 beacon_task: None,
83 local_mac: None,
84 interface,
85 effective_mtu: 1499, discovery_buffer,
87 stats,
88 local_pubkey: None,
89 }
90 }
91
92 pub fn name(&self) -> Option<&str> {
94 self.name.as_deref()
95 }
96
97 pub fn interface_name(&self) -> &str {
99 &self.interface
100 }
101
102 pub fn local_mac(&self) -> Option<[u8; 6]> {
104 self.local_mac
105 }
106
107 pub fn set_local_pubkey(&mut self, pubkey: XOnlyPublicKey) {
111 self.local_pubkey = Some(pubkey);
112 }
113
114 pub fn stats(&self) -> &Arc<EthernetStats> {
116 &self.stats
117 }
118
119 pub async fn start_async(&mut self) -> Result<(), TransportError> {
124 if !self.state.can_start() {
125 return Err(TransportError::AlreadyStarted);
126 }
127
128 self.state = TransportState::Starting;
129
130 let raw_socket = PacketSocket::open(&self.config.interface, self.config.ethertype())?;
132
133 let local_mac = raw_socket.local_mac()?;
135 let if_mtu = raw_socket.interface_mtu()?;
136
137 let effective_mtu = if let Some(configured_mtu) = self.config.mtu {
140 configured_mtu.min(if_mtu.saturating_sub(3))
142 } else {
143 if_mtu.saturating_sub(3)
144 };
145 self.effective_mtu = effective_mtu;
146 self.local_mac = Some(local_mac);
147
148 raw_socket.set_recv_buffer_size(self.config.recv_buf_size())?;
150 raw_socket.set_send_buffer_size(self.config.send_buf_size())?;
151
152 let async_socket = raw_socket.into_async()?;
154 let socket = Arc::new(async_socket);
155 self.socket = Some(socket.clone());
156
157 let transport_id = self.transport_id;
159 let packet_tx = self.packet_tx.clone();
160 let mtu = self.effective_mtu;
161 let discovery_enabled = self.config.discovery();
162 let discovery_buffer = self.discovery_buffer.clone();
163 let stats = self.stats.clone();
164 let recv_socket = socket.clone();
165
166 let recv_task = tokio::spawn(async move {
167 ethernet_receive_loop(
168 recv_socket,
169 transport_id,
170 packet_tx,
171 mtu,
172 discovery_enabled,
173 discovery_buffer,
174 stats,
175 )
176 .await;
177 });
178 self.recv_task = Some(recv_task);
179
180 if self.config.announce() {
182 if let Some(pubkey) = self.local_pubkey {
183 let beacon_socket = socket.clone();
184 let interval_secs = self.config.beacon_interval_secs();
185 let beacon_stats = self.stats.clone();
186 let beacon_transport_id = self.transport_id;
187
188 let beacon_interface = self.config.interface.clone();
189 let beacon_ethertype = self.config.ethertype();
190
191 let beacon_task = tokio::spawn(async move {
192 beacon_sender_loop(
193 beacon_socket,
194 pubkey,
195 interval_secs,
196 beacon_stats,
197 beacon_transport_id,
198 beacon_interface,
199 beacon_ethertype,
200 )
201 .await;
202 });
203 self.beacon_task = Some(beacon_task);
204 } else {
205 warn!(
206 transport_id = %self.transport_id,
207 "Announce enabled but no local pubkey set; beacons disabled"
208 );
209 }
210 }
211
212 self.state = TransportState::Up;
213
214 if let Some(ref name) = self.name {
215 info!(
216 name = %name,
217 interface = %self.interface,
218 mac = %format_mac(&local_mac),
219 mtu = effective_mtu,
220 if_mtu = if_mtu,
221 "Ethernet transport started"
222 );
223 } else {
224 info!(
225 interface = %self.interface,
226 mac = %format_mac(&local_mac),
227 mtu = effective_mtu,
228 if_mtu = if_mtu,
229 "Ethernet transport started"
230 );
231 }
232
233 Ok(())
234 }
235
236 pub async fn stop_async(&mut self) -> Result<(), TransportError> {
238 if !self.state.is_operational() {
239 return Err(TransportError::NotStarted);
240 }
241
242 if let Some(ref socket) = self.socket {
246 socket.shutdown();
247 }
248
249 if let Some(task) = self.beacon_task.take() {
254 task.abort();
255 #[cfg(not(target_os = "macos"))]
256 {
257 let _ = task.await;
258 }
259 }
260 if let Some(task) = self.recv_task.take() {
261 task.abort();
262 #[cfg(not(target_os = "macos"))]
263 {
264 let _ = task.await;
265 }
266 }
267
268 self.socket.take();
270 self.local_mac = None;
271
272 self.state = TransportState::Down;
273
274 info!(
275 transport_id = %self.transport_id,
276 interface = %self.interface,
277 "Ethernet transport stopped"
278 );
279
280 Ok(())
281 }
282
283 pub async fn send_async(
288 &self,
289 addr: &TransportAddr,
290 data: &[u8],
291 ) -> Result<usize, TransportError> {
292 if !self.state.is_operational() {
293 return Err(TransportError::NotStarted);
294 }
295
296 if data.len() > self.effective_mtu as usize {
297 return Err(TransportError::MtuExceeded {
298 packet_size: data.len(),
299 mtu: self.effective_mtu,
300 });
301 }
302
303 let dest_mac = parse_mac_addr(addr)?;
304 let socket = self.socket.as_ref().ok_or(TransportError::NotStarted)?;
305
306 let mut frame = Vec::with_capacity(3 + data.len());
311 frame.push(FRAME_TYPE_DATA);
312 frame.extend_from_slice(&(data.len() as u16).to_le_bytes());
313 frame.extend_from_slice(data);
314
315 let bytes_sent = socket.send_to(&frame, &dest_mac).await?;
316 self.stats.record_send(bytes_sent);
317
318 trace!(
319 transport_id = %self.transport_id,
320 remote_mac = %format_mac(&dest_mac),
321 bytes = bytes_sent,
322 "Ethernet frame sent"
323 );
324
325 Ok(bytes_sent.saturating_sub(3))
327 }
328}
329
330impl Transport for EthernetTransport {
331 fn transport_id(&self) -> TransportId {
332 self.transport_id
333 }
334
335 fn transport_type(&self) -> &TransportType {
336 &TransportType::ETHERNET
337 }
338
339 fn state(&self) -> TransportState {
340 self.state
341 }
342
343 fn mtu(&self) -> u16 {
344 self.effective_mtu
345 }
346
347 fn start(&mut self) -> Result<(), TransportError> {
348 Err(TransportError::NotSupported(
349 "use start_async() for Ethernet transport".into(),
350 ))
351 }
352
353 fn stop(&mut self) -> Result<(), TransportError> {
354 Err(TransportError::NotSupported(
355 "use stop_async() for Ethernet transport".into(),
356 ))
357 }
358
359 fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
360 Err(TransportError::NotSupported(
361 "use send_async() for Ethernet transport".into(),
362 ))
363 }
364
365 fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
366 Ok(self.discovery_buffer.take())
367 }
368
369 fn auto_connect(&self) -> bool {
370 self.config.auto_connect()
371 }
372
373 fn accept_connections(&self) -> bool {
374 self.config.accept_connections()
375 }
376}
377
378async fn ethernet_receive_loop(
384 socket: Arc<AsyncPacketSocket>,
385 transport_id: TransportId,
386 packet_tx: PacketTx,
387 mtu: u16,
388 discovery_enabled: bool,
389 discovery_buffer: Arc<DiscoveryBuffer>,
390 stats: Arc<EthernetStats>,
391) {
392 let mut buf = vec![0u8; mtu as usize + 100];
394
395 debug!(transport_id = %transport_id, "Ethernet receive loop starting");
396
397 loop {
398 match socket.recv_from(&mut buf).await {
399 Ok((len, src_mac)) => {
400 if len == 0 {
401 continue;
402 }
403
404 stats.record_recv(len);
405
406 let frame_type = buf[0];
407 match frame_type {
408 FRAME_TYPE_DATA => {
409 if len < 3 {
412 trace!("Data frame too short ({len} bytes), ignoring");
413 continue;
414 }
415 let payload_len = u16::from_le_bytes([buf[1], buf[2]]) as usize;
416 if payload_len > len - 3 {
417 trace!(
418 "Data frame length field ({payload_len}) exceeds \
419 available bytes ({}), ignoring",
420 len - 3
421 );
422 continue;
423 }
424 let data = buf[3..3 + payload_len].to_vec();
425 let addr = TransportAddr::from_bytes(&src_mac);
426 let packet = ReceivedPacket::new(transport_id, addr, data);
427
428 trace!(
429 transport_id = %transport_id,
430 remote_mac = %format_mac(&src_mac),
431 bytes = payload_len,
432 "Ethernet data frame received"
433 );
434
435 if packet_tx.send(packet).is_err() {
436 debug!(
437 transport_id = %transport_id,
438 "Packet channel closed, stopping receive loop"
439 );
440 break;
441 }
442 }
443 FRAME_TYPE_BEACON => {
444 stats.record_beacon_recv();
445
446 if discovery_enabled && let Some(pubkey) = parse_beacon(&buf[..len]) {
447 discovery_buffer.add_peer(src_mac, pubkey);
448 trace!(
449 transport_id = %transport_id,
450 remote_mac = %format_mac(&src_mac),
451 "Discovery beacon received"
452 );
453 }
454 }
455 _ => {
456 trace!(
458 transport_id = %transport_id,
459 frame_type = frame_type,
460 "Unknown frame type, dropping"
461 );
462 }
463 }
464 }
465 Err(e) => {
466 stats.record_recv_error();
467 warn!(
468 transport_id = %transport_id,
469 error = %e,
470 "Ethernet receive error"
471 );
472 }
473 }
474 }
475
476 debug!(transport_id = %transport_id, "Ethernet receive loop stopped");
477}
478
479async fn beacon_sender_loop(
490 mut socket: Arc<AsyncPacketSocket>,
491 pubkey: XOnlyPublicKey,
492 interval_secs: u64,
493 stats: Arc<EthernetStats>,
494 transport_id: TransportId,
495 interface: String,
496 ethertype: u16,
497) {
498 const REOPEN_THRESHOLD: u32 = 3;
500
501 let beacon = build_beacon(&pubkey);
502 let interval = tokio::time::Duration::from_secs(interval_secs);
503
504 debug!(
505 transport_id = %transport_id,
506 interval_secs,
507 "Beacon sender starting"
508 );
509
510 if let Err(e) = socket.send_to(&beacon, ÐERNET_BROADCAST).await {
512 warn!(
513 transport_id = %transport_id,
514 error = %e,
515 "Failed to send initial beacon"
516 );
517 } else {
518 stats.record_beacon_sent();
519 }
520
521 let mut interval_timer = tokio::time::interval(interval);
522 interval_timer.tick().await; let mut consecutive_errors: u32 = 0;
524
525 loop {
526 interval_timer.tick().await;
527
528 match socket.send_to(&beacon, ÐERNET_BROADCAST).await {
529 Ok(_) => {
530 if consecutive_errors > 0 {
531 debug!(
532 transport_id = %transport_id,
533 "Beacon send recovered after {} errors", consecutive_errors,
534 );
535 }
536 consecutive_errors = 0;
537 stats.record_beacon_sent();
538 trace!(
539 transport_id = %transport_id,
540 "Beacon sent"
541 );
542 }
543 Err(e) => {
544 consecutive_errors += 1;
545 stats.record_send_error();
546
547 let is_enxio = format!("{e}").contains("os error 6");
548
549 if consecutive_errors == 1 {
551 warn!(
552 transport_id = %transport_id,
553 error = %e,
554 "Failed to send beacon"
555 );
556 }
557
558 if is_enxio && consecutive_errors >= REOPEN_THRESHOLD {
559 info!(
560 transport_id = %transport_id,
561 consecutive_errors,
562 interface = %interface,
563 "Stale veth detected (ENXIO), attempting socket reopen"
564 );
565 match reopen_beacon_socket(&interface, ethertype) {
566 Ok(new_socket) => {
567 socket = Arc::new(new_socket);
568 consecutive_errors = 0;
569 info!(
570 transport_id = %transport_id,
571 interface = %interface,
572 "Beacon socket reopened successfully"
573 );
574 }
575 Err(e) => {
576 warn!(
577 transport_id = %transport_id,
578 error = %e,
579 interface = %interface,
580 "Failed to reopen beacon socket, will retry"
581 );
582 }
583 }
584 }
585 }
586 }
587 }
588}
589
590fn reopen_beacon_socket(
595 interface: &str,
596 ethertype: u16,
597) -> Result<AsyncPacketSocket, TransportError> {
598 let raw_socket = PacketSocket::open(interface, ethertype)?;
599 raw_socket.into_async()
600}
601
602fn parse_mac_addr(addr: &TransportAddr) -> Result<[u8; 6], TransportError> {
608 let bytes = addr.as_bytes();
609 if bytes.len() != 6 {
610 return Err(TransportError::InvalidAddress(format!(
611 "expected 6-byte MAC, got {} bytes",
612 bytes.len()
613 )));
614 }
615 if bytes == [0, 0, 0, 0, 0, 0] {
616 return Err(TransportError::InvalidAddress(
617 "destination MAC is all zeros".into(),
618 ));
619 }
620 let mut mac = [0u8; 6];
621 mac.copy_from_slice(bytes);
622 Ok(mac)
623}
624
/// Formats a MAC address as six lowercase, zero-padded hex octets separated
/// by colons, e.g. `aa:bb:cc:dd:ee:ff`.
pub fn format_mac(mac: &[u8; 6]) -> String {
    mac.iter()
        .map(|octet| format!("{octet:02x}"))
        .collect::<Vec<_>>()
        .join(":")
}
632
633pub fn parse_mac_string(s: &str) -> Result<[u8; 6], TransportError> {
635 let parts: Vec<&str> = s.split(':').collect();
636 if parts.len() != 6 {
637 return Err(TransportError::InvalidAddress(format!(
638 "invalid MAC format: expected 6 colon-separated hex bytes, got '{}'",
639 s
640 )));
641 }
642 let mut mac = [0u8; 6];
643 for (i, part) in parts.iter().enumerate() {
644 mac[i] = u8::from_str_radix(part, 16).map_err(|_| {
645 TransportError::InvalidAddress(format!("invalid hex byte '{}' in MAC address", part))
646 })?;
647 }
648 Ok(mac)
649}
650
#[cfg(test)]
mod tests {
    //! Unit tests for the MAC helpers and the data-frame layout.

    use super::*;

    /// MAC address shared by several tests.
    const SAMPLE_MAC: [u8; 6] = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff];

    /// Builds a data frame with the layout `send_async` produces: frame-type
    /// byte, little-endian `u16` payload length, payload.
    fn build_data_frame(payload: &[u8]) -> Vec<u8> {
        let mut frame = Vec::with_capacity(3 + payload.len());
        frame.push(FRAME_TYPE_DATA);
        frame.extend_from_slice(&(payload.len() as u16).to_le_bytes());
        frame.extend_from_slice(payload);
        frame
    }

    #[test]
    fn test_parse_mac_addr_valid() {
        let addr = TransportAddr::from_bytes(&SAMPLE_MAC);
        assert_eq!(parse_mac_addr(&addr).unwrap(), SAMPLE_MAC);
    }

    #[test]
    fn test_parse_mac_addr_wrong_length() {
        // Too few bytes.
        let short = TransportAddr::from_bytes(&[0xaa, 0xbb, 0xcc]);
        assert!(parse_mac_addr(&short).is_err());

        // A textual socket address is not a 6-byte MAC.
        let textual = TransportAddr::from_string("192.168.1.1:2121");
        assert!(parse_mac_addr(&textual).is_err());
    }

    #[test]
    fn test_parse_mac_addr_all_zeros() {
        let zeros = TransportAddr::from_bytes(&[0, 0, 0, 0, 0, 0]);
        assert!(parse_mac_addr(&zeros).is_err());
    }

    #[test]
    fn test_format_mac() {
        assert_eq!(format_mac(&SAMPLE_MAC), "aa:bb:cc:dd:ee:ff");
    }

    #[test]
    fn test_format_mac_leading_zeros() {
        let low_octets = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
        assert_eq!(format_mac(&low_octets), "01:02:03:04:05:06");
    }

    #[test]
    fn test_parse_mac_string_valid() {
        assert_eq!(parse_mac_string("aa:bb:cc:dd:ee:ff").unwrap(), SAMPLE_MAC);
    }

    #[test]
    fn test_parse_mac_string_uppercase() {
        assert_eq!(parse_mac_string("AA:BB:CC:DD:EE:FF").unwrap(), SAMPLE_MAC);
    }

    #[test]
    fn test_parse_mac_string_invalid() {
        let rejected = ["aa:bb:cc", "not:a:mac:at:all:x", "", "aa-bb-cc-dd-ee-ff"];
        for input in rejected {
            assert!(parse_mac_string(input).is_err());
        }
    }

    #[test]
    fn test_frame_type_data_prefix() {
        let frame = build_data_frame(&[1, 2, 3, 4]);

        // Frame-type byte, then the length field, then the payload.
        assert_eq!(frame[0], 0x00);
        assert_eq!(u16::from_le_bytes([frame[1], frame[2]]), 4);
        assert_eq!(&frame[3..], &[1, 2, 3, 4]);
    }

    #[test]
    fn test_data_frame_padding_trimmed() {
        let mut frame = build_data_frame(&[0xAA, 0xBB, 0xCC, 0xDD]);

        // Pad the frame with zeros up to 46 bytes.
        frame.resize(46, 0x00);

        // The length field, not the frame size, decides how much is payload.
        let recv_len = u16::from_le_bytes([frame[1], frame[2]]) as usize;
        let extracted = &frame[3..3 + recv_len];
        assert_eq!(extracted, &[0xAA, 0xBB, 0xCC, 0xDD]);
    }

    #[test]
    fn test_beacon_size() {
        assert_eq!(discovery::BEACON_SIZE, 34);
    }
}