use core::sync::atomic::Ordering;
use crate::log_ln;
use crate::parse_with_magic;
use crate::serialize_with_magic;
use crate::set_runtime_collection_mode;
use crate::ControlPacket;
use crate::PeripheralPacket;
use crate::CENTRAL_MAGIC_NUMBER;
use crate::PERIPHERAL_BEACON_SENTINEL;
use crate::PERIPHERAL_MAGIC_NUMBER;
use crate::IS_COLLECTOR;
#[cfg(feature = "statistics")]
use crate::STATS;
use crate::STOP_SIGNAL;
use crate::espnow_phy::{apply_peer_espnow_phy, with_espnow_recv_suspended};
use embassy_futures::select::{select, Either};
use embassy_futures::yield_now;
use embassy_time::Instant;
use embassy_time::Timer;
use esp_radio::esp_now::{Error as EspNowInnerError, EspNow, EspNowError, PeerInfo, BROADCAST_ADDRESS};
use crate::esp_now_pool::PoolFrame;
use portable_atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicU8};
#[cfg(feature = "statistics")]
use portable_atomic::AtomicU32;
use crate::{EspNowConfig, IOTaskConfig};
const TX_BACKOFF_US: u64 = 200;
const TX_FAST_BACKOFF_US: u64 = 50;
const TX_WAIT_SLICE_US: u64 = 100;
const ADAPT_UP_EVERY_SUCCESSES: u16 = 32;
const ADAPT_DOWN_PERCENT: u64 = 25;
const ADAPT_UP_PERCENT: u64 = 10;
const MIN_REPLY_HZ_FLOOR: u64 = 100;
const MAX_REPLY_HZ_CEILING: u64 = 8_000;
const PERIPHERAL_PACKET_BUF_LEN: usize = 8;
const TX_CATCH_UP_BURST: u8 = 3;
const TX_CATCH_UP_BURST_NO_WAIT: u8 = 16;
const RX_BURST_MAX_WITH_TX: u16 = 16;
const RX_RESERVED_TX_GUARD_US: u64 = 15;
const PEER_HEALTHCHECK_PERIOD: u16 = 256;
const RX_BURST_MAX_RX_ONLY: u16 = 64;
const MODE_SWITCH_HYSTERESIS: u8 = 3;
#[cfg(feature = "statistics")]
static RX_PARSE_FAIL_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_MAGIC_DROP_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_SOURCE_FILTER_DROP_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_PEER_ADD_FAIL_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_SEQUENCE_MISS_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_CONTROL_PACKET_COUNT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "statistics")]
static RX_TX_GUARD_BREAK_COUNT: AtomicU64 = AtomicU64::new(0);
static RAW_LISTEN: AtomicBool = AtomicBool::new(false);
pub fn set_raw_listen(enabled: bool) {
RAW_LISTEN.store(enabled, Ordering::Relaxed);
crate::esp_now_pool::set_raw_drop(enabled);
}
#[cfg(feature = "statistics")]
fn reset_rx_diagnostics() {
RX_PARSE_FAIL_COUNT.store(0, Ordering::Relaxed);
RX_MAGIC_DROP_COUNT.store(0, Ordering::Relaxed);
RX_SOURCE_FILTER_DROP_COUNT.store(0, Ordering::Relaxed);
RX_PEER_ADD_FAIL_COUNT.store(0, Ordering::Relaxed);
RX_SEQUENCE_MISS_COUNT.store(0, Ordering::Relaxed);
RX_CONTROL_PACKET_COUNT.store(0, Ordering::Relaxed);
RX_TX_GUARD_BREAK_COUNT.store(0, Ordering::Relaxed);
}
#[cfg(feature = "statistics")]
pub fn get_rx_parse_fail_packets() -> u64 {
RX_PARSE_FAIL_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_magic_drop_packets() -> u64 {
RX_MAGIC_DROP_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_source_filter_drop_packets() -> u64 {
RX_SOURCE_FILTER_DROP_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_peer_add_fail_packets() -> u64 {
RX_PEER_ADD_FAIL_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_sequence_miss_packets() -> u64 {
RX_SEQUENCE_MISS_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_control_packets() -> u64 {
RX_CONTROL_PACKET_COUNT.load(Ordering::Relaxed)
}
#[cfg(feature = "statistics")]
pub fn get_rx_tx_guard_breaks() -> u64 {
RX_TX_GUARD_BREAK_COUNT.load(Ordering::Relaxed)
}
fn hz_to_interval_us(hz: u64) -> u64 {
(1_000_000u64 / hz.max(1)).max(1)
}
fn mac_to_u64(mac: &[u8; 6]) -> u64 {
(mac[0] as u64)
| ((mac[1] as u64) << 8)
| ((mac[2] as u64) << 16)
| ((mac[3] as u64) << 24)
| ((mac[4] as u64) << 32)
| ((mac[5] as u64) << 40)
}
fn u64_to_mac(v: u64) -> [u8; 6] {
[
(v & 0xFF) as u8,
((v >> 8) & 0xFF) as u8,
((v >> 16) & 0xFF) as u8,
((v >> 24) & 0xFF) as u8,
((v >> 32) & 0xFF) as u8,
((v >> 40) & 0xFF) as u8,
]
}
fn unicast_replies(config: &EspNowConfig) -> bool {
if config.secondary_channel().is_some() {
return true;
}
#[cfg(feature = "esp32c5")]
{
config.force_phy()
}
#[cfg(not(feature = "esp32c5"))]
{
false
}
}
fn apply_central_peer_phy(config: &EspNowConfig, central_mac: &[u8; 6]) {
if !config.force_phy() {
return;
}
if unicast_replies(config) {
apply_peer_espnow_phy(central_mac, *config.phy_rate(), config.secondary_channel());
}
}
fn reply_destination(shared: &Shared, config: &EspNowConfig) -> [u8; 6] {
if unicast_replies(config) {
u64_to_mac(shared.central_mac.load(Ordering::Relaxed))
} else {
BROADCAST_ADDRESS
}
}
fn register_central_peer(
esp_now: &EspNow<'static>,
channel: u8,
config: &EspNowConfig,
central_mac: [u8; 6],
) -> bool {
let add_res = if esp_now.peer_exists(¢ral_mac) {
Ok(())
} else {
esp_now.add_peer(PeerInfo {
interface: esp_radio::esp_now::EspNowWifiInterface::Station,
peer_address: central_mac,
lmk: None,
channel: Some(channel),
encrypt: false,
})
};
match add_res {
Ok(()) => {
apply_central_peer_phy(config, ¢ral_mac);
true
}
Err(_) => {
#[cfg(feature = "statistics")]
RX_PEER_ADD_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
false
}
}
}
struct Shared {
is_connected: AtomicBool,
is_collector: AtomicBool,
central_mac: AtomicU64,
peer_healthcheck_counter: AtomicU16,
#[cfg(feature = "statistics")]
last_control_sequence: AtomicU32,
#[cfg(feature = "statistics")]
sequence_initialized: AtomicBool,
pending_flag: AtomicBool,
last_central_is_listener: AtomicBool,
mode_streak: AtomicU8,
}
fn ingest_control_packet(
esp_now: &EspNow<'static>,
channel: u8,
config: &EspNowConfig,
r: PoolFrame,
shared: &Shared,
tx_enabled: bool,
peer_mac: Option<[u8; 6]>,
) {
if RAW_LISTEN.load(Ordering::Relaxed) {
return;
}
let is_connected = shared.is_connected.load(Ordering::Acquire);
if is_connected {
let expected = u64_to_mac(shared.central_mac.load(Ordering::Relaxed));
if expected != r.info.src_address {
#[cfg(feature = "statistics")]
RX_SOURCE_FILTER_DROP_COUNT.fetch_add(1, Ordering::Relaxed);
return;
}
}
let expect_magic = peer_mac.is_none();
let Some(packet) =
parse_with_magic::<ControlPacket>(r.data(), CENTRAL_MAGIC_NUMBER, expect_magic)
else {
#[cfg(feature = "statistics")]
if expect_magic {
RX_MAGIC_DROP_COUNT.fetch_add(1, Ordering::Relaxed);
} else {
RX_PARSE_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
}
return;
};
#[cfg(feature = "statistics")]
{
RX_CONTROL_PACKET_COUNT.fetch_add(1, Ordering::Relaxed);
if shared.sequence_initialized.load(Ordering::Acquire) {
let last_seq = shared.last_control_sequence.load(Ordering::Relaxed);
let gap = packet.sequence_number as i64 - last_seq as i64;
if gap > 1 {
RX_SEQUENCE_MISS_COUNT.fetch_add((gap - 1) as u64, Ordering::Relaxed);
}
} else {
shared.sequence_initialized.store(true, Ordering::Release);
}
shared
.last_control_sequence
.store(packet.sequence_number, Ordering::Relaxed);
}
if tx_enabled {
if !is_connected {
if register_central_peer(esp_now, channel, config, r.info.src_address) {
shared
.central_mac
.store(mac_to_u64(&r.info.src_address), Ordering::Relaxed);
shared.peer_healthcheck_counter.store(0, Ordering::Relaxed);
shared.is_connected.store(true, Ordering::Release);
if unicast_replies(config) {
log_ln!(
"ESP-NOW peripheral: locked central {:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}, unicast forced-PHY replies enabled (rate {:?})",
r.info.src_address[0],
r.info.src_address[1],
r.info.src_address[2],
r.info.src_address[3],
r.info.src_address[4],
r.info.src_address[5],
config.phy_rate()
);
} else {
log_ln!(
"ESP-NOW peripheral: locked central {:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}, broadcast HT20 replies",
r.info.src_address[0],
r.info.src_address[1],
r.info.src_address[2],
r.info.src_address[3],
r.info.src_address[4],
r.info.src_address[5]
);
}
}
} else {
let expected = u64_to_mac(shared.central_mac.load(Ordering::Relaxed));
let check_counter = shared
.peer_healthcheck_counter
.fetch_add(1, Ordering::Relaxed)
.wrapping_add(1);
if (check_counter & (PEER_HEALTHCHECK_PERIOD - 1)) == 0 {
let _ = register_central_peer(esp_now, channel, config, expected);
}
}
shared.pending_flag.store(true, Ordering::Release);
}
let central_is_listener = !packet.is_collector;
let prev_seen = shared.last_central_is_listener.load(Ordering::Relaxed);
if central_is_listener != prev_seen {
shared
.last_central_is_listener
.store(central_is_listener, Ordering::Relaxed);
shared.mode_streak.store(1, Ordering::Relaxed);
} else {
let streak = shared
.mode_streak
.load(Ordering::Relaxed)
.saturating_add(1);
shared.mode_streak.store(streak, Ordering::Relaxed);
if streak == MODE_SWITCH_HYSTERESIS && central_is_listener {
if !shared.is_collector.load(Ordering::Relaxed) {
set_runtime_collection_mode(true);
shared.is_collector.store(true, Ordering::Relaxed);
}
}
}
}
pub async fn run_esp_now_peripheral(
esp_now: &mut EspNow<'static>,
config: &EspNowConfig,
freq_hz: Option<u16>,
io_tasks: IOTaskConfig,
) {
if config.secondary_channel().is_none() {
with_espnow_recv_suspended(|| {
esp_now.set_channel(config.channel).unwrap();
});
}
#[cfg(not(feature = "esp32c5"))]
if config.force_phy() && config.secondary_channel().is_none() {
crate::set_peer_espnow_phy(
&BROADCAST_ADDRESS,
*config.phy_rate(),
config.secondary_channel(),
);
}
log_ln!("esp-now version {}", esp_now.version().unwrap());
let freq = match freq_hz {
Some(freq) => freq as u64,
None => u16::MAX as u64,
};
#[cfg(feature = "statistics")]
reset_rx_diagnostics();
responder(esp_now, config, freq, io_tasks).await;
}
async fn responder(
esp_now: &mut EspNow<'static>,
config: &EspNowConfig,
frequency_hz: u64,
io_tasks: IOTaskConfig,
) {
let channel = config.channel;
let peer_mac = config.peer_mac();
let send_magic = peer_mac.is_none();
let shared = Shared {
is_connected: AtomicBool::new(peer_mac.is_some()),
is_collector: AtomicBool::new(IS_COLLECTOR.load(Ordering::Relaxed)),
central_mac: AtomicU64::new(peer_mac.map(|m| mac_to_u64(&m)).unwrap_or(0)),
peer_healthcheck_counter: AtomicU16::new(0),
#[cfg(feature = "statistics")]
last_control_sequence: AtomicU32::new(0),
#[cfg(feature = "statistics")]
sequence_initialized: AtomicBool::new(false),
pending_flag: AtomicBool::new(false),
last_central_is_listener: AtomicBool::new(false),
mode_streak: AtomicU8::new(0),
};
if let Some(mac) = peer_mac
&& io_tasks.tx_enabled
&& !register_central_peer(esp_now, channel, config, mac)
{
}
let reply_hz_max = frequency_hz.clamp(1, MAX_REPLY_HZ_CEILING);
let reply_hz_min = (reply_hz_max / 8).max(MIN_REPLY_HZ_FLOOR).min(reply_hz_max);
let mut adaptive_reply_hz = reply_hz_max;
let mut tx_interval_us = if io_tasks.rx_enabled {
hz_to_interval_us(adaptive_reply_hz)
} else {
hz_to_interval_us(frequency_hz)
};
let adaptive_pacing_enabled = io_tasks.rx_enabled && io_tasks.tx_enabled;
let tx_fast_no_wait = io_tasks.tx_enabled && !io_tasks.rx_enabled;
let mut consecutive_tx_ok: u16 = 0;
let mut next_tx_us = Instant::now().as_micros().saturating_add(tx_interval_us);
let mut tx_buf = [0u8; PERIPHERAL_PACKET_BUF_LEN];
loop {
let mut now_us = Instant::now().as_micros();
if io_tasks.tx_enabled {
let mut burst_budget = if tx_fast_no_wait {
TX_CATCH_UP_BURST_NO_WAIT
} else {
TX_CATCH_UP_BURST
};
while now_us >= next_tx_us
&& burst_budget > 0
&& shared.is_connected.load(Ordering::Acquire)
&& shared.pending_flag.swap(false, Ordering::Acquire)
{
burst_budget = burst_budget.saturating_sub(1);
let dest_mac = reply_destination(&shared, config);
let message: &[u8] = if send_magic {
match serialize_with_magic(
&PeripheralPacket::new(),
PERIPHERAL_MAGIC_NUMBER,
true,
&mut tx_buf,
) {
Ok(slice) => slice,
Err(_) => {
log_ln!("Failed to serialize ESP-NOW peripheral packet");
break;
}
}
} else {
tx_buf[0] = PERIPHERAL_BEACON_SENTINEL;
&tx_buf[..1]
};
let mut send_succeeded = false;
if tx_fast_no_wait {
match esp_now.send(&dest_mac, message) {
Ok(waiter) => {
core::mem::forget(waiter);
send_succeeded = true;
#[cfg(feature = "statistics")]
STATS.tx_count.fetch_add(1, Ordering::Relaxed);
}
Err(
EspNowError::Error(EspNowInnerError::OutOfMemory)
| EspNowError::SendFailed,
) => {
Timer::after_micros(TX_FAST_BACKOFF_US).await;
}
Err(e) => {
log_ln!("Failed to queue ESP-NOW packet: {:?}", e);
}
}
} else {
let send_result = match esp_now.send(&dest_mac, message) {
Ok(waiter) => waiter.wait(),
Err(e) => Err(e),
};
match send_result {
Ok(()) => {
send_succeeded = true;
#[cfg(feature = "statistics")]
STATS.tx_count.fetch_add(1, Ordering::Relaxed);
if adaptive_pacing_enabled {
consecutive_tx_ok = consecutive_tx_ok.saturating_add(1);
if consecutive_tx_ok >= ADAPT_UP_EVERY_SUCCESSES {
consecutive_tx_ok = 0;
let step_up = (adaptive_reply_hz * ADAPT_UP_PERCENT / 100).max(1);
adaptive_reply_hz = (adaptive_reply_hz + step_up).min(reply_hz_max);
tx_interval_us = hz_to_interval_us(adaptive_reply_hz);
}
}
}
Err(
EspNowError::Error(EspNowInnerError::OutOfMemory)
| EspNowError::SendFailed,
) => {
consecutive_tx_ok = 0;
if adaptive_pacing_enabled {
let step_down = (adaptive_reply_hz * ADAPT_DOWN_PERCENT / 100).max(1);
adaptive_reply_hz =
adaptive_reply_hz.saturating_sub(step_down).max(reply_hz_min);
tx_interval_us = hz_to_interval_us(adaptive_reply_hz);
Timer::after_micros(TX_BACKOFF_US).await;
}
}
Err(e) => {
consecutive_tx_ok = 0;
log_ln!("Failed to send ESP-NOW packet: {:?}", e);
}
}
}
next_tx_us = next_tx_us.saturating_add(tx_interval_us);
now_us = Instant::now().as_micros();
if !send_succeeded {
break;
}
}
}
if io_tasks.rx_enabled {
let tx_reply_pending = io_tasks.tx_enabled
&& shared.is_connected.load(Ordering::Acquire)
&& shared.pending_flag.load(Ordering::Acquire);
let rx_deadline_us = if tx_reply_pending {
next_tx_us.saturating_sub(RX_RESERVED_TX_GUARD_US)
} else {
u64::MAX
};
let rx_burst_drain_limit = if tx_reply_pending {
RX_BURST_MAX_WITH_TX
} else {
RX_BURST_MAX_RX_ONLY
};
let mut rx_packets: u16 = 0;
while rx_packets < rx_burst_drain_limit {
if tx_reply_pending && Instant::now().as_micros() >= rx_deadline_us {
#[cfg(feature = "statistics")]
RX_TX_GUARD_BREAK_COUNT.fetch_add(1, Ordering::Relaxed);
break;
}
let Some(r) = crate::esp_now_pool::receive() else {
break;
};
ingest_control_packet(
esp_now,
channel,
config,
r,
&shared,
io_tasks.tx_enabled,
peer_mac,
);
rx_packets = rx_packets.saturating_add(1);
}
if rx_packets > 0
&& (!tx_reply_pending || Instant::now().as_micros() < next_tx_us)
{
yield_now().await;
continue;
}
}
if !io_tasks.tx_enabled && io_tasks.rx_enabled {
match select(STOP_SIGNAL.wait(), crate::esp_now_pool::receive_async()).await {
Either::First(_) => {
log_ln!("STOP signal received, shutting down responder...");
STOP_SIGNAL.signal(());
break;
}
Either::Second(r) => {
ingest_control_packet(esp_now, channel, config, r, &shared, false, peer_mac);
let mut drained: u16 = 0;
while drained < RX_BURST_MAX_RX_ONLY {
let Some(r) = crate::esp_now_pool::receive() else { break; };
ingest_control_packet(esp_now, channel, config, r, &shared, false, peer_mac);
drained = drained.saturating_add(1);
}
}
}
} else {
let tx_reply_pending = io_tasks.tx_enabled
&& shared.is_connected.load(Ordering::Acquire)
&& shared.pending_flag.load(Ordering::Acquire);
let wait_us = if tx_reply_pending {
let until_tx_us = next_tx_us.saturating_sub(Instant::now().as_micros());
let slice_div = if io_tasks.rx_enabled { 8 } else { 4 };
let slice_us = (tx_interval_us / slice_div).clamp(1, TX_WAIT_SLICE_US);
until_tx_us.min(slice_us).max(1)
} else {
1
};
match select(STOP_SIGNAL.wait(), Timer::after_micros(wait_us)).await {
Either::First(_) => {
log_ln!("STOP signal received, shutting down responder...");
STOP_SIGNAL.signal(());
break;
}
Either::Second(_) => {}
}
}
}
log_ln!("Node Stopped. Halting CSI Sending.");
}