agentic_robotics_rt/
executor.rs1use crate::scheduler::PriorityScheduler;
6use crate::RTPriority;
7use anyhow::Result;
8use parking_lot::Mutex;
9use std::future::Future;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::runtime::{Builder, Runtime};
13use tracing::{debug, info};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct Priority(pub u8);
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub struct Deadline(pub Duration);
22
23impl From<Duration> for Deadline {
24 fn from(duration: Duration) -> Self {
25 Deadline(duration)
26 }
27}
28
29pub struct ROS3Executor {
31 tokio_rt_high: Runtime,
32 tokio_rt_low: Runtime,
33 scheduler: Arc<Mutex<PriorityScheduler>>,
34}
35
36impl ROS3Executor {
37 pub fn new() -> Result<Self> {
39 info!("Initializing ROS3 unified executor");
40
41 let tokio_rt_high = Builder::new_multi_thread()
43 .worker_threads(2)
44 .thread_name("ros3-rt-high")
45 .enable_all()
46 .build()?;
47
48 let tokio_rt_low = Builder::new_multi_thread()
50 .worker_threads(4)
51 .thread_name("ros3-rt-low")
52 .enable_all()
53 .build()?;
54
55 let scheduler = Arc::new(Mutex::new(PriorityScheduler::new()));
56
57 Ok(Self {
58 tokio_rt_high,
59 tokio_rt_low,
60 scheduler,
61 })
62 }
63
64 pub fn spawn_rt<F>(&self, priority: Priority, deadline: Deadline, task: F)
66 where
67 F: Future<Output = ()> + Send + 'static,
68 {
69 let rt_priority: RTPriority = priority.0.into();
70
71 debug!(
72 "Spawning RT task with priority {:?} and deadline {:?}",
73 rt_priority, deadline.0
74 );
75
76 if deadline.0 < Duration::from_millis(1) {
78 self.tokio_rt_high.spawn(async move {
80 task.await;
82 });
83 } else {
84 self.tokio_rt_low.spawn(async move {
86 task.await;
87 });
88 }
89 }
90
91 pub fn spawn_high<F>(&self, task: F)
93 where
94 F: Future<Output = ()> + Send + 'static,
95 {
96 self.spawn_rt(Priority(3), Deadline(Duration::from_micros(500)), task);
97 }
98
99 pub fn spawn_low<F>(&self, task: F)
101 where
102 F: Future<Output = ()> + Send + 'static,
103 {
104 self.spawn_rt(Priority(1), Deadline(Duration::from_millis(100)), task);
105 }
106
107 pub fn spawn_blocking<F, R>(&self, f: F) -> tokio::task::JoinHandle<R>
109 where
110 F: FnOnce() -> R + Send + 'static,
111 R: Send + 'static,
112 {
113 self.tokio_rt_low.spawn_blocking(f)
114 }
115
116 pub fn high_priority_runtime(&self) -> &Runtime {
118 &self.tokio_rt_high
119 }
120
121 pub fn low_priority_runtime(&self) -> &Runtime {
123 &self.tokio_rt_low
124 }
125}
126
127impl Default for ROS3Executor {
128 fn default() -> Self {
129 Self::new().expect("Failed to create ROS3Executor")
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use std::sync::atomic::{AtomicBool, Ordering};
137
138 #[test]
139 fn test_executor_creation() {
140 let executor = ROS3Executor::new();
141 assert!(executor.is_ok());
142 }
143
144 #[test]
145 fn test_spawn_high_priority() {
146 let executor = ROS3Executor::new().unwrap();
147 let completed = Arc::new(AtomicBool::new(false));
148 let completed_clone = completed.clone();
149
150 executor.spawn_high(async move {
151 completed_clone.store(true, Ordering::SeqCst);
152 });
153
154 std::thread::sleep(Duration::from_millis(100));
155 }
157}