use core::sync::atomic::Ordering;
#[cfg(feature = "statistics")]
use crate::STATS;
use crate::log_ln;
use crate::set_runtime_collection_mode;
use crate::ControlPacket;
use crate::PeripheralPacket;
use crate::CENTRAL_MAGIC_NUMBER;
use crate::IS_COLLECTOR;
use crate::STOP_SIGNAL;
use embassy_futures::select::{select, Either};
use embassy_time::Instant;
use embassy_time::Timer;
use esp_radio::esp_now::{Error as EspNowInnerError, EspNow, EspNowError, PeerInfo, ReceivedData};
use heapless::Vec;
use portable_atomic::{AtomicBool, AtomicU64};
use crate::EspNowConfig;
const TX_BACKOFF_US: u64 = 200;
const RX_BURST_DRAIN_LIMIT: u8 = 1;
const TX_WAIT_SLICE_US: u64 = 200;
const ADAPT_UP_EVERY_SUCCESSES: u16 = 32;
const ADAPT_DOWN_PERCENT: u64 = 25;
const ADAPT_UP_PERCENT: u64 = 10;
const MIN_REPLY_HZ_FLOOR: u64 = 100;
const MAX_REPLY_HZ_CEILING: u64 = 2_000;
fn hz_to_interval_us(hz: u64) -> u64 {
(1_000_000u64 / hz.max(1)).max(1)
}
fn mac_to_u64(mac: &[u8; 6]) -> u64 {
(mac[0] as u64)
| ((mac[1] as u64) << 8)
| ((mac[2] as u64) << 16)
| ((mac[3] as u64) << 24)
| ((mac[4] as u64) << 32)
| ((mac[5] as u64) << 40)
}
fn u64_to_mac(v: u64) -> [u8; 6] {
[
(v & 0xFF) as u8,
((v >> 8) & 0xFF) as u8,
((v >> 16) & 0xFF) as u8,
((v >> 24) & 0xFF) as u8,
((v >> 32) & 0xFF) as u8,
((v >> 40) & 0xFF) as u8,
]
}
struct Shared {
is_connected: AtomicBool,
is_collector: AtomicBool,
central_mac: AtomicU64,
pending_recv_time: AtomicU64,
pending_csu: AtomicU64,
pending_flag: AtomicBool,
}
fn ingest_control_packet(
esp_now: &EspNow<'static>,
channel: u8,
r: ReceivedData,
shared: &Shared,
) {
let Ok(packet) = postcard::from_bytes::<ControlPacket>(r.data()) else {
return;
};
if packet.magic_number != CENTRAL_MAGIC_NUMBER {
return;
}
let recv_time = Instant::now().as_micros();
if !shared.is_connected.load(Ordering::Acquire) {
let add_res = esp_now.add_peer(PeerInfo {
interface: esp_radio::esp_now::EspNowWifiInterface::Sta,
peer_address: r.info.src_address,
lmk: None,
channel: Some(channel),
encrypt: false,
});
if add_res.is_err() {
return;
}
shared
.central_mac
.store(mac_to_u64(&r.info.src_address), Ordering::Relaxed);
shared.is_connected.store(true, Ordering::Release);
} else {
let expected = u64_to_mac(shared.central_mac.load(Ordering::Relaxed));
if expected != r.info.src_address {
log_ln!(
"Received control packet from unexpected peer {:02X?}, expected {:02X?}",
r.info.src_address,
expected
);
return;
}
if !esp_now.peer_exists(&expected) {
let _ = esp_now.add_peer(PeerInfo {
interface: esp_radio::esp_now::EspNowWifiInterface::Sta,
peer_address: expected,
lmk: None,
channel: Some(channel),
encrypt: false,
});
}
}
#[cfg(feature = "statistics")]
STATS.rx_count.fetch_add(1, Ordering::Relaxed);
let desired_collector = !packet.is_collector;
let prev = shared.is_collector.load(Ordering::Relaxed);
if desired_collector != prev {
set_runtime_collection_mode(desired_collector);
shared.is_collector.store(desired_collector, Ordering::Relaxed);
}
#[cfg(feature = "statistics")]
if packet.latency_offset != -1 {
let one_way_latency =
recv_time as i64 - (packet.central_send_uptime as i64 + packet.latency_offset);
STATS
.one_way_latency
.store(one_way_latency, Ordering::Relaxed);
}
shared
.pending_recv_time
.store(recv_time, Ordering::Relaxed);
shared
.pending_csu
.store(packet.central_send_uptime, Ordering::Relaxed);
shared.pending_flag.store(true, Ordering::Release);
}
pub async fn run_esp_now_peripheral(
esp_now: &mut EspNow<'static>,
config: &EspNowConfig,
freq_hz: Option<u16>,
) {
esp_now.set_channel(config.channel).unwrap();
log_ln!("esp-now version {}", esp_now.version().unwrap());
let freq = match freq_hz {
Some(freq) => freq as u64,
None => u16::MAX as u64,
};
responder(esp_now, config.channel, freq).await;
}
async fn responder(esp_now: &mut EspNow<'static>, channel: u8, frequency_hz: u64) {
let shared = Shared {
is_connected: AtomicBool::new(false),
is_collector: AtomicBool::new(IS_COLLECTOR.load(Ordering::Relaxed)),
central_mac: AtomicU64::new(0),
pending_recv_time: AtomicU64::new(0),
pending_csu: AtomicU64::new(0),
pending_flag: AtomicBool::new(false),
};
let reply_hz_max = frequency_hz.clamp(1, MAX_REPLY_HZ_CEILING);
let reply_hz_min = (reply_hz_max / 8).max(MIN_REPLY_HZ_FLOOR).min(reply_hz_max);
let mut adaptive_reply_hz = reply_hz_max;
let mut tx_interval_us = hz_to_interval_us(adaptive_reply_hz);
let mut consecutive_tx_ok: u16 = 0;
let mut next_tx_us = Instant::now().as_micros().saturating_add(tx_interval_us);
loop {
let now_us = Instant::now().as_micros();
let tx_due = now_us >= next_tx_us;
if tx_due
&& shared.is_connected.load(Ordering::Acquire)
&& shared.pending_flag.swap(false, Ordering::Acquire)
{
let recv_time = shared.pending_recv_time.load(Ordering::Relaxed);
let csu = shared.pending_csu.load(Ordering::Relaxed);
let central_mac = u64_to_mac(shared.central_mac.load(Ordering::Relaxed));
let peripheral_packet = PeripheralPacket::new(recv_time, csu);
let message_u8: Vec<u8, 32> = postcard::to_vec(&peripheral_packet).unwrap();
let send_result = match esp_now.send(¢ral_mac, &message_u8) {
Ok(waiter) => waiter.wait(),
Err(e) => Err(e),
};
match send_result {
Ok(()) => {
#[cfg(feature = "statistics")]
STATS.tx_count.fetch_add(1, Ordering::Relaxed);
consecutive_tx_ok = consecutive_tx_ok.saturating_add(1);
if consecutive_tx_ok >= ADAPT_UP_EVERY_SUCCESSES {
consecutive_tx_ok = 0;
let step_up = (adaptive_reply_hz * ADAPT_UP_PERCENT / 100).max(1);
adaptive_reply_hz = (adaptive_reply_hz + step_up).min(reply_hz_max);
tx_interval_us = hz_to_interval_us(adaptive_reply_hz);
}
}
Err(EspNowError::Error(EspNowInnerError::OutOfMemory) | EspNowError::SendFailed) => {
consecutive_tx_ok = 0;
let step_down = (adaptive_reply_hz * ADAPT_DOWN_PERCENT / 100).max(1);
adaptive_reply_hz = adaptive_reply_hz
.saturating_sub(step_down)
.max(reply_hz_min);
tx_interval_us = hz_to_interval_us(adaptive_reply_hz);
Timer::after_micros(TX_BACKOFF_US).await;
log_ln!("TX buffer full, backing off for {} us", TX_BACKOFF_US);
}
Err(e) => {
consecutive_tx_ok = 0;
log_ln!("Failed to send ESP-NOW packet: {:?}", e);
}
}
next_tx_us = next_tx_us.saturating_add(tx_interval_us);
let now_after_tx = Instant::now().as_micros();
if next_tx_us <= now_after_tx {
next_tx_us = now_after_tx.saturating_add(1);
}
}
let mut rx_packets: u8 = 0;
while rx_packets < RX_BURST_DRAIN_LIMIT {
let Some(r) = esp_now.receive() else {
break;
};
ingest_control_packet(esp_now, channel, r, &shared);
rx_packets = rx_packets.saturating_add(1);
}
let now_us = Instant::now().as_micros();
let until_tx_us = next_tx_us.saturating_sub(now_us);
let wait_us = until_tx_us.min(TX_WAIT_SLICE_US).max(1);
match select(STOP_SIGNAL.wait(), Timer::after_micros(wait_us)).await {
Either::First(_) => {
log_ln!("STOP signal received, shutting down responder...");
STOP_SIGNAL.signal(());
break;
}
Either::Second(_) => {}
}
}
log_ln!("Node Stopped. Halting CSI Sending.");
}