pub mod discovery;
pub mod socket;
pub mod stats;
use super::{
DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr, TransportError,
TransportId, TransportState, TransportType,
};
use crate::config::EthernetConfig;
use discovery::{DiscoveryBuffer, FRAME_TYPE_BEACON, FRAME_TYPE_DATA, build_beacon, parse_beacon};
use socket::{AsyncPacketSocket, ETHERNET_BROADCAST, PacketSocket};
use stats::EthernetStats;
use secp256k1::XOnlyPublicKey;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{debug, info, trace, warn};
pub struct EthernetTransport {
transport_id: TransportId,
name: Option<String>,
config: EthernetConfig,
state: TransportState,
socket: Option<Arc<AsyncPacketSocket>>,
packet_tx: PacketTx,
recv_task: Option<JoinHandle<()>>,
beacon_task: Option<JoinHandle<()>>,
local_mac: Option<[u8; 6]>,
interface: String,
effective_mtu: u16,
discovery_buffer: Arc<DiscoveryBuffer>,
stats: Arc<EthernetStats>,
local_pubkey: Option<XOnlyPublicKey>,
}
impl EthernetTransport {
pub fn new(
transport_id: TransportId,
name: Option<String>,
config: EthernetConfig,
packet_tx: PacketTx,
) -> Self {
let interface = config.interface.clone();
let discovery_buffer = Arc::new(DiscoveryBuffer::new(transport_id));
let stats = Arc::new(EthernetStats::new());
Self {
transport_id,
name,
config,
state: TransportState::Configured,
socket: None,
packet_tx,
recv_task: None,
beacon_task: None,
local_mac: None,
interface,
effective_mtu: 1499, discovery_buffer,
stats,
local_pubkey: None,
}
}
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
pub fn interface_name(&self) -> &str {
&self.interface
}
pub fn local_mac(&self) -> Option<[u8; 6]> {
self.local_mac
}
pub fn set_local_pubkey(&mut self, pubkey: XOnlyPublicKey) {
self.local_pubkey = Some(pubkey);
}
pub fn stats(&self) -> &Arc<EthernetStats> {
&self.stats
}
pub async fn start_async(&mut self) -> Result<(), TransportError> {
if !self.state.can_start() {
return Err(TransportError::AlreadyStarted);
}
self.state = TransportState::Starting;
let raw_socket = PacketSocket::open(&self.config.interface, self.config.ethertype())?;
let local_mac = raw_socket.local_mac()?;
let if_mtu = raw_socket.interface_mtu()?;
let effective_mtu = if let Some(configured_mtu) = self.config.mtu {
configured_mtu.min(if_mtu.saturating_sub(3))
} else {
if_mtu.saturating_sub(3)
};
self.effective_mtu = effective_mtu;
self.local_mac = Some(local_mac);
raw_socket.set_recv_buffer_size(self.config.recv_buf_size())?;
raw_socket.set_send_buffer_size(self.config.send_buf_size())?;
let async_socket = raw_socket.into_async()?;
let socket = Arc::new(async_socket);
self.socket = Some(socket.clone());
let transport_id = self.transport_id;
let packet_tx = self.packet_tx.clone();
let mtu = self.effective_mtu;
let discovery_enabled = self.config.discovery();
let discovery_buffer = self.discovery_buffer.clone();
let stats = self.stats.clone();
let recv_socket = socket.clone();
let recv_task = tokio::spawn(async move {
ethernet_receive_loop(
recv_socket,
transport_id,
packet_tx,
mtu,
discovery_enabled,
discovery_buffer,
stats,
)
.await;
});
self.recv_task = Some(recv_task);
if self.config.announce() {
if let Some(pubkey) = self.local_pubkey {
let beacon_socket = socket.clone();
let interval_secs = self.config.beacon_interval_secs();
let beacon_stats = self.stats.clone();
let beacon_transport_id = self.transport_id;
let beacon_interface = self.config.interface.clone();
let beacon_ethertype = self.config.ethertype();
let beacon_task = tokio::spawn(async move {
beacon_sender_loop(
beacon_socket,
pubkey,
interval_secs,
beacon_stats,
beacon_transport_id,
beacon_interface,
beacon_ethertype,
)
.await;
});
self.beacon_task = Some(beacon_task);
} else {
warn!(
transport_id = %self.transport_id,
"Announce enabled but no local pubkey set; beacons disabled"
);
}
}
self.state = TransportState::Up;
if let Some(ref name) = self.name {
info!(
name = %name,
interface = %self.interface,
mac = %format_mac(&local_mac),
mtu = effective_mtu,
if_mtu = if_mtu,
"Ethernet transport started"
);
} else {
info!(
interface = %self.interface,
mac = %format_mac(&local_mac),
mtu = effective_mtu,
if_mtu = if_mtu,
"Ethernet transport started"
);
}
Ok(())
}
pub async fn stop_async(&mut self) -> Result<(), TransportError> {
if !self.state.is_operational() {
return Err(TransportError::NotStarted);
}
if let Some(ref socket) = self.socket {
socket.shutdown();
}
if let Some(task) = self.beacon_task.take() {
task.abort();
#[cfg(not(target_os = "macos"))]
{
let _ = task.await;
}
}
if let Some(task) = self.recv_task.take() {
task.abort();
#[cfg(not(target_os = "macos"))]
{
let _ = task.await;
}
}
self.socket.take();
self.local_mac = None;
self.state = TransportState::Down;
info!(
transport_id = %self.transport_id,
interface = %self.interface,
"Ethernet transport stopped"
);
Ok(())
}
pub async fn send_async(
&self,
addr: &TransportAddr,
data: &[u8],
) -> Result<usize, TransportError> {
if !self.state.is_operational() {
return Err(TransportError::NotStarted);
}
if data.len() > self.effective_mtu as usize {
return Err(TransportError::MtuExceeded {
packet_size: data.len(),
mtu: self.effective_mtu,
});
}
let dest_mac = parse_mac_addr(addr)?;
let socket = self.socket.as_ref().ok_or(TransportError::NotStarted)?;
let mut frame = Vec::with_capacity(3 + data.len());
frame.push(FRAME_TYPE_DATA);
frame.extend_from_slice(&(data.len() as u16).to_le_bytes());
frame.extend_from_slice(data);
let bytes_sent = socket.send_to(&frame, &dest_mac).await?;
self.stats.record_send(bytes_sent);
trace!(
transport_id = %self.transport_id,
remote_mac = %format_mac(&dest_mac),
bytes = bytes_sent,
"Ethernet frame sent"
);
Ok(bytes_sent.saturating_sub(3))
}
}
impl Transport for EthernetTransport {
fn transport_id(&self) -> TransportId {
self.transport_id
}
fn transport_type(&self) -> &TransportType {
&TransportType::ETHERNET
}
fn state(&self) -> TransportState {
self.state
}
fn mtu(&self) -> u16 {
self.effective_mtu
}
fn start(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use start_async() for Ethernet transport".into(),
))
}
fn stop(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use stop_async() for Ethernet transport".into(),
))
}
fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use send_async() for Ethernet transport".into(),
))
}
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
Ok(self.discovery_buffer.take())
}
fn auto_connect(&self) -> bool {
self.config.auto_connect()
}
fn accept_connections(&self) -> bool {
self.config.accept_connections()
}
}
async fn ethernet_receive_loop(
socket: Arc<AsyncPacketSocket>,
transport_id: TransportId,
packet_tx: PacketTx,
mtu: u16,
discovery_enabled: bool,
discovery_buffer: Arc<DiscoveryBuffer>,
stats: Arc<EthernetStats>,
) {
let mut buf = vec![0u8; mtu as usize + 100];
debug!(transport_id = %transport_id, "Ethernet receive loop starting");
loop {
match socket.recv_from(&mut buf).await {
Ok((len, src_mac)) => {
if len == 0 {
continue;
}
stats.record_recv(len);
let frame_type = buf[0];
match frame_type {
FRAME_TYPE_DATA => {
if len < 3 {
trace!("Data frame too short ({len} bytes), ignoring");
continue;
}
let payload_len = u16::from_le_bytes([buf[1], buf[2]]) as usize;
if payload_len > len - 3 {
trace!(
"Data frame length field ({payload_len}) exceeds \
available bytes ({}), ignoring",
len - 3
);
continue;
}
let data = buf[3..3 + payload_len].to_vec();
let addr = TransportAddr::from_bytes(&src_mac);
let packet = ReceivedPacket::new(transport_id, addr, data);
trace!(
transport_id = %transport_id,
remote_mac = %format_mac(&src_mac),
bytes = payload_len,
"Ethernet data frame received"
);
if packet_tx.send(packet).is_err() {
debug!(
transport_id = %transport_id,
"Packet channel closed, stopping receive loop"
);
break;
}
}
FRAME_TYPE_BEACON => {
stats.record_beacon_recv();
if discovery_enabled && let Some(pubkey) = parse_beacon(&buf[..len]) {
discovery_buffer.add_peer(src_mac, pubkey);
trace!(
transport_id = %transport_id,
remote_mac = %format_mac(&src_mac),
"Discovery beacon received"
);
}
}
_ => {
trace!(
transport_id = %transport_id,
frame_type = frame_type,
"Unknown frame type, dropping"
);
}
}
}
Err(e) => {
stats.record_recv_error();
warn!(
transport_id = %transport_id,
error = %e,
"Ethernet receive error"
);
}
}
}
debug!(transport_id = %transport_id, "Ethernet receive loop stopped");
}
async fn beacon_sender_loop(
mut socket: Arc<AsyncPacketSocket>,
pubkey: XOnlyPublicKey,
interval_secs: u64,
stats: Arc<EthernetStats>,
transport_id: TransportId,
interface: String,
ethertype: u16,
) {
const REOPEN_THRESHOLD: u32 = 3;
let beacon = build_beacon(&pubkey);
let interval = tokio::time::Duration::from_secs(interval_secs);
debug!(
transport_id = %transport_id,
interval_secs,
"Beacon sender starting"
);
if let Err(e) = socket.send_to(&beacon, ÐERNET_BROADCAST).await {
warn!(
transport_id = %transport_id,
error = %e,
"Failed to send initial beacon"
);
} else {
stats.record_beacon_sent();
}
let mut interval_timer = tokio::time::interval(interval);
interval_timer.tick().await; let mut consecutive_errors: u32 = 0;
loop {
interval_timer.tick().await;
match socket.send_to(&beacon, ÐERNET_BROADCAST).await {
Ok(_) => {
if consecutive_errors > 0 {
debug!(
transport_id = %transport_id,
"Beacon send recovered after {} errors", consecutive_errors,
);
}
consecutive_errors = 0;
stats.record_beacon_sent();
trace!(
transport_id = %transport_id,
"Beacon sent"
);
}
Err(e) => {
consecutive_errors += 1;
stats.record_send_error();
let is_enxio = format!("{e}").contains("os error 6");
if consecutive_errors == 1 {
warn!(
transport_id = %transport_id,
error = %e,
"Failed to send beacon"
);
}
if is_enxio && consecutive_errors >= REOPEN_THRESHOLD {
info!(
transport_id = %transport_id,
consecutive_errors,
interface = %interface,
"Stale veth detected (ENXIO), attempting socket reopen"
);
match reopen_beacon_socket(&interface, ethertype) {
Ok(new_socket) => {
socket = Arc::new(new_socket);
consecutive_errors = 0;
info!(
transport_id = %transport_id,
interface = %interface,
"Beacon socket reopened successfully"
);
}
Err(e) => {
warn!(
transport_id = %transport_id,
error = %e,
interface = %interface,
"Failed to reopen beacon socket, will retry"
);
}
}
}
}
}
}
}
fn reopen_beacon_socket(
interface: &str,
ethertype: u16,
) -> Result<AsyncPacketSocket, TransportError> {
let raw_socket = PacketSocket::open(interface, ethertype)?;
raw_socket.into_async()
}
fn parse_mac_addr(addr: &TransportAddr) -> Result<[u8; 6], TransportError> {
let bytes = addr.as_bytes();
if bytes.len() != 6 {
return Err(TransportError::InvalidAddress(format!(
"expected 6-byte MAC, got {} bytes",
bytes.len()
)));
}
if bytes == [0, 0, 0, 0, 0, 0] {
return Err(TransportError::InvalidAddress(
"destination MAC is all zeros".into(),
));
}
let mut mac = [0u8; 6];
mac.copy_from_slice(bytes);
Ok(mac)
}
pub fn format_mac(mac: &[u8; 6]) -> String {
format!(
"{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]
)
}
pub fn parse_mac_string(s: &str) -> Result<[u8; 6], TransportError> {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() != 6 {
return Err(TransportError::InvalidAddress(format!(
"invalid MAC format: expected 6 colon-separated hex bytes, got '{}'",
s
)));
}
let mut mac = [0u8; 6];
for (i, part) in parts.iter().enumerate() {
mac[i] = u8::from_str_radix(part, 16).map_err(|_| {
TransportError::InvalidAddress(format!("invalid hex byte '{}' in MAC address", part))
})?;
}
Ok(mac)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_mac_addr_valid() {
let addr = TransportAddr::from_bytes(&[0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
let mac = parse_mac_addr(&addr).unwrap();
assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
}
#[test]
fn test_parse_mac_addr_wrong_length() {
let addr = TransportAddr::from_bytes(&[0xaa, 0xbb, 0xcc]);
assert!(parse_mac_addr(&addr).is_err());
let addr = TransportAddr::from_string("192.168.1.1:2121");
assert!(parse_mac_addr(&addr).is_err());
}
#[test]
fn test_parse_mac_addr_all_zeros() {
let addr = TransportAddr::from_bytes(&[0, 0, 0, 0, 0, 0]);
assert!(parse_mac_addr(&addr).is_err());
}
#[test]
fn test_format_mac() {
let mac = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff];
assert_eq!(format_mac(&mac), "aa:bb:cc:dd:ee:ff");
}
#[test]
fn test_format_mac_leading_zeros() {
let mac = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
assert_eq!(format_mac(&mac), "01:02:03:04:05:06");
}
#[test]
fn test_parse_mac_string_valid() {
let mac = parse_mac_string("aa:bb:cc:dd:ee:ff").unwrap();
assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
}
#[test]
fn test_parse_mac_string_uppercase() {
let mac = parse_mac_string("AA:BB:CC:DD:EE:FF").unwrap();
assert_eq!(mac, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]);
}
#[test]
fn test_parse_mac_string_invalid() {
assert!(parse_mac_string("aa:bb:cc").is_err());
assert!(parse_mac_string("not:a:mac:at:all:x").is_err());
assert!(parse_mac_string("").is_err());
assert!(parse_mac_string("aa-bb-cc-dd-ee-ff").is_err());
}
#[test]
fn test_frame_type_data_prefix() {
let data = vec![1, 2, 3, 4];
let mut frame = Vec::with_capacity(3 + data.len());
frame.push(FRAME_TYPE_DATA);
frame.extend_from_slice(&(data.len() as u16).to_le_bytes());
frame.extend_from_slice(&data);
assert_eq!(frame[0], 0x00); assert_eq!(u16::from_le_bytes([frame[1], frame[2]]), 4); assert_eq!(&frame[3..], &[1, 2, 3, 4]); }
#[test]
fn test_data_frame_padding_trimmed() {
let payload = vec![0xAA, 0xBB, 0xCC, 0xDD];
let payload_len = payload.len() as u16;
let mut frame = Vec::with_capacity(3 + payload.len());
frame.push(FRAME_TYPE_DATA);
frame.extend_from_slice(&payload_len.to_le_bytes());
frame.extend_from_slice(&payload);
frame.resize(46, 0x00);
let recv_len = u16::from_le_bytes([frame[1], frame[2]]) as usize;
let extracted = &frame[3..3 + recv_len];
assert_eq!(extracted, &[0xAA, 0xBB, 0xCC, 0xDD]);
}
#[test]
fn test_beacon_size() {
assert_eq!(discovery::BEACON_SIZE, 34);
}
}