#[cfg(not(feature = "std"))]
use alloc::collections::VecDeque;
#[cfg(feature = "std")]
use std::collections::VecDeque;
use super::profile::{BatteryState, PowerProfile, RadioTiming};
use crate::NodeId;
/// Urgency class of a queued sync request.
///
/// Variants are declared in ascending order of urgency, so the derived
/// `Ord` lets priorities be compared directly (`Low < Normal < High < Critical`).
/// The default priority is [`SyncPriority::Normal`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum SyncPriority {
    /// Least urgent; the first candidate for eviction when the queue is full.
    Low = 0,
    /// Default urgency.
    #[default]
    Normal = 1,
    /// More urgent than `Normal`.
    High = 2,
    /// Most urgent; a queued critical sync makes the scheduler request an
    /// immediate sync.
    Critical = 3,
}
/// A sync request waiting in the scheduler queue.
#[derive(Debug, Clone)]
pub struct PendingSync {
    /// Peer the sync is addressed to.
    pub peer_id: NodeId,
    /// Urgency of the request; also selects the default `max_age_ms`.
    pub priority: SyncPriority,
    /// Size of the data to sync (unit not established here; presumably bytes).
    pub data_size: usize,
    /// Timestamp at which the request was queued, on the same clock as the
    /// `current_time` values passed to the scheduler.
    pub queued_at: u64,
    /// How long after `queued_at` the request stays valid, in milliseconds.
    pub max_age_ms: u64,
}
impl PendingSync {
    /// Creates a pending sync queued at `queued_at`.
    ///
    /// The maximum age is derived from `priority`: the more urgent the
    /// request, the sooner it expires (`Low` 60 s, `Normal` 30 s, `High` 10 s,
    /// `Critical` 5 s).
    pub fn new(peer_id: NodeId, priority: SyncPriority, data_size: usize, queued_at: u64) -> Self {
        let max_age_ms = match priority {
            SyncPriority::Low => 60_000,
            SyncPriority::Normal => 30_000,
            SyncPriority::High => 10_000,
            SyncPriority::Critical => 5_000,
        };
        Self {
            peer_id,
            priority,
            data_size,
            queued_at,
            max_age_ms,
        }
    }

    /// Returns `true` once more than `max_age_ms` has elapsed since
    /// `queued_at`.
    ///
    /// A `current_time` earlier than `queued_at` counts as zero elapsed time.
    /// The comparison is done on the elapsed time rather than on
    /// `queued_at + max_age_ms`, so a very large `max_age_ms` (the field is
    /// public) cannot overflow and make a long-lived entry look expired.
    pub fn is_expired(&self, current_time: u64) -> bool {
        current_time.saturating_sub(self.queued_at) > self.max_age_ms
    }
}
/// What the radio is currently doing, as tracked by the scheduler.
///
/// The default state is [`RadioState::Idle`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RadioState {
    /// No radio activity in progress.
    #[default]
    Idle,
    /// A scan window is open.
    Scanning,
    /// An advertising event is in progress.
    Advertising,
    /// A sync with a peer is in progress.
    Syncing,
    /// The radio is between states.
    Transitioning,
}
/// Actions the scheduler asks the radio driver to take.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerEvent {
    /// Open a scan window.
    StartScan,
    /// Close the current scan window.
    StopScan,
    /// Begin an advertising event.
    StartAdvertising,
    /// End the current advertising event.
    StopAdvertising,
    /// Start syncing a pending request right away.
    SyncNow,
    /// The power profile changed.
    ProfileChanged,
    /// Nothing is due; the radio may sleep until the accompanying timestamp.
    EnterSleep,
}
/// Tunable limits for [`RadioScheduler`].
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Maximum number of sync requests held in the queue at once.
    pub max_pending_syncs: usize,
    /// Sync coalescing window in milliseconds.
    // NOTE(review): not read by the scheduler logic visible in this file.
    pub sync_coalesce_ms: u64,
    /// Minimum time in milliseconds between two accepted profile changes.
    pub profile_change_cooldown_ms: u64,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
max_pending_syncs: 16,
sync_coalesce_ms: 500,
profile_change_cooldown_ms: 10_000,
}
}
}
/// Decides when the radio scans, advertises, syncs or sleeps.
///
/// The scheduler keeps no clock of its own: every time-dependent method takes
/// the caller's `current_time`.
#[derive(Debug)]
pub struct RadioScheduler {
    /// Active power profile.
    profile: PowerProfile,
    /// Radio timing derived from `profile`.
    timing: RadioTiming,
    /// What the radio is currently doing.
    state: RadioState,
    /// Sync requests waiting to be performed, in arrival order.
    pending_syncs: VecDeque<PendingSync>,
    /// Queue and cooldown limits.
    config: SchedulerConfig,
    /// Earliest time the next scan window may start.
    next_scan_time: u64,
    /// Earliest time the next advertising event may start.
    next_adv_time: u64,
    /// Time of the most recent radio state change.
    last_state_change: u64,
    /// Time of the most recent accepted profile change.
    last_profile_change: u64,
    /// Most recently reported battery state.
    battery: BatteryState,
    /// Whether battery updates may switch the profile automatically.
    auto_adjust_enabled: bool,
    /// Running counters.
    stats: SchedulerStats,
}
/// Running counters maintained by [`RadioScheduler`]; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct SchedulerStats {
    /// Number of scan windows started.
    pub scan_windows: u64,
    /// Number of advertising events started.
    pub adv_events: u64,
    /// Number of sync requests handed out for execution.
    pub syncs_performed: u64,
    /// Number of sync requests discarded (expired or evicted from a full queue).
    pub syncs_dropped: u64,
    /// Number of handed-out sync requests that had critical priority.
    pub critical_syncs: u64,
    /// Number of accepted profile changes.
    pub profile_changes: u64,
}
impl RadioScheduler {
    /// Creates a scheduler for `profile` with the given `config`.
    ///
    /// The radio starts idle with an empty sync queue, all timestamps at zero,
    /// a default battery state, zeroed statistics and automatic battery-based
    /// profile adjustment enabled.
    pub fn new(profile: PowerProfile, config: SchedulerConfig) -> Self {
        let timing = profile.timing();
        Self {
            profile,
            timing,
            state: RadioState::Idle,
            pending_syncs: VecDeque::new(),
            config,
            next_scan_time: 0,
            next_adv_time: 0,
            last_state_change: 0,
            last_profile_change: 0,
            battery: BatteryState::default(),
            auto_adjust_enabled: true,
            stats: SchedulerStats::default(),
        }
    }

    /// Creates a scheduler for `profile` with [`SchedulerConfig::default`].
    pub fn with_profile(profile: PowerProfile) -> Self {
        Self::new(profile, SchedulerConfig::default())
    }

    /// Returns the active power profile.
    pub fn profile(&self) -> PowerProfile {
        self.profile
    }

    /// Returns the radio timing derived from the active profile.
    pub fn timing(&self) -> &RadioTiming {
        &self.timing
    }

    /// Returns the current radio state.
    pub fn state(&self) -> RadioState {
        self.state
    }

    /// Returns how many sync requests are currently queued.
    pub fn pending_sync_count(&self) -> usize {
        self.pending_syncs.len()
    }

    /// Returns the running statistics.
    pub fn stats(&self) -> &SchedulerStats {
        &self.stats
    }

    /// Switches to `profile`, subject to the profile-change cooldown.
    ///
    /// The first change is always accepted. Later changes are accepted only
    /// once `profile_change_cooldown_ms` has passed since the last accepted
    /// change. Returns `true` if the profile (and its timing) was applied,
    /// `false` if the request was rejected.
    pub fn set_profile(&mut self, profile: PowerProfile, current_time: u64) -> bool {
        let is_first_change = self.stats.profile_changes == 0;
        // Saturating: a very large cooldown must mean "effectively never",
        // not wrap around to a deadline in the past.
        let cooldown_ends = self
            .last_profile_change
            .saturating_add(self.config.profile_change_cooldown_ms);
        if !is_first_change && current_time < cooldown_ends {
            return false;
        }

        self.profile = profile;
        self.timing = profile.timing();
        self.last_profile_change = current_time;
        self.stats.profile_changes += 1;
        true
    }

    /// Records the latest battery state and, when auto-adjust is enabled,
    /// tries to switch to the profile the battery suggests.
    ///
    /// The switch goes through [`Self::set_profile`], so it is silently
    /// skipped while the cooldown is active.
    pub fn update_battery(&mut self, battery: BatteryState, current_time: u64) {
        self.battery = battery;
        if !self.auto_adjust_enabled {
            return;
        }
        let suggested = battery.suggested_profile(self.profile);
        if suggested != self.profile {
            // Best effort: a rejection due to cooldown is not an error here.
            self.set_profile(suggested, current_time);
        }
    }

    /// Enables or disables automatic profile changes on battery updates.
    pub fn set_auto_adjust(&mut self, enabled: bool) {
        self.auto_adjust_enabled = enabled;
    }

    /// Queues a sync request for `peer_id`.
    ///
    /// When the queue is already at `max_pending_syncs`, the oldest entry with
    /// the lowest priority is evicted (and counted as dropped) to make room,
    /// but only if the new request has a strictly higher priority than it.
    /// Returns `true` if the request was queued, `false` if it was rejected.
    pub fn queue_sync(
        &mut self,
        peer_id: NodeId,
        priority: SyncPriority,
        data_size: usize,
        current_time: u64,
    ) -> bool {
        if self.pending_syncs.len() >= self.config.max_pending_syncs {
            // Lowest priority wins; ties go to the smallest index (oldest).
            let victim = self
                .pending_syncs
                .iter()
                .enumerate()
                .min_by(|(ia, a), (ib, b)| a.priority.cmp(&b.priority).then(ia.cmp(ib)))
                .map(|(idx, entry)| (idx, entry.priority));

            match victim {
                Some((idx, lowest)) if priority > lowest => {
                    self.pending_syncs.remove(idx);
                    self.stats.syncs_dropped += 1;
                }
                // Nothing less urgent to evict (or a zero-capacity queue).
                _ => return false,
            }
        }

        self.pending_syncs
            .push_back(PendingSync::new(peer_id, priority, data_size, current_time));
        true
    }

    /// Returns `true` if any queued request has critical priority.
    pub fn has_critical_sync(&self) -> bool {
        self.pending_syncs
            .iter()
            .any(|s| s.priority == SyncPriority::Critical)
    }

    /// Returns the next action for the radio and the time it applies at, or
    /// `None` when the current state has to finish first.
    ///
    /// A queued critical sync pre-empts everything else, regardless of the
    /// current radio state. Otherwise:
    ///
    /// * `Idle`: start a scan if one is due, else advertise if due, else sync
    ///   if anything is queued, else sleep until the earlier of the next scan
    ///   and next advertising time.
    /// * `Scanning`: stop once the scan window has elapsed.
    /// * `Advertising`: stop immediately.
    /// * `Syncing` / `Transitioning`: nothing to do.
    pub fn next_event(&self, current_time: u64) -> Option<(SchedulerEvent, u64)> {
        if self.has_critical_sync() {
            return Some((SchedulerEvent::SyncNow, current_time));
        }

        match self.state {
            RadioState::Idle => {
                let decision = if current_time >= self.next_scan_time {
                    (SchedulerEvent::StartScan, current_time)
                } else if current_time >= self.next_adv_time {
                    (SchedulerEvent::StartAdvertising, current_time)
                } else if !self.pending_syncs.is_empty() {
                    (SchedulerEvent::SyncNow, current_time)
                } else {
                    let wake_at = self.next_scan_time.min(self.next_adv_time);
                    (SchedulerEvent::EnterSleep, wake_at)
                };
                Some(decision)
            }
            RadioState::Scanning => {
                let scan_end = self
                    .last_state_change
                    .saturating_add(self.timing.scan_window_ms as u64);
                if current_time >= scan_end {
                    Some((SchedulerEvent::StopScan, current_time))
                } else {
                    None
                }
            }
            RadioState::Advertising => Some((SchedulerEvent::StopAdvertising, current_time)),
            RadioState::Syncing | RadioState::Transitioning => None,
        }
    }

    /// Applies `event` to the scheduler state.
    ///
    /// Start events move the radio into the matching state and bump the
    /// relevant counter. Stop events return it to idle and schedule the next
    /// scan or advertising time one interval after `current_time`.
    /// `ProfileChanged` is a no-op, and `EnterSleep` only marks the radio idle.
    pub fn process_event(&mut self, event: SchedulerEvent, current_time: u64) {
        match event {
            SchedulerEvent::StartScan => {
                self.state = RadioState::Scanning;
                self.last_state_change = current_time;
                self.stats.scan_windows += 1;
            }
            SchedulerEvent::StopScan => {
                self.state = RadioState::Idle;
                self.next_scan_time =
                    current_time.saturating_add(self.timing.scan_interval_ms as u64);
                self.last_state_change = current_time;
            }
            SchedulerEvent::StartAdvertising => {
                self.state = RadioState::Advertising;
                self.last_state_change = current_time;
                self.stats.adv_events += 1;
            }
            SchedulerEvent::StopAdvertising => {
                self.state = RadioState::Idle;
                self.next_adv_time =
                    current_time.saturating_add(self.timing.adv_interval_ms as u64);
                self.last_state_change = current_time;
            }
            SchedulerEvent::SyncNow => {
                self.state = RadioState::Syncing;
                self.last_state_change = current_time;
            }
            SchedulerEvent::ProfileChanged => {}
            SchedulerEvent::EnterSleep => {
                self.state = RadioState::Idle;
            }
        }
    }

    /// Removes and returns the next sync request to perform.
    ///
    /// Expired requests are purged first and counted as dropped. Of the
    /// remaining requests the highest priority one is returned; among equal
    /// priorities the oldest goes first. Returns `None` if nothing is left.
    pub fn next_pending_sync(&mut self, current_time: u64) -> Option<PendingSync> {
        let queued_before = self.pending_syncs.len();
        self.pending_syncs.retain(|s| !s.is_expired(current_time));
        let expired = queued_before - self.pending_syncs.len();
        self.stats.syncs_dropped += expired as u64;

        // Highest priority wins; ties go to the smallest index (oldest).
        let idx = self
            .pending_syncs
            .iter()
            .enumerate()
            .max_by(|(ia, a), (ib, b)| a.priority.cmp(&b.priority).then(ib.cmp(ia)))
            .map(|(idx, _)| idx)?;
        let sync = self.pending_syncs.remove(idx)?;

        if sync.priority == SyncPriority::Critical {
            self.stats.critical_syncs += 1;
        }
        self.stats.syncs_performed += 1;
        Some(sync)
    }

    /// Marks the current sync as finished and returns the radio to idle.
    pub fn complete_sync(&mut self, current_time: u64) {
        self.state = RadioState::Idle;
        self.last_state_change = current_time;
    }

    /// Returns the radio to idle, discards all queued syncs and makes both
    /// scanning and advertising due at `current_time`.
    ///
    /// Profile, timing, battery state, statistics and the profile-change
    /// cooldown are left untouched.
    pub fn reset(&mut self, current_time: u64) {
        self.state = RadioState::Idle;
        self.pending_syncs.clear();
        self.next_scan_time = current_time;
        self.next_adv_time = current_time;
        self.last_state_change = current_time;
    }

    /// Returns how long the radio can stay inactive from `current_time`.
    ///
    /// The result is `0` when any sync is queued or when a scan or
    /// advertising event is already due.
    pub fn time_until_next_activity(&self, current_time: u64) -> u64 {
        // Any queued sync (critical or not) means there is work right now.
        if !self.pending_syncs.is_empty() {
            return 0;
        }
        self.next_scan_time
            .min(self.next_adv_time)
            .saturating_sub(current_time)
    }
}
// Unit tests for the radio scheduler. Several assertions depend on timing
// values supplied by the `profile` module, which is not shown in this file.
#[cfg(test)]
mod tests {
use super::*;
// A freshly built scheduler keeps the requested profile, is idle and has an
// empty sync queue.
#[test]
fn test_scheduler_creation() {
let scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
assert_eq!(scheduler.profile(), PowerProfile::LowPower);
assert_eq!(scheduler.state(), RadioState::Idle);
assert_eq!(scheduler.pending_sync_count(), 0);
}
// Queuing into an empty queue succeeds and is reflected in the count.
#[test]
fn test_queue_sync() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
let peer = NodeId::new(0x1234);
assert!(scheduler.queue_sync(peer, SyncPriority::Normal, 100, 1000));
assert_eq!(scheduler.pending_sync_count(), 1);
}
// A queued critical sync makes `next_event` request an immediate sync, ahead
// of the scan that would otherwise be due on a fresh scheduler.
#[test]
fn test_critical_sync_priority() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
let peer = NodeId::new(0x1234);
scheduler.queue_sync(peer, SyncPriority::Critical, 50, 1000);
assert!(scheduler.has_critical_sync());
let event = scheduler.next_event(1000);
assert_eq!(event, Some((SchedulerEvent::SyncNow, 1000)));
}
// Pending syncs are handed out by descending priority, not arrival order:
// High (peer2), then Normal (peer3), then Low (peer1).
#[test]
fn test_sync_priority_ordering() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
let peer1 = NodeId::new(0x1111);
let peer2 = NodeId::new(0x2222);
let peer3 = NodeId::new(0x3333);
scheduler.queue_sync(peer1, SyncPriority::Low, 100, 1000);
scheduler.queue_sync(peer2, SyncPriority::High, 100, 1001);
scheduler.queue_sync(peer3, SyncPriority::Normal, 100, 1002);
let sync = scheduler.next_pending_sync(1005).unwrap();
assert_eq!(sync.peer_id, peer2);
let sync = scheduler.next_pending_sync(1005).unwrap();
assert_eq!(sync.peer_id, peer3);
let sync = scheduler.next_pending_sync(1005).unwrap();
assert_eq!(sync.peer_id, peer1);
}
// A Low sync queued at 1000 has a 60_000 ms max age, so by 70_000 it has
// expired: it is purged, counted as dropped and nothing is returned.
#[test]
fn test_sync_expiration() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
let peer = NodeId::new(0x1234);
scheduler.queue_sync(peer, SyncPriority::Low, 100, 1000);
assert_eq!(scheduler.pending_sync_count(), 1);
let sync = scheduler.next_pending_sync(70_000);
assert!(sync.is_none());
assert_eq!(scheduler.stats().syncs_dropped, 1);
}
// Walks one scan cycle: StartScan at t=0, StopScan at t=100, then back to
// idle with the next scan scheduled one scan interval later.
// NOTE(review): the expected values (StopScan due at 100, next scan at 5100)
// presumably reflect a LowPower scan window of at most 100 ms and a scan
// interval of 5000 ms; confirm against the profile module.
#[test]
fn test_scan_window_scheduling() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
let event = scheduler.next_event(0);
assert_eq!(event, Some((SchedulerEvent::StartScan, 0)));
scheduler.process_event(SchedulerEvent::StartScan, 0);
assert_eq!(scheduler.state(), RadioState::Scanning);
let event = scheduler.next_event(100);
assert_eq!(event, Some((SchedulerEvent::StopScan, 100)));
scheduler.process_event(SchedulerEvent::StopScan, 100);
assert_eq!(scheduler.state(), RadioState::Idle);
assert_eq!(scheduler.next_scan_time, 5100);
}
// The first profile change is accepted and counted.
#[test]
fn test_profile_change() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
assert!(scheduler.set_profile(PowerProfile::LowPower, 1000));
assert_eq!(scheduler.profile(), PowerProfile::LowPower);
assert_eq!(scheduler.stats().profile_changes, 1);
}
// With the default 10_000 ms cooldown, a second change 4 s after the first
// is rejected, while one 14 s after the first is accepted.
#[test]
fn test_profile_change_cooldown() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
assert!(scheduler.set_profile(PowerProfile::LowPower, 1000));
assert!(!scheduler.set_profile(PowerProfile::Aggressive, 5000));
assert_eq!(scheduler.profile(), PowerProfile::LowPower);
assert!(scheduler.set_profile(PowerProfile::Aggressive, 15000));
assert_eq!(scheduler.profile(), PowerProfile::Aggressive);
}
// With auto-adjust on, this battery report switches the scheduler to
// LowPower.
// NOTE(review): `BatteryState::new(5, false)` is presumably (level, charging),
// i.e. a low, discharging battery; confirm against the profile module.
#[test]
fn test_battery_auto_adjust() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Aggressive);
scheduler.set_auto_adjust(true);
let battery = BatteryState::new(5, false);
scheduler.update_battery(battery, 15000);
assert_eq!(scheduler.profile(), PowerProfile::LowPower);
}
// Same report but with the second argument set to `true` (presumably
// "charging"): the profile is left unchanged.
#[test]
fn test_battery_charging_no_change() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Aggressive);
scheduler.set_auto_adjust(true);
let battery = BatteryState::new(5, true);
scheduler.update_battery(battery, 15000);
assert_eq!(scheduler.profile(), PowerProfile::Aggressive);
}
// With a two-entry queue full of Low syncs, a High sync is still accepted:
// it evicts one Low entry, so the count stays at 2 and peer3 is present.
#[test]
fn test_queue_overflow_priority() {
let config = SchedulerConfig {
max_pending_syncs: 2,
..Default::default()
};
let mut scheduler = RadioScheduler::new(PowerProfile::Balanced, config);
let peer1 = NodeId::new(0x1111);
let peer2 = NodeId::new(0x2222);
let peer3 = NodeId::new(0x3333);
scheduler.queue_sync(peer1, SyncPriority::Low, 100, 1000);
scheduler.queue_sync(peer2, SyncPriority::Low, 100, 1001);
assert!(scheduler.queue_sync(peer3, SyncPriority::High, 100, 1002));
assert_eq!(scheduler.pending_sync_count(), 2);
assert!(scheduler.pending_syncs.iter().any(|s| s.peer_id == peer3));
}
// After one scan and one advertising cycle, with nothing queued, the idle
// time at t=1000 is positive and bounded.
// NOTE(review): the 2000 ms upper bound depends on the LowPower advertising
// and scan intervals from the profile module; confirm there.
#[test]
fn test_time_until_next_activity() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
scheduler.process_event(SchedulerEvent::StartScan, 0);
scheduler.process_event(SchedulerEvent::StopScan, 100);
scheduler.process_event(SchedulerEvent::StartAdvertising, 100);
scheduler.process_event(SchedulerEvent::StopAdvertising, 102);
let wait = scheduler.time_until_next_activity(1000);
assert!(wait > 0, "wait should be > 0, got {}", wait);
assert!(wait <= 2000, "wait should be <= 2000, got {}", wait);
}
// `reset` returns the radio to idle and empties the sync queue even when a
// scan was in progress.
#[test]
fn test_reset() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
let peer = NodeId::new(0x1234);
scheduler.queue_sync(peer, SyncPriority::Normal, 100, 1000);
scheduler.process_event(SchedulerEvent::StartScan, 1000);
scheduler.reset(2000);
assert_eq!(scheduler.state(), RadioState::Idle);
assert_eq!(scheduler.pending_sync_count(), 0);
}
// SyncNow moves the radio to Syncing; `complete_sync` brings it back to idle.
#[test]
fn test_complete_sync() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
scheduler.process_event(SchedulerEvent::SyncNow, 1000);
assert_eq!(scheduler.state(), RadioState::Syncing);
scheduler.complete_sync(1500);
assert_eq!(scheduler.state(), RadioState::Idle);
}
// A Critical sync queued at 1000 lives for 5000 ms: still valid at 5000,
// expired by 7000.
#[test]
fn test_pending_sync_expiry_timing() {
let sync = PendingSync::new(NodeId::new(0x1234), SyncPriority::Critical, 100, 1000);
assert!(!sync.is_expired(5000));
assert!(sync.is_expired(7000));
}
// Every StartScan / StartAdvertising event bumps its counter, even when two
// StartScan events arrive back to back.
#[test]
fn test_stats_tracking() {
let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
scheduler.process_event(SchedulerEvent::StartScan, 0);
scheduler.process_event(SchedulerEvent::StartScan, 100);
scheduler.process_event(SchedulerEvent::StartAdvertising, 200);
let stats = scheduler.stats();
assert_eq!(stats.scan_windows, 2);
assert_eq!(stats.adv_events, 1);
}
}