agentic_robotics_rt/
executor.rs

1//! Unified async real-time executor
2//!
3//! Combines Tokio for soft real-time I/O and priority scheduling for hard real-time tasks
4
5use crate::scheduler::PriorityScheduler;
6use crate::RTPriority;
7use anyhow::Result;
8use parking_lot::Mutex;
9use std::future::Future;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::runtime::{Builder, Runtime};
13use tracing::{debug, info};
14
15/// Task priority wrapper
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct Priority(pub u8);
18
19/// Task deadline
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub struct Deadline(pub Duration);
22
23impl From<Duration> for Deadline {
24    fn from(duration: Duration) -> Self {
25        Deadline(duration)
26    }
27}
28
29/// ROS3 unified executor
30pub struct ROS3Executor {
31    tokio_rt_high: Runtime,
32    tokio_rt_low: Runtime,
33    scheduler: Arc<Mutex<PriorityScheduler>>,
34}
35
36impl ROS3Executor {
37    /// Create a new executor
38    pub fn new() -> Result<Self> {
39        info!("Initializing ROS3 unified executor");
40
41        // High-priority runtime for control loops (2 threads)
42        let tokio_rt_high = Builder::new_multi_thread()
43            .worker_threads(2)
44            .thread_name("ros3-rt-high")
45            .enable_all()
46            .build()?;
47
48        // Low-priority runtime for planning (4 threads)
49        let tokio_rt_low = Builder::new_multi_thread()
50            .worker_threads(4)
51            .thread_name("ros3-rt-low")
52            .enable_all()
53            .build()?;
54
55        let scheduler = Arc::new(Mutex::new(PriorityScheduler::new()));
56
57        Ok(Self {
58            tokio_rt_high,
59            tokio_rt_low,
60            scheduler,
61        })
62    }
63
64    /// Spawn a real-time task with priority and deadline
65    pub fn spawn_rt<F>(&self, priority: Priority, deadline: Deadline, task: F)
66    where
67        F: Future<Output = ()> + Send + 'static,
68    {
69        let rt_priority: RTPriority = priority.0.into();
70
71        debug!(
72            "Spawning RT task with priority {:?} and deadline {:?}",
73            rt_priority, deadline.0
74        );
75
76        // Route to appropriate runtime based on deadline
77        if deadline.0 < Duration::from_millis(1) {
78            // Hard RT: Use high-priority runtime
79            self.tokio_rt_high.spawn(async move {
80                // In a real implementation with RTIC, this would use hardware interrupts
81                task.await;
82            });
83        } else {
84            // Soft RT: Use low-priority runtime
85            self.tokio_rt_low.spawn(async move {
86                task.await;
87            });
88        }
89    }
90
91    /// Spawn a high-priority task
92    pub fn spawn_high<F>(&self, task: F)
93    where
94        F: Future<Output = ()> + Send + 'static,
95    {
96        self.spawn_rt(Priority(3), Deadline(Duration::from_micros(500)), task);
97    }
98
99    /// Spawn a low-priority task
100    pub fn spawn_low<F>(&self, task: F)
101    where
102        F: Future<Output = ()> + Send + 'static,
103    {
104        self.spawn_rt(Priority(1), Deadline(Duration::from_millis(100)), task);
105    }
106
107    /// Spawn CPU-bound blocking work
108    pub fn spawn_blocking<F, R>(&self, f: F) -> tokio::task::JoinHandle<R>
109    where
110        F: FnOnce() -> R + Send + 'static,
111        R: Send + 'static,
112    {
113        self.tokio_rt_low.spawn_blocking(f)
114    }
115
116    /// Get a handle to the high-priority runtime
117    pub fn high_priority_runtime(&self) -> &Runtime {
118        &self.tokio_rt_high
119    }
120
121    /// Get a handle to the low-priority runtime
122    pub fn low_priority_runtime(&self) -> &Runtime {
123        &self.tokio_rt_low
124    }
125}
126
127impl Default for ROS3Executor {
128    fn default() -> Self {
129        Self::new().expect("Failed to create ROS3Executor")
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use std::sync::atomic::{AtomicBool, Ordering};
137
138    #[test]
139    fn test_executor_creation() {
140        let executor = ROS3Executor::new();
141        assert!(executor.is_ok());
142    }
143
144    #[test]
145    fn test_spawn_high_priority() {
146        let executor = ROS3Executor::new().unwrap();
147        let completed = Arc::new(AtomicBool::new(false));
148        let completed_clone = completed.clone();
149
150        executor.spawn_high(async move {
151            completed_clone.store(true, Ordering::SeqCst);
152        });
153
154        std::thread::sleep(Duration::from_millis(100));
155        // Note: In a real test, we'd use proper synchronization
156    }
157}