use embassy_futures::select::{select3, Either3};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::signal::Signal;
use embassy_sync::waitqueue::AtomicWaker;
use embassy_time::Timer;
#[cfg(feature = "statistics")]
use embassy_time::Instant;
use heapless::Vec;
use portable_atomic::{AtomicBool, Ordering};
use esp_radio::wifi::csi::CsiConfig;
use esp_radio::wifi::WifiController;
use crate::config::CsiConfig as CsiConfiguration;
use super::{CSIDataPacket, RxCSIFmt};
use crate::STOP_SIGNAL;
#[cfg(feature = "statistics")]
use crate::stats::STATS;
#[cfg(all(feature = "statistics", not(feature = "esp32c5")))]
use crate::stats::{seq_drop_detection_enabled, MAX_TRACKED_PEERS, RESET_SEQ_TRACKER};
#[cfg(all(feature = "statistics", not(feature = "esp32c5")))]
use heapless::LinearMap;
static CSI_QUEUE: heapless::mpmc::Q32<CSIDataPacket> = heapless::mpmc::Q32::new();
static CSI_WAKER: AtomicWaker = AtomicWaker::new();
pub(crate) static IS_COLLECTOR: AtomicBool = AtomicBool::new(false);
static CSI_PUBLISH_ENABLED: AtomicBool = AtomicBool::new(false);
pub(crate) static COLLECTION_MODE_CHANGED: Signal<CriticalSectionRawMutex, ()> = Signal::new();
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum CsiDeliveryMode {
Off = 0,
Callback = 1,
Async = 2,
}
static CSI_DELIVERY_MODE: portable_atomic::AtomicU8 = portable_atomic::AtomicU8::new(0);
static CSI_CALLBACK: core::sync::atomic::AtomicPtr<()> =
core::sync::atomic::AtomicPtr::new(core::ptr::null_mut());
static CSI_RAW_CALLBACK: core::sync::atomic::AtomicPtr<()> =
core::sync::atomic::AtomicPtr::new(core::ptr::null_mut());
static CSI_INLINE_LOG_ENABLED: AtomicBool = AtomicBool::new(false);
pub fn set_csi_logging_enabled(enabled: bool) {
CSI_INLINE_LOG_ENABLED.store(enabled, Ordering::Release);
CSI_PUBLISH_ENABLED.store(enabled, Ordering::Release);
}
pub fn csi_logging_enabled() -> bool {
CSI_INLINE_LOG_ENABLED.load(Ordering::Relaxed)
}
pub fn set_csi_delivery_mode(mode: CsiDeliveryMode) {
CSI_DELIVERY_MODE.store(mode as u8, Ordering::Release);
}
pub fn csi_delivery_mode() -> CsiDeliveryMode {
match CSI_DELIVERY_MODE.load(Ordering::Relaxed) {
1 => CsiDeliveryMode::Callback,
2 => CsiDeliveryMode::Async,
_ => CsiDeliveryMode::Off,
}
}
pub fn set_csi_callback(cb: fn(&CSIDataPacket)) {
CSI_CALLBACK.store(cb as *mut (), core::sync::atomic::Ordering::Release);
CSI_DELIVERY_MODE.store(CsiDeliveryMode::Callback as u8, Ordering::Release);
CSI_PUBLISH_ENABLED.store(true, Ordering::Release);
}
pub fn clear_csi_callback() {
CSI_DELIVERY_MODE.store(CsiDeliveryMode::Off as u8, Ordering::Release);
CSI_CALLBACK.store(core::ptr::null_mut(), core::sync::atomic::Ordering::Release);
}
pub fn set_csi_raw_callback(cb: fn()) {
CSI_RAW_CALLBACK.store(cb as *mut (), core::sync::atomic::Ordering::Release);
CSI_PUBLISH_ENABLED.store(true, Ordering::Release);
}
pub(crate) fn set_runtime_collection_mode(is_collector: bool) {
IS_COLLECTOR.store(is_collector, Ordering::Relaxed);
COLLECTION_MODE_CHANGED.signal(());
}
pub(crate) fn reset() {
CSI_INLINE_LOG_ENABLED.store(false, Ordering::Release);
CSI_PUBLISH_ENABLED.store(false, Ordering::Release);
CSI_DELIVERY_MODE.store(CsiDeliveryMode::Off as u8, Ordering::Release);
CSI_CALLBACK.store(core::ptr::null_mut(), core::sync::atomic::Ordering::Release);
}
pub struct CSINodeClient {
_private: (),
}
impl CSINodeClient {
pub fn new() -> Self {
Self { _private: () }
}
pub async fn next_csi_packet(&mut self) -> CSIDataPacket {
if CSI_DELIVERY_MODE.load(Ordering::Relaxed) == CsiDeliveryMode::Off as u8 {
CSI_DELIVERY_MODE.store(CsiDeliveryMode::Async as u8, Ordering::Release);
CSI_PUBLISH_ENABLED.store(true, Ordering::Release);
}
core::future::poll_fn(|cx| {
if let Some(p) = CSI_QUEUE.dequeue() {
return core::task::Poll::Ready(p);
}
CSI_WAKER.register(cx.waker());
if let Some(p) = CSI_QUEUE.dequeue() {
core::task::Poll::Ready(p)
} else {
core::task::Poll::Pending
}
})
.await
}
pub async fn get_csi_data(&mut self) -> CSIDataPacket {
self.next_csi_packet().await
}
pub async fn print_csi_w_metadata(&mut self) {
let packet = self.next_csi_packet().await;
crate::logging::logging::log_csi(packet);
embassy_futures::yield_now().await;
}
pub async fn send_stop(&self) {
STOP_SIGNAL.signal(());
}
}
impl Default for CSINodeClient {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "esp32c5")]
pub(crate) fn build_csi_config(csi_config: &CsiConfiguration) -> CsiConfig {
CsiConfig {
enable: csi_config.enable,
acquire_csi_legacy: csi_config.acquire_csi_legacy,
acquire_csi_force_lltf: csi_config.acquire_csi_force_lltf,
acquire_csi_ht20: csi_config.acquire_csi_ht20,
acquire_csi_ht40: csi_config.acquire_csi_ht40,
acquire_csi_vht: csi_config.acquire_csi_vht,
acquire_csi_su: csi_config.acquire_csi_su,
acquire_csi_mu: csi_config.acquire_csi_mu,
acquire_csi_dcm: csi_config.acquire_csi_dcm,
acquire_csi_beamformed: csi_config.acquire_csi_beamformed,
acquire_csi_he_stbc: csi_config.acquire_csi_he_stbc,
val_scale_cfg: csi_config.val_scale_cfg,
dump_ack_en: csi_config.dump_ack_en,
reserved: csi_config.reserved,
}
}
#[cfg(feature = "esp32c6")]
pub(crate) fn build_csi_config(csi_config: &CsiConfiguration) -> CsiConfig {
CsiConfig {
enable: csi_config.enable,
acquire_csi_legacy: csi_config.acquire_csi_legacy,
acquire_csi_ht20: csi_config.acquire_csi_ht20,
acquire_csi_ht40: csi_config.acquire_csi_ht40,
acquire_csi_su: csi_config.acquire_csi_su,
acquire_csi_mu: csi_config.acquire_csi_mu,
acquire_csi_dcm: csi_config.acquire_csi_dcm,
acquire_csi_beamformed: csi_config.acquire_csi_beamformed,
acquire_csi_he_stbc: csi_config.acquire_csi_he_stbc,
val_scale_cfg: csi_config.val_scale_cfg,
dump_ack_en: csi_config.dump_ack_en,
reserved: csi_config.reserved,
}
}
#[cfg(not(any(feature = "esp32c5", feature = "esp32c6")))]
pub(crate) fn build_csi_config(csi_config: &CsiConfiguration) -> CsiConfig {
CsiConfig {
lltf_en: csi_config.lltf_en,
htltf_en: csi_config.htltf_en,
stbc_htltf2_en: csi_config.stbc_htltf2_en,
ltf_merge_en: csi_config.ltf_merge_en,
channel_filter_en: csi_config.channel_filter_en,
manu_scale: csi_config.manu_scale,
shift: csi_config.shift,
dump_ack_en: csi_config.dump_ack_en,
}
}
pub(crate) fn set_csi(controller: &mut WifiController, config: CsiConfig) {
controller
.set_csi(config, |info: esp_radio::wifi::csi::WifiCsiInfo<'_>| {
capture_csi_info(info);
})
.unwrap();
}
fn capture_csi_info(info: esp_radio::wifi::csi::WifiCsiInfo<'_>) {
#[cfg(feature = "statistics")]
STATS.rx_count.fetch_add(1, Ordering::Relaxed);
let raw_cb_ptr = CSI_RAW_CALLBACK.load(Ordering::Relaxed);
if !raw_cb_ptr.is_null() {
let raw_cb: fn() = unsafe { core::mem::transmute::<*mut (), fn()>(raw_cb_ptr) };
raw_cb();
return;
}
if !CSI_PUBLISH_ENABLED.load(Ordering::Relaxed) {
return;
}
let rssi = info.rssi();
let mut csi_data = Vec::<i8, 612>::new();
let csi_slice = info.buf();
let csi_buf_len = csi_slice.len() as u16;
match csi_data.extend_from_slice(csi_slice) {
Ok(_) => {}
Err(_) => {
#[cfg(feature = "statistics")]
STATS.rx_drop_count.fetch_add(1, Ordering::Relaxed);
return;
}
}
let mac_arr = *info.mac();
let timestamp_us = info.timestamp().duration_since_epoch().as_micros() as u32;
#[cfg(not(any(feature = "esp32c5", feature = "esp32c6")))]
let csi_packet = CSIDataPacket {
sequence_number: info.rx_sequence(),
data_format: RxCSIFmt::Undefined,
date_time: None,
mac: mac_arr,
rssi: rssi as i32,
bandwidth: info.cwb() as u32,
antenna: info.antenna() as u32,
rate: info.rate() as u32,
sig_mode: info.packet_mode() as u32,
mcs: info.modulation_coding_scheme() as u32,
smoothing: info.smoothing() as u32,
not_sounding: info.not_sounding() as u32,
aggregation: info.aggregation() as u32,
stbc: info.space_time_block_code() as u32,
fec_coding: info.forward_error_correction_coding() as u32,
sgi: info.short_guide_interval() as u32,
noise_floor: info.noise_floor() as i32,
ampdu_cnt: info.ampdu_count() as u32,
channel: info.channel() as u32,
secondary_channel: info.secondary_channel() as u32,
timestamp: timestamp_us,
rx_state: info.rx_state() as u32,
sig_len: info.signal_length() as u32,
csi_data_len: csi_buf_len,
csi_data,
};
#[cfg(any(feature = "esp32c5", feature = "esp32c6"))]
let csi_packet = CSIDataPacket {
mac: mac_arr,
rssi: rssi as i32,
timestamp: timestamp_us,
rate: info.rate() as u32,
noise_floor: info.noise_floor() as i32,
sig_len: info.signal_length() as u32,
rx_state: info.rx_state() as u32,
dump_len: info.dump_length(),
#[cfg(feature = "esp32c6")]
he_sigb_len: info.he_sigb_length() as u32,
#[cfg(feature = "esp32c6")]
cur_single_mpdu: info.cur_single_mpdu() as u32,
cur_bb_format: info.cur_bb_format() as u32,
rx_channel_estimate_info_vld: info.rx_channel_estimate_info_valid() as u32,
rx_channel_estimate_len: info.rx_channel_estimate_length(),
second: info.secondary_channel() as u32,
channel: info.channel() as u32,
is_group: info.is_group() as u32,
rxend_state: info.rx_end_state() as u32,
rxmatch3: info.rx_match3() as u32,
rxmatch2: info.rx_match2() as u32,
rxmatch1: info.rx_match1() as u32,
#[cfg(feature = "esp32c6")]
rxmatch0: info.rx_match0() as u32,
date_time: None,
sequence_number: info.rx_sequence(),
data_format: RxCSIFmt::Undefined,
csi_data_len: csi_buf_len,
csi_data,
};
#[cfg(all(feature = "statistics", not(feature = "esp32c5")))]
#[allow(static_mut_refs)] {
if seq_drop_detection_enabled() {
static mut PEER_SEQ_TRACKER: LinearMap<[u8; 6], u16, MAX_TRACKED_PEERS> =
LinearMap::new();
unsafe {
if RESET_SEQ_TRACKER.swap(false, Ordering::Relaxed) {
PEER_SEQ_TRACKER.clear();
}
let current_seq = csi_packet.sequence_number;
if let Some(&last_seq) = PEER_SEQ_TRACKER.get(&csi_packet.mac) {
let diff = (current_seq.wrapping_sub(last_seq)) & 0x0FFF;
if diff > 1 {
let lost = (diff - 1) as u32;
if lost < 500 {
STATS.rx_drop_count.fetch_add(lost, Ordering::Relaxed);
}
}
}
if PEER_SEQ_TRACKER.insert(csi_packet.mac, current_seq).is_err() {
PEER_SEQ_TRACKER.clear();
let _ = PEER_SEQ_TRACKER.insert(csi_packet.mac, current_seq);
}
}
}
}
match CSI_DELIVERY_MODE.load(Ordering::Relaxed) {
m if m == CsiDeliveryMode::Callback as u8 => {
let cb_ptr = CSI_CALLBACK.load(core::sync::atomic::Ordering::Relaxed);
if !cb_ptr.is_null() {
let cb: fn(&CSIDataPacket) =
unsafe { core::mem::transmute::<*mut (), fn(&CSIDataPacket)>(cb_ptr) };
cb(&csi_packet);
}
return;
}
m if m == CsiDeliveryMode::Async as u8 => {
if CSI_QUEUE.enqueue(csi_packet).is_err() {
#[cfg(feature = "statistics")]
STATS.rx_drop_count.fetch_add(1, Ordering::Relaxed);
} else {
CSI_WAKER.wake();
}
return;
}
_ => {}
}
if CSI_INLINE_LOG_ENABLED.load(Ordering::Relaxed) {
crate::logging::logging::log_csi(csi_packet);
}
}
pub async fn run_process_csi_packet() {
#[cfg(feature = "statistics")]
STATS
.capture_start_time
.store(Instant::now().as_ticks(), Ordering::Relaxed);
#[cfg(feature = "statistics")]
let mut last_rate_update = Instant::now();
#[cfg(feature = "statistics")]
let mut last_rx_count = STATS.rx_count.load(Ordering::Relaxed);
#[cfg(feature = "statistics")]
let mut last_tx_count = STATS.tx_count.load(Ordering::Relaxed);
loop {
match select3(
STOP_SIGNAL.wait(),
COLLECTION_MODE_CHANGED.wait(),
Timer::after_millis(500),
)
.await
{
Either3::First(_) => {
STOP_SIGNAL.signal(());
break;
}
Either3::Second(_) => {
COLLECTION_MODE_CHANGED.reset();
crate::reset_globals();
#[cfg(feature = "statistics")]
{
STATS
.capture_start_time
.store(Instant::now().as_ticks(), Ordering::Relaxed);
last_rate_update = Instant::now();
last_rx_count = STATS.rx_count.load(Ordering::Relaxed);
last_tx_count = STATS.tx_count.load(Ordering::Relaxed);
#[cfg(not(feature = "esp32c5"))]
RESET_SEQ_TRACKER.store(true, Ordering::Relaxed);
}
}
Either3::Third(_) => {
#[cfg(feature = "statistics")]
{
let elapsed_secs = last_rate_update.elapsed().as_secs();
if elapsed_secs >= 1 {
let current_rx = STATS.rx_count.load(Ordering::Relaxed);
let current_tx = STATS.tx_count.load(Ordering::Relaxed);
let rx_rate = ((current_rx.saturating_sub(last_rx_count))
/ elapsed_secs) as u32;
let tx_rate = ((current_tx.saturating_sub(last_tx_count))
/ elapsed_secs) as u32;
STATS.rx_rate_hz.store(rx_rate, Ordering::Relaxed);
STATS.tx_rate_hz.store(tx_rate, Ordering::Relaxed);
last_rx_count = current_rx;
last_tx_count = current_tx;
last_rate_update = Instant::now();
}
}
}
}
}
}