use core::sync::atomic::Ordering;
use embassy_futures::select::{select, Either};
use embassy_time::Instant;
use embassy_time::Timer;
use heapless::Vec;
use crate::log_ln;
use crate::ControlPacket;
use crate::PeripheralPacket;
use crate::PERIPHERAL_MAGIC_NUMBER;
#[cfg(feature = "statistics")]
use crate::STATS;
use crate::STOP_SIGNAL;
use esp_radio::esp_now::{Error as EspNowInnerError, EspNow, EspNowError, PeerInfo, ReceivedData, BROADCAST_ADDRESS};
use crate::EspNowConfig;
/// Pause, in microseconds, awaited after a send fails with `OutOfMemory` or `SendFailed`.
const TX_BACKOFF_US: u64 = 200;
/// Maximum number of received frames handled per iteration of the central loop.
const RX_BURST_DRAIN_LIMIT: u8 = 1;
/// Upper bound, in microseconds, on a single idle wait between loop iterations.
const TX_WAIT_SLICE_US: u64 = 200;
/// Number of consecutive successful sends required before the send rate is raised.
const ADAPT_UP_EVERY_SUCCESSES: u16 = 32;
/// Percentage of the current send rate removed after an `OutOfMemory`/`SendFailed` error.
const ADAPT_DOWN_PERCENT: u64 = 25;
/// Percentage of the current send rate added after a full run of successful sends.
const ADAPT_UP_PERCENT: u64 = 10;
/// Lowest rate (Hz) the adaptive minimum may take, unless the maximum rate is itself lower.
const MIN_TX_HZ_FLOOR: u64 = 100;
/// Highest send rate (Hz) allowed; the requested frequency is clamped to this value.
const MAX_TX_HZ_CEILING: u64 = 2_000;
/// Converts a rate in hertz into the matching period in whole microseconds.
///
/// A rate of zero is treated as 1 Hz, and the result is never smaller than one
/// microsecond, so the returned interval is always in `1..=1_000_000`.
fn hz_to_interval_us(hz: u64) -> u64 {
    const MICROS_PER_SECOND: u64 = 1_000_000;

    // Guard the division: zero hertz falls back to one hertz.
    let divisor = if hz == 0 { 1 } else { hz };
    let period_us = MICROS_PER_SECOND / divisor;

    // Rates above 1 MHz would round down to zero; keep at least one microsecond.
    if period_us == 0 {
        1
    } else {
        period_us
    }
}
/// Handles one frame received over ESP-NOW as a reply from a peripheral.
///
/// The payload is decoded with postcard as a `PeripheralPacket`. Frames that fail to
/// decode, or whose `magic_number` is not `PERIPHERAL_MAGIC_NUMBER`, are dropped silently.
/// A sender that is not yet in the peer list is added as an unencrypted peer on the
/// station interface, bound to `channel`.
///
/// With the `statistics` feature enabled, the timestamps carried in the packet are used to
/// refresh `latency_offset` and to publish the two-way and one-way latency into `STATS`.
/// Without that feature `latency_offset` is left untouched.
///
/// # Parameters
/// - `esp_now`: driver used to query and extend the peer list.
/// - `r`: the received frame (payload plus source address).
/// - `channel`: channel recorded for a newly added peer.
/// - `latency_offset`: in/out clock-offset estimate, written only when a plausible
///   round-trip time (between 0 and 1 s, exclusive) is measured.
fn handle_peripheral_packet(
    esp_now: &mut EspNow<'static>,
    r: ReceivedData,
    channel: u8,
    latency_offset: &mut i64,
) {
    // Timestamp the arrival before any decoding work so the measurement excludes it.
    #[cfg(feature = "statistics")]
    let r_time = Instant::now().as_micros();
    // The offset is only consumed by the statistics code; mark it used otherwise so the
    // build stays free of unused-variable warnings.
    #[cfg(not(feature = "statistics"))]
    let _ = latency_offset;

    let Ok(packet) = postcard::from_bytes::<PeripheralPacket>(r.data()) else {
        return;
    };
    if packet.magic_number != PERIPHERAL_MAGIC_NUMBER {
        return;
    }

    if !esp_now.peer_exists(&r.info.src_address) {
        // Best effort: a failure to register the peer is deliberately ignored here.
        let _ = esp_now.add_peer(PeerInfo {
            interface: esp_radio::esp_now::EspNowWifiInterface::Sta,
            peer_address: r.info.src_address,
            lmk: None,
            channel: Some(channel),
            encrypt: false,
        });
    }

    #[cfg(feature = "statistics")]
    {
        let rtt = r_time.saturating_sub(packet.central_send_uptime);
        if rtt > 0 && rtt < 1_000_000 {
            // `rtt > 0` implies `central_send_uptime < r_time`, so this sum is bounded by
            // `r_time` and cannot overflow.
            let midpoint_us = packet.central_send_uptime + rtt / 2;

            // `recv_uptime` and `send_uptime` come straight from the radio frame and are
            // not validated. Use wrapping arithmetic so a corrupt or hostile packet cannot
            // trigger an overflow panic in builds with overflow checks enabled; the values
            // are identical to the plain operators whenever no overflow occurs.
            // NOTE(review): presumably this estimates "peer clock minus local clock" at the
            // midpoint of the round trip — confirm against the peripheral implementation.
            *latency_offset = (packet.recv_uptime as i64).wrapping_sub(midpoint_us as i64);

            let b_processing_delay = packet.send_uptime.saturating_sub(packet.recv_uptime);
            // Round trip minus the time between the packet's recv and send timestamps.
            // `rtt` is below 1_000_000, so the cast is lossless.
            let two_way_latency = rtt.saturating_sub(b_processing_delay) as i64;

            // Shift the packet's send timestamp by the offset, then compare with arrival.
            let send_time_shifted = (packet.send_uptime as i64).wrapping_sub(*latency_offset);
            let one_way_latency = (r_time as i64).wrapping_sub(send_time_shifted);

            STATS
                .two_way_latency
                .store(two_way_latency, Ordering::Relaxed);
            STATS
                .one_way_latency
                .store(one_way_latency, Ordering::Relaxed);
        }
    }
}
pub async fn run_esp_now_central(
esp_now: &mut EspNow<'static>, _mac_addr: [u8; 6],
config: &EspNowConfig,
frequency_hz: Option<u16>,
is_collector: bool,
) {
let mut latency_offset: i64 = -1;
esp_now.set_channel(config.channel).unwrap();
log_ln!("esp-now version {}", esp_now.version().unwrap());
let freq = match frequency_hz {
Some(freq) => u64::from(freq.max(1)),
None => u16::MAX as u64,
};
let tx_hz_max = freq.clamp(1, MAX_TX_HZ_CEILING);
let tx_hz_min = (tx_hz_max / 8).max(MIN_TX_HZ_FLOOR).min(tx_hz_max);
let mut adaptive_tx_hz = tx_hz_max;
let mut tx_interval_us = hz_to_interval_us(adaptive_tx_hz);
let mut consecutive_tx_ok: u16 = 0;
let mut next_tx_us = Instant::now().as_micros().saturating_add(tx_interval_us);
loop {
let now_us = Instant::now().as_micros();
let tx_due = now_us >= next_tx_us;
if tx_due {
let control_packet = ControlPacket::new(is_collector, latency_offset);
let message_u8: Vec<u8, 16> = postcard::to_vec(&control_packet).unwrap();
let send_result = match esp_now.send(&BROADCAST_ADDRESS, &message_u8) {
Ok(waiter) => waiter.wait(),
Err(e) => Err(e),
};
match send_result {
Ok(()) => {
#[cfg(feature = "statistics")]
STATS.tx_count.fetch_add(1, Ordering::Relaxed);
consecutive_tx_ok = consecutive_tx_ok.saturating_add(1);
if consecutive_tx_ok >= ADAPT_UP_EVERY_SUCCESSES {
consecutive_tx_ok = 0;
let step_up = (adaptive_tx_hz * ADAPT_UP_PERCENT / 100).max(1);
adaptive_tx_hz = (adaptive_tx_hz + step_up).min(tx_hz_max);
tx_interval_us = hz_to_interval_us(adaptive_tx_hz);
}
}
Err(EspNowError::Error(EspNowInnerError::OutOfMemory) | EspNowError::SendFailed) => {
consecutive_tx_ok = 0;
let step_down = (adaptive_tx_hz * ADAPT_DOWN_PERCENT / 100).max(1);
adaptive_tx_hz = adaptive_tx_hz
.saturating_sub(step_down)
.max(tx_hz_min);
tx_interval_us = hz_to_interval_us(adaptive_tx_hz);
Timer::after_micros(TX_BACKOFF_US).await;
}
Err(e) => {
consecutive_tx_ok = 0;
log_ln!("Failed to send ESP-NOW packet: {:?}", e);
}
}
next_tx_us = next_tx_us.saturating_add(tx_interval_us);
let now_after_tx = Instant::now().as_micros();
if next_tx_us <= now_after_tx {
next_tx_us = now_after_tx.saturating_add(1);
}
}
let mut rx_packets: u8 = 0;
while rx_packets < RX_BURST_DRAIN_LIMIT {
let Some(r) = esp_now.receive() else {
break;
};
handle_peripheral_packet(esp_now, r, config.channel, &mut latency_offset);
rx_packets = rx_packets.saturating_add(1);
}
let now_us = Instant::now().as_micros();
let until_tx_us = next_tx_us.saturating_sub(now_us);
let wait_us = until_tx_us.min(TX_WAIT_SLICE_US).max(1);
match select(STOP_SIGNAL.wait(), Timer::after_micros(wait_us)).await {
Either::First(_) => {
STOP_SIGNAL.signal(());
break;
}
Either::Second(_) => {}
}
}
}