Skip to main content

peat_btle/power/
scheduler.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Radio Scheduler for Peat-Lite
17//!
18//! Coordinates radio activities (scan, advertise, sync) to minimize
19//! power consumption while maintaining connectivity.
20
21#[cfg(not(feature = "std"))]
22use alloc::collections::VecDeque;
23#[cfg(feature = "std")]
24use std::collections::VecDeque;
25
26use super::profile::{BatteryState, PowerProfile, RadioTiming};
27use crate::NodeId;
28
29/// Priority levels for sync operations
30#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
31pub enum SyncPriority {
32    /// Background sync, can wait for next scheduled window
33    Low = 0,
34    /// Normal sync, should happen within current interval
35    #[default]
36    Normal = 1,
37    /// High priority, sync at next opportunity
38    High = 2,
39    /// Critical data, trigger immediate sync
40    Critical = 3,
41}
42
43/// A pending sync operation
44#[derive(Debug, Clone)]
45pub struct PendingSync {
46    /// Target peer
47    pub peer_id: NodeId,
48    /// Priority level
49    pub priority: SyncPriority,
50    /// Size of data to sync (bytes)
51    pub data_size: usize,
52    /// When this sync was queued (ms timestamp)
53    pub queued_at: u64,
54    /// Maximum age before dropping (ms)
55    pub max_age_ms: u64,
56}
57
58impl PendingSync {
59    /// Create a new pending sync
60    pub fn new(peer_id: NodeId, priority: SyncPriority, data_size: usize, queued_at: u64) -> Self {
61        let max_age_ms = match priority {
62            SyncPriority::Low => 60_000,     // 1 minute
63            SyncPriority::Normal => 30_000,  // 30 seconds
64            SyncPriority::High => 10_000,    // 10 seconds
65            SyncPriority::Critical => 5_000, // 5 seconds
66        };
67
68        Self {
69            peer_id,
70            priority,
71            data_size,
72            queued_at,
73            max_age_ms,
74        }
75    }
76
77    /// Check if this sync has expired
78    pub fn is_expired(&self, current_time: u64) -> bool {
79        current_time > self.queued_at + self.max_age_ms
80    }
81}
82
83/// Radio activity state
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
85pub enum RadioState {
86    /// Radio is off/sleeping
87    #[default]
88    Idle,
89    /// Scanning for advertisements
90    Scanning,
91    /// Sending advertisements
92    Advertising,
93    /// Connected and syncing
94    Syncing,
95    /// Transitioning between states
96    Transitioning,
97}
98
99/// Event from the radio scheduler
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub enum SchedulerEvent {
102    /// Start scanning
103    StartScan,
104    /// Stop scanning
105    StopScan,
106    /// Start advertising
107    StartAdvertising,
108    /// Stop advertising
109    StopAdvertising,
110    /// Time to sync with a peer
111    SyncNow,
112    /// Profile changed
113    ProfileChanged,
114    /// Enter sleep mode
115    EnterSleep,
116}
117
118/// Radio scheduler configuration
119#[derive(Debug, Clone)]
120pub struct SchedulerConfig {
121    /// Maximum pending syncs to queue
122    pub max_pending_syncs: usize,
123    /// Coalesce syncs within this window (ms)
124    pub sync_coalesce_ms: u64,
125    /// Minimum time between profile changes (ms)
126    pub profile_change_cooldown_ms: u64,
127}
128
129impl Default for SchedulerConfig {
130    fn default() -> Self {
131        Self {
132            max_pending_syncs: 16,
133            sync_coalesce_ms: 500,
134            profile_change_cooldown_ms: 10_000,
135        }
136    }
137}
138
139/// Radio activity scheduler
140///
141/// Coordinates scan, advertise, and sync activities according to
142/// the current power profile.
143#[derive(Debug)]
144pub struct RadioScheduler {
145    /// Current power profile
146    profile: PowerProfile,
147    /// Current radio timing
148    timing: RadioTiming,
149    /// Current radio state
150    state: RadioState,
151    /// Pending sync operations
152    pending_syncs: VecDeque<PendingSync>,
153    /// Configuration
154    config: SchedulerConfig,
155    /// Next scan window start time (ms)
156    next_scan_time: u64,
157    /// Next advertising event time (ms)
158    next_adv_time: u64,
159    /// Last state change time (ms)
160    last_state_change: u64,
161    /// Last profile change time (ms)
162    last_profile_change: u64,
163    /// Battery state for auto-adjustment
164    battery: BatteryState,
165    /// Whether auto-profile adjustment is enabled
166    auto_adjust_enabled: bool,
167    /// Statistics
168    stats: SchedulerStats,
169}
170
171/// Scheduler statistics
172#[derive(Debug, Clone, Default)]
173pub struct SchedulerStats {
174    /// Total scan windows
175    pub scan_windows: u64,
176    /// Total advertising events
177    pub adv_events: u64,
178    /// Total syncs performed
179    pub syncs_performed: u64,
180    /// Syncs dropped due to expiration
181    pub syncs_dropped: u64,
182    /// Critical syncs triggered
183    pub critical_syncs: u64,
184    /// Profile changes
185    pub profile_changes: u64,
186}
187
188impl RadioScheduler {
189    /// Create a new radio scheduler with the given profile
190    pub fn new(profile: PowerProfile, config: SchedulerConfig) -> Self {
191        let timing = profile.timing();
192        Self {
193            profile,
194            timing,
195            state: RadioState::Idle,
196            pending_syncs: VecDeque::new(),
197            config,
198            next_scan_time: 0,
199            next_adv_time: 0,
200            last_state_change: 0,
201            last_profile_change: 0,
202            battery: BatteryState::default(),
203            auto_adjust_enabled: true,
204            stats: SchedulerStats::default(),
205        }
206    }
207
208    /// Create with default config
209    pub fn with_profile(profile: PowerProfile) -> Self {
210        Self::new(profile, SchedulerConfig::default())
211    }
212
213    /// Get current profile
214    pub fn profile(&self) -> PowerProfile {
215        self.profile
216    }
217
218    /// Get current timing
219    pub fn timing(&self) -> &RadioTiming {
220        &self.timing
221    }
222
223    /// Get current state
224    pub fn state(&self) -> RadioState {
225        self.state
226    }
227
228    /// Get pending sync count
229    pub fn pending_sync_count(&self) -> usize {
230        self.pending_syncs.len()
231    }
232
233    /// Get statistics
234    pub fn stats(&self) -> &SchedulerStats {
235        &self.stats
236    }
237
238    /// Set power profile
239    pub fn set_profile(&mut self, profile: PowerProfile, current_time: u64) -> bool {
240        // Check cooldown (skip if this is the first change or if enough time passed)
241        let cooldown_elapsed = self.stats.profile_changes == 0
242            || current_time >= self.last_profile_change + self.config.profile_change_cooldown_ms;
243
244        if !cooldown_elapsed {
245            return false;
246        }
247
248        self.profile = profile;
249        self.timing = profile.timing();
250        self.last_profile_change = current_time;
251        self.stats.profile_changes += 1;
252        true
253    }
254
255    /// Update battery state
256    pub fn update_battery(&mut self, battery: BatteryState, current_time: u64) {
257        self.battery = battery;
258
259        if self.auto_adjust_enabled {
260            let suggested = battery.suggested_profile(self.profile);
261            if suggested != self.profile {
262                self.set_profile(suggested, current_time);
263            }
264        }
265    }
266
267    /// Enable/disable auto profile adjustment
268    pub fn set_auto_adjust(&mut self, enabled: bool) {
269        self.auto_adjust_enabled = enabled;
270    }
271
272    /// Queue a sync operation
273    pub fn queue_sync(
274        &mut self,
275        peer_id: NodeId,
276        priority: SyncPriority,
277        data_size: usize,
278        current_time: u64,
279    ) -> bool {
280        // Check queue limit
281        if self.pending_syncs.len() >= self.config.max_pending_syncs {
282            // Find the lowest priority sync
283            let lowest_priority = self
284                .pending_syncs
285                .iter()
286                .map(|s| s.priority)
287                .min()
288                .unwrap_or(SyncPriority::Critical);
289
290            if priority <= lowest_priority {
291                return false;
292            }
293
294            // Remove ONE item with lowest priority (oldest first)
295            if let Some(idx) = self
296                .pending_syncs
297                .iter()
298                .position(|s| s.priority == lowest_priority)
299            {
300                self.pending_syncs.remove(idx);
301                self.stats.syncs_dropped += 1;
302            }
303        }
304
305        let sync = PendingSync::new(peer_id, priority, data_size, current_time);
306        self.pending_syncs.push_back(sync);
307        true
308    }
309
310    /// Check if there's a critical sync pending
311    pub fn has_critical_sync(&self) -> bool {
312        self.pending_syncs
313            .iter()
314            .any(|s| s.priority == SyncPriority::Critical)
315    }
316
317    /// Get the next scheduled event
318    pub fn next_event(&self, current_time: u64) -> Option<(SchedulerEvent, u64)> {
319        // Critical syncs take priority
320        if self.has_critical_sync() {
321            return Some((SchedulerEvent::SyncNow, current_time));
322        }
323
324        match self.state {
325            RadioState::Idle => {
326                // Determine next activity
327                let scan_due = current_time >= self.next_scan_time;
328                let adv_due = current_time >= self.next_adv_time;
329                let sync_due = !self.pending_syncs.is_empty();
330
331                if scan_due {
332                    Some((SchedulerEvent::StartScan, current_time))
333                } else if adv_due {
334                    Some((SchedulerEvent::StartAdvertising, current_time))
335                } else if sync_due {
336                    Some((SchedulerEvent::SyncNow, current_time))
337                } else {
338                    // Calculate next wake time
339                    let next_time = self.next_scan_time.min(self.next_adv_time);
340                    Some((SchedulerEvent::EnterSleep, next_time))
341                }
342            }
343            RadioState::Scanning => {
344                // Check if scan window should end
345                let scan_end = self.last_state_change + self.timing.scan_window_ms as u64;
346                if current_time >= scan_end {
347                    Some((SchedulerEvent::StopScan, current_time))
348                } else {
349                    None
350                }
351            }
352            RadioState::Advertising => {
353                // Single advertisement event, then stop
354                Some((SchedulerEvent::StopAdvertising, current_time))
355            }
356            RadioState::Syncing => {
357                // Sync completion handled externally
358                None
359            }
360            RadioState::Transitioning => None,
361        }
362    }
363
364    /// Process a scheduler event
365    pub fn process_event(&mut self, event: SchedulerEvent, current_time: u64) {
366        match event {
367            SchedulerEvent::StartScan => {
368                self.state = RadioState::Scanning;
369                self.last_state_change = current_time;
370                self.stats.scan_windows += 1;
371            }
372            SchedulerEvent::StopScan => {
373                self.state = RadioState::Idle;
374                self.next_scan_time = current_time + self.timing.scan_interval_ms as u64;
375                self.last_state_change = current_time;
376            }
377            SchedulerEvent::StartAdvertising => {
378                self.state = RadioState::Advertising;
379                self.last_state_change = current_time;
380                self.stats.adv_events += 1;
381            }
382            SchedulerEvent::StopAdvertising => {
383                self.state = RadioState::Idle;
384                self.next_adv_time = current_time + self.timing.adv_interval_ms as u64;
385                self.last_state_change = current_time;
386            }
387            SchedulerEvent::SyncNow => {
388                self.state = RadioState::Syncing;
389                self.last_state_change = current_time;
390            }
391            SchedulerEvent::ProfileChanged => {
392                // Already handled in set_profile
393            }
394            SchedulerEvent::EnterSleep => {
395                self.state = RadioState::Idle;
396            }
397        }
398    }
399
400    /// Get the next pending sync (highest priority, oldest first)
401    pub fn next_pending_sync(&mut self, current_time: u64) -> Option<PendingSync> {
402        // Remove expired syncs
403        let stats = &mut self.stats;
404        let initial_len = self.pending_syncs.len();
405        self.pending_syncs.retain(|s| !s.is_expired(current_time));
406        stats.syncs_dropped += (initial_len - self.pending_syncs.len()) as u64;
407
408        // Find highest priority sync
409        let max_priority = self.pending_syncs.iter().map(|s| s.priority).max()?;
410
411        // Find index of oldest sync with max priority
412        let idx = self
413            .pending_syncs
414            .iter()
415            .position(|s| s.priority == max_priority)?;
416
417        let sync = self.pending_syncs.remove(idx)?;
418
419        if sync.priority == SyncPriority::Critical {
420            self.stats.critical_syncs += 1;
421        }
422        self.stats.syncs_performed += 1;
423
424        Some(sync)
425    }
426
427    /// Mark sync as complete
428    pub fn complete_sync(&mut self, current_time: u64) {
429        self.state = RadioState::Idle;
430        self.last_state_change = current_time;
431    }
432
433    /// Reset scheduler state
434    pub fn reset(&mut self, current_time: u64) {
435        self.state = RadioState::Idle;
436        self.pending_syncs.clear();
437        self.next_scan_time = current_time;
438        self.next_adv_time = current_time;
439        self.last_state_change = current_time;
440    }
441
442    /// Calculate time until next activity
443    pub fn time_until_next_activity(&self, current_time: u64) -> u64 {
444        if self.has_critical_sync() {
445            return 0;
446        }
447
448        if !self.pending_syncs.is_empty() {
449            return 0;
450        }
451
452        let next_scan = self.next_scan_time.saturating_sub(current_time);
453        let next_adv = self.next_adv_time.saturating_sub(current_time);
454
455        next_scan.min(next_adv)
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use super::*;
462
463    #[test]
464    fn test_scheduler_creation() {
465        let scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
466        assert_eq!(scheduler.profile(), PowerProfile::LowPower);
467        assert_eq!(scheduler.state(), RadioState::Idle);
468        assert_eq!(scheduler.pending_sync_count(), 0);
469    }
470
471    #[test]
472    fn test_queue_sync() {
473        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
474        let peer = NodeId::new(0x1234);
475
476        assert!(scheduler.queue_sync(peer, SyncPriority::Normal, 100, 1000));
477        assert_eq!(scheduler.pending_sync_count(), 1);
478    }
479
480    #[test]
481    fn test_critical_sync_priority() {
482        let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
483        let peer = NodeId::new(0x1234);
484
485        scheduler.queue_sync(peer, SyncPriority::Critical, 50, 1000);
486        assert!(scheduler.has_critical_sync());
487
488        let event = scheduler.next_event(1000);
489        assert_eq!(event, Some((SchedulerEvent::SyncNow, 1000)));
490    }
491
492    #[test]
493    fn test_sync_priority_ordering() {
494        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
495        let peer1 = NodeId::new(0x1111);
496        let peer2 = NodeId::new(0x2222);
497        let peer3 = NodeId::new(0x3333);
498
499        scheduler.queue_sync(peer1, SyncPriority::Low, 100, 1000);
500        scheduler.queue_sync(peer2, SyncPriority::High, 100, 1001);
501        scheduler.queue_sync(peer3, SyncPriority::Normal, 100, 1002);
502
503        // Should get high priority first
504        let sync = scheduler.next_pending_sync(1005).unwrap();
505        assert_eq!(sync.peer_id, peer2);
506
507        // Then normal
508        let sync = scheduler.next_pending_sync(1005).unwrap();
509        assert_eq!(sync.peer_id, peer3);
510
511        // Then low
512        let sync = scheduler.next_pending_sync(1005).unwrap();
513        assert_eq!(sync.peer_id, peer1);
514    }
515
516    #[test]
517    fn test_sync_expiration() {
518        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
519        let peer = NodeId::new(0x1234);
520
521        // Queue a low priority sync (1 minute max age)
522        scheduler.queue_sync(peer, SyncPriority::Low, 100, 1000);
523        assert_eq!(scheduler.pending_sync_count(), 1);
524
525        // After expiration
526        let sync = scheduler.next_pending_sync(70_000);
527        assert!(sync.is_none());
528        assert_eq!(scheduler.stats().syncs_dropped, 1);
529    }
530
531    #[test]
532    fn test_scan_window_scheduling() {
533        let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
534
535        // Initial state should trigger scan
536        let event = scheduler.next_event(0);
537        assert_eq!(event, Some((SchedulerEvent::StartScan, 0)));
538
539        scheduler.process_event(SchedulerEvent::StartScan, 0);
540        assert_eq!(scheduler.state(), RadioState::Scanning);
541
542        // After scan window (100ms for LowPower)
543        let event = scheduler.next_event(100);
544        assert_eq!(event, Some((SchedulerEvent::StopScan, 100)));
545
546        scheduler.process_event(SchedulerEvent::StopScan, 100);
547        assert_eq!(scheduler.state(), RadioState::Idle);
548
549        // Next scan should be at interval (5000ms for LowPower)
550        assert_eq!(scheduler.next_scan_time, 5100);
551    }
552
553    #[test]
554    fn test_profile_change() {
555        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
556
557        assert!(scheduler.set_profile(PowerProfile::LowPower, 1000));
558        assert_eq!(scheduler.profile(), PowerProfile::LowPower);
559        assert_eq!(scheduler.stats().profile_changes, 1);
560    }
561
562    #[test]
563    fn test_profile_change_cooldown() {
564        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
565
566        assert!(scheduler.set_profile(PowerProfile::LowPower, 1000));
567
568        // Too soon - should be rejected
569        assert!(!scheduler.set_profile(PowerProfile::Aggressive, 5000));
570        assert_eq!(scheduler.profile(), PowerProfile::LowPower);
571
572        // After cooldown
573        assert!(scheduler.set_profile(PowerProfile::Aggressive, 15000));
574        assert_eq!(scheduler.profile(), PowerProfile::Aggressive);
575    }
576
577    #[test]
578    fn test_battery_auto_adjust() {
579        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Aggressive);
580        scheduler.set_auto_adjust(true);
581
582        // Critical battery should force low power
583        let battery = BatteryState::new(5, false);
584        scheduler.update_battery(battery, 15000);
585
586        assert_eq!(scheduler.profile(), PowerProfile::LowPower);
587    }
588
589    #[test]
590    fn test_battery_charging_no_change() {
591        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Aggressive);
592        scheduler.set_auto_adjust(true);
593
594        // Charging should not downgrade
595        let battery = BatteryState::new(5, true);
596        scheduler.update_battery(battery, 15000);
597
598        assert_eq!(scheduler.profile(), PowerProfile::Aggressive);
599    }
600
601    #[test]
602    fn test_queue_overflow_priority() {
603        let config = SchedulerConfig {
604            max_pending_syncs: 2,
605            ..Default::default()
606        };
607        let mut scheduler = RadioScheduler::new(PowerProfile::Balanced, config);
608
609        let peer1 = NodeId::new(0x1111);
610        let peer2 = NodeId::new(0x2222);
611        let peer3 = NodeId::new(0x3333);
612
613        scheduler.queue_sync(peer1, SyncPriority::Low, 100, 1000);
614        scheduler.queue_sync(peer2, SyncPriority::Low, 100, 1001);
615
616        // Queue is full, but high priority should bump low
617        assert!(scheduler.queue_sync(peer3, SyncPriority::High, 100, 1002));
618        assert_eq!(scheduler.pending_sync_count(), 2);
619
620        // The high priority one should be there
621        assert!(scheduler.pending_syncs.iter().any(|s| s.peer_id == peer3));
622    }
623
624    #[test]
625    fn test_time_until_next_activity() {
626        let mut scheduler = RadioScheduler::with_profile(PowerProfile::LowPower);
627
628        // Start and stop scan to set next_scan_time
629        scheduler.process_event(SchedulerEvent::StartScan, 0);
630        scheduler.process_event(SchedulerEvent::StopScan, 100);
631        // Start and stop advertising to set next_adv_time
632        scheduler.process_event(SchedulerEvent::StartAdvertising, 100);
633        scheduler.process_event(SchedulerEvent::StopAdvertising, 102);
634
635        // next_scan_time = 5100, next_adv_time = 2102 (LowPower adv_interval = 2000)
636        let wait = scheduler.time_until_next_activity(1000);
637        assert!(wait > 0, "wait should be > 0, got {}", wait);
638        // Should be about 1102ms until next adv (2102 - 1000)
639        assert!(wait <= 2000, "wait should be <= 2000, got {}", wait);
640    }
641
642    #[test]
643    fn test_reset() {
644        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
645        let peer = NodeId::new(0x1234);
646
647        scheduler.queue_sync(peer, SyncPriority::Normal, 100, 1000);
648        scheduler.process_event(SchedulerEvent::StartScan, 1000);
649
650        scheduler.reset(2000);
651
652        assert_eq!(scheduler.state(), RadioState::Idle);
653        assert_eq!(scheduler.pending_sync_count(), 0);
654    }
655
656    #[test]
657    fn test_complete_sync() {
658        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
659
660        scheduler.process_event(SchedulerEvent::SyncNow, 1000);
661        assert_eq!(scheduler.state(), RadioState::Syncing);
662
663        scheduler.complete_sync(1500);
664        assert_eq!(scheduler.state(), RadioState::Idle);
665    }
666
667    #[test]
668    fn test_pending_sync_expiry_timing() {
669        let sync = PendingSync::new(NodeId::new(0x1234), SyncPriority::Critical, 100, 1000);
670        // Critical syncs have 5 second max age
671        assert!(!sync.is_expired(5000));
672        assert!(sync.is_expired(7000));
673    }
674
675    #[test]
676    fn test_stats_tracking() {
677        let mut scheduler = RadioScheduler::with_profile(PowerProfile::Balanced);
678
679        scheduler.process_event(SchedulerEvent::StartScan, 0);
680        scheduler.process_event(SchedulerEvent::StartScan, 100);
681        scheduler.process_event(SchedulerEvent::StartAdvertising, 200);
682
683        let stats = scheduler.stats();
684        assert_eq!(stats.scan_windows, 2);
685        assert_eq!(stats.adv_events, 1);
686    }
687}