1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use crate::reflect::{Reflect, TypePath};
4use cu29_clock::{CuTime, RobotClock};
5use cu29_traits::{CuError, CuResult};
6use rayon::ThreadPool;
7use std::sync::{Arc, Mutex};
8
9struct AsyncState {
10 processing: bool,
11 ready_at: Option<CuTime>,
12 last_error: Option<CuError>,
13}
14
15fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
16 let mut guard = match state.lock() {
17 Ok(guard) => guard,
18 Err(poison) => poison.into_inner(),
19 };
20 guard.processing = false;
21 guard.ready_at = None;
22 guard.last_error = Some(error);
23}
24
25#[derive(Reflect)]
26#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
27pub struct CuAsyncTask<T, O>
28where
29 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
30 O: CuMsgPayload + Send + 'static,
31{
32 #[reflect(ignore)]
33 task: Arc<Mutex<T>>,
34 #[reflect(ignore)]
35 output: Arc<Mutex<CuMsg<O>>>,
36 #[reflect(ignore)]
37 state: Arc<Mutex<AsyncState>>,
38 #[reflect(ignore)]
39 tp: Arc<ThreadPool>,
40}
41
42impl<T, O> TypePath for CuAsyncTask<T, O>
43where
44 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
45 O: CuMsgPayload + Send + 'static,
46{
47 fn type_path() -> &'static str {
48 "cu29_runtime::cuasynctask::CuAsyncTask"
49 }
50
51 fn short_type_path() -> &'static str {
52 "CuAsyncTask"
53 }
54
55 fn type_ident() -> Option<&'static str> {
56 Some("CuAsyncTask")
57 }
58
59 fn crate_name() -> Option<&'static str> {
60 Some("cu29_runtime")
61 }
62
63 fn module_path() -> Option<&'static str> {
64 Some("cuasynctask")
65 }
66}
67
68pub struct CuAsyncTaskResources<'r, T: CuTask> {
70 pub inner: T::Resources<'r>,
71 pub threadpool: Arc<ThreadPool>,
72}
73
74impl<T, O> CuAsyncTask<T, O>
75where
76 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
77 O: CuMsgPayload + Send + 'static,
78{
79 #[allow(unused)]
80 pub fn new(
81 config: Option<&ComponentConfig>,
82 resources: T::Resources<'_>,
83 tp: Arc<ThreadPool>,
84 ) -> CuResult<Self> {
85 let task = Arc::new(Mutex::new(T::new(config, resources)?));
86 let output = Arc::new(Mutex::new(CuMsg::default()));
87 Ok(Self {
88 task,
89 output,
90 state: Arc::new(Mutex::new(AsyncState {
91 processing: false,
92 ready_at: None,
93 last_error: None,
94 })),
95 tp,
96 })
97 }
98}
99
100impl<T, O> Freezable for CuAsyncTask<T, O>
101where
102 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
103 O: CuMsgPayload + Send + 'static,
104{
105}
106
107impl<T, I, O> CuTask for CuAsyncTask<T, O>
108where
109 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
110 I: CuMsgPayload + Send + Sync + 'static,
111 O: CuMsgPayload + Send + 'static,
112{
113 type Resources<'r> = CuAsyncTaskResources<'r, T>;
114 type Input<'m> = T::Input<'m>;
115 type Output<'m> = T::Output<'m>;
116
117 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
118 where
119 Self: Sized,
120 {
121 let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
122 let output = Arc::new(Mutex::new(CuMsg::default()));
123 Ok(Self {
124 task,
125 output,
126 state: Arc::new(Mutex::new(AsyncState {
127 processing: false,
128 ready_at: None,
129 last_error: None,
130 })),
131 tp: resources.threadpool,
132 })
133 }
134
135 fn process<'i, 'o>(
136 &mut self,
137 clock: &RobotClock,
138 input: &Self::Input<'i>,
139 real_output: &mut Self::Output<'o>,
140 ) -> CuResult<()> {
141 {
142 let mut state = self.state.lock().map_err(|_| {
143 CuError::from("Async task state mutex poisoned while scheduling background work")
144 })?;
145 if let Some(error) = state.last_error.take() {
146 return Err(error);
147 }
148 if state.processing {
149 return Ok(());
151 }
152
153 if let Some(ready_at) = state.ready_at
154 && clock.now() < ready_at
155 {
156 return Ok(());
158 }
159
160 state.processing = true;
162 state.ready_at = None;
163 }
164
165 let buffered_output = self.output.lock().map_err(|_| {
167 let error = CuError::from("Async task output mutex poisoned");
168 record_async_error(&self.state, error.clone());
169 error
170 })?;
171 *real_output = buffered_output.clone();
172
173 self.tp.spawn_fifo({
175 let clock = clock.clone();
176 let input = (*input).clone();
177 let output = self.output.clone();
178 let task = self.task.clone();
179 let state = self.state.clone();
180 move || {
181 let input_ref: &CuMsg<I> = &input;
182 let mut output_guard = match output.lock() {
183 Ok(guard) => guard,
184 Err(_) => {
185 record_async_error(
186 &state,
187 CuError::from("Async task output mutex poisoned"),
188 );
189 return;
190 }
191 };
192 let output_ref: &mut CuMsg<O> = &mut output_guard;
193
194 if output_ref.metadata.process_time.start.is_none() {
196 output_ref.metadata.process_time.start = clock.now().into();
197 }
198 let task_result = match task.lock() {
199 Ok(mut task_guard) => task_guard.process(&clock, input_ref, output_ref),
200 Err(poison) => Err(CuError::from(format!(
201 "Async task mutex poisoned: {poison}"
202 ))),
203 };
204
205 let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
206 guard.processing = false;
207
208 match task_result {
209 Ok(()) => {
210 let end_from_metadata: Option<CuTime> =
211 output_ref.metadata.process_time.end.into();
212 let end_time = end_from_metadata.unwrap_or_else(|| {
213 let now = clock.now();
214 output_ref.metadata.process_time.end = now.into();
215 now
216 });
217 guard.ready_at = Some(end_time);
218 }
219 Err(error) => {
220 guard.ready_at = None;
221 guard.last_error = Some(error);
222 }
223 }
224 }
225 });
226 Ok(())
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ComponentConfig;
    use crate::cutask::CuMsg;
    use crate::cutask::Freezable;
    use crate::input_msg;
    use crate::output_msg;
    use cu29_clock::RobotClock;
    use cu29_traits::CuResult;
    use rayon::ThreadPoolBuilder;
    use std::borrow::BorrowMut;
    use std::sync::OnceLock;
    use std::sync::mpsc;
    use std::time::Duration;
    use std::time::Instant;

    // Channels through which `background_respects_recorded_ready_time` steers
    // `ControlledTask`. They are statics because the task is constructed by the wrapper
    // and cannot be handed the channel ends directly; each can be set only once.
    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();

    /// Task that copies its input payload to its output.
    #[derive(Reflect)]
    struct TestTask {}

    impl Freezable for TestTask {}

    impl CuTask for TestTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.borrow_mut().set_payload(*input.payload().unwrap());
            Ok(())
        }
    }

    /// The wrapper eventually surfaces the payload produced by the background run.
    #[test]
    fn test_lifecycle() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());

        let config = ComponentConfig::default();
        let clock = RobotClock::default();
        let mut async_task: CuAsyncTask<TestTask, u32> =
            CuAsyncTask::new(Some(&config), (), tp).unwrap();
        let input = CuMsg::new(Some(42u32));
        let mut output: CuMsg<u32> = CuMsg::new(None);

        // Poll with a deadline so that a regression fails the test instead of hanging it.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            async_task.process(&clock, &input, &mut output).unwrap();

            if let Some(val) = output.payload() {
                assert_eq!(*val, 42u32);
                break;
            }

            assert!(
                Instant::now() < deadline,
                "async task never produced an output"
            );
            // Give the single worker thread a chance to run between polls.
            std::thread::yield_now();
        }
    }

    /// Task whose completion and reported end time are dictated by the test through
    /// `READY_RX` / `DONE_TX`.
    #[derive(Reflect)]
    struct ControlledTask;

    impl Freezable for ControlledTask {}

    impl CuTask for ControlledTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            clock: &RobotClock,
            _input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            // Block until the test sends the time this run should report as its end.
            let rx = READY_RX
                .get()
                .expect("ready channel not set")
                .lock()
                .unwrap();
            let ready_time = rx
                .recv_timeout(Duration::from_secs(1))
                .expect("timed out waiting for ready signal");

            // The test only sends small values, so the narrowing cast loses nothing.
            output.set_payload(ready_time.as_nanos() as u32);
            output.metadata.process_time.start = clock.now().into();
            output.metadata.process_time.end = ready_time.into();

            if let Some(done_tx) = DONE_TX.get() {
                let _ = done_tx.send(());
            }
            Ok(())
        }
    }

    /// The buffered output is withheld until the mock clock reaches the end time that the
    /// background run wrote into the message metadata.
    #[test]
    fn background_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (clock, clock_mock) = RobotClock::mock();

        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        READY_RX
            .set(Arc::new(Mutex::new(ready_rx)))
            .expect("ready channel already set");
        DONE_TX
            .set(done_tx)
            .expect("completion channel already set");

        let mut async_task: CuAsyncTask<ControlledTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // First call: nothing is buffered yet; a background run is scheduled.
        clock_mock.set_value(0);
        async_task.process(&clock, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // The run is still blocked on the ready channel, so the call is a no-op.
        clock_mock.set_value(10);
        async_task.process(&clock, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Let the run finish and report an end time of 30.
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background task never finished");

        // The task signals `done` before the worker closure updates the shared state, so
        // wait (bounded) until `ready_at` shows up.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background task finished without recording ready_at"
        );

        // Clock earlier than the recorded ready time: the result stays hidden.
        clock_mock.set_value(20);
        async_task.process(&clock, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "Output surfaced before recorded ready time"
        );

        // Clock at the ready time: the result is handed out and a new run is scheduled.
        clock_mock.set_value(30);
        async_task.process(&clock, &input, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // Release the run scheduled by the previous call so the worker is not left
        // waiting on the ready channel when the test ends.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
}