1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
/// Bookkeeping shared between the caller of `CuAsyncTask::process` and the
/// background job it spawns on the thread pool.
struct AsyncState {
    /// `true` from the moment `process` schedules a background run until that
    /// run finishes (successfully or not). While set, `process` emits an
    /// empty message and does not schedule another run.
    processing: bool,
    /// Time recorded by the last successful background run (the output's
    /// `process_time.end`, or the clock reading when the run finished if the
    /// task did not set it). `process` withholds the buffered output while
    /// `ctx.now()` is earlier than this value.
    ready_at: Option<CuTime>,
    /// Error captured from the background run; returned (and cleared) by the
    /// next call to `process`.
    last_error: Option<CuError>,
}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17 let mut guard = match state.lock() {
18 Ok(guard) => guard,
19 Err(poison) => poison.into_inner(),
20 };
21 guard.processing = false;
22 guard.ready_at = None;
23 guard.last_error = Some(error);
24}
25
/// Wrapper that runs the `process` step of an inner task `T` on a rayon
/// thread pool instead of on the calling thread.
///
/// All fields are shared with the background job through `Arc`s and are
/// excluded from reflection.
#[derive(Reflect)]
#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
pub struct CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// The wrapped task; locked by the background job while it runs.
    #[reflect(ignore)]
    task: Arc<Mutex<T>>,
    /// Buffer the background job writes its result into; `process` copies it
    /// out to the caller's output message.
    #[reflect(ignore)]
    output: Arc<Mutex<CuMsg<O>>>,
    /// Scheduling state (busy flag, ready time, pending error).
    #[reflect(ignore)]
    state: Arc<Mutex<AsyncState>>,
    /// Thread pool on which background runs are spawned.
    #[reflect(ignore)]
    tp: Arc<ThreadPool>,
}
42
/// Hand-written `TypePath` implementation (the derive on the struct is
/// configured with `type_path = false`). The reported paths are fixed
/// strings and do not include the generic parameters `T` and `O`.
impl<T, O> TypePath for CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// Full path of the type, without generic arguments.
    fn type_path() -> &'static str {
        "cu29_runtime::cuasynctask::CuAsyncTask"
    }

    /// Type name without any module path.
    fn short_type_path() -> &'static str {
        "CuAsyncTask"
    }

    /// Bare identifier of the type.
    fn type_ident() -> Option<&'static str> {
        Some("CuAsyncTask")
    }

    /// Name of the crate reported for this type.
    fn crate_name() -> Option<&'static str> {
        Some("cu29_runtime")
    }

    /// Module reported for this type.
    // NOTE(review): this omits the crate prefix that `type_path` includes
    // ("cu29_runtime::cuasynctask"). Confirm against the `TypePath` trait's
    // contract whether the crate-qualified module path is expected here.
    fn module_path() -> Option<&'static str> {
        Some("cuasynctask")
    }
}
68
/// Resources consumed when a `CuAsyncTask` is built through `CuTask::new`.
pub struct CuAsyncTaskResources<'r, T: CuTask> {
    /// Resources forwarded unchanged to the wrapped task's own `new`.
    pub inner: T::Resources<'r>,
    /// Thread pool the async wrapper will spawn its background runs on.
    pub threadpool: Arc<ThreadPool>,
}
74
75impl<T, O> CuAsyncTask<T, O>
76where
77 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
78 O: CuMsgPayload + Send + 'static,
79{
80 #[allow(unused)]
81 pub fn new(
82 config: Option<&ComponentConfig>,
83 resources: T::Resources<'_>,
84 tp: Arc<ThreadPool>,
85 ) -> CuResult<Self> {
86 let task = Arc::new(Mutex::new(T::new(config, resources)?));
87 let output = Arc::new(Mutex::new(CuMsg::default()));
88 Ok(Self {
89 task,
90 output,
91 state: Arc::new(Mutex::new(AsyncState {
92 processing: false,
93 ready_at: None,
94 last_error: None,
95 })),
96 tp,
97 })
98 }
99}
100
// Empty implementation: the wrapper relies entirely on whatever default
// behavior the `Freezable` trait provides.
// NOTE(review): nothing is forwarded to the wrapped task `T` here — confirm
// that not freezing the inner task's state is intended.
impl<T, O> Freezable for CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
}
107
108impl<T, I, O> CuTask for CuAsyncTask<T, O>
109where
110 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
111 I: CuMsgPayload + Send + Sync + 'static,
112 O: CuMsgPayload + Send + 'static,
113{
114 type Resources<'r> = CuAsyncTaskResources<'r, T>;
115 type Input<'m> = T::Input<'m>;
116 type Output<'m> = T::Output<'m>;
117
118 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
119 where
120 Self: Sized,
121 {
122 let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
123 let output = Arc::new(Mutex::new(CuMsg::default()));
124 Ok(Self {
125 task,
126 output,
127 state: Arc::new(Mutex::new(AsyncState {
128 processing: false,
129 ready_at: None,
130 last_error: None,
131 })),
132 tp: resources.threadpool,
133 })
134 }
135
136 fn process<'i, 'o>(
137 &mut self,
138 ctx: &CuContext,
139 input: &Self::Input<'i>,
140 real_output: &mut Self::Output<'o>,
141 ) -> CuResult<()> {
142 {
143 let mut state = self.state.lock().map_err(|_| {
144 CuError::from("Async task state mutex poisoned while scheduling background work")
145 })?;
146 if let Some(error) = state.last_error.take() {
147 return Err(error);
148 }
149 if state.processing {
150 *real_output = CuMsg::default();
152 return Ok(());
153 }
154
155 if let Some(ready_at) = state.ready_at
156 && ctx.now() < ready_at
157 {
158 *real_output = CuMsg::default();
160 return Ok(());
161 }
162
163 state.processing = true;
165 state.ready_at = None;
166 }
167
168 let buffered_output = self.output.lock().map_err(|_| {
170 let error = CuError::from("Async task output mutex poisoned");
171 record_async_error(&self.state, error.clone());
172 error
173 })?;
174 *real_output = buffered_output.clone();
175
176 self.tp.spawn_fifo({
178 let ctx = ctx.clone();
179 let input = (*input).clone();
180 let output = self.output.clone();
181 let task = self.task.clone();
182 let state = self.state.clone();
183 move || {
184 let input_ref: &CuMsg<I> = &input;
185 let mut output_guard = match output.lock() {
186 Ok(guard) => guard,
187 Err(_) => {
188 record_async_error(
189 &state,
190 CuError::from("Async task output mutex poisoned"),
191 );
192 return;
193 }
194 };
195 let output_ref: &mut CuMsg<O> = &mut output_guard;
196
197 *output_ref = CuMsg::default();
200
201 if output_ref.metadata.process_time.start.is_none() {
203 output_ref.metadata.process_time.start = ctx.now().into();
204 }
205 let task_result = match task.lock() {
206 Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
207 Err(poison) => Err(CuError::from(format!(
208 "Async task mutex poisoned: {poison}"
209 ))),
210 };
211
212 let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
213 guard.processing = false;
214
215 match task_result {
216 Ok(()) => {
217 let end_from_metadata: Option<CuTime> =
218 output_ref.metadata.process_time.end.into();
219 let end_time = end_from_metadata.unwrap_or_else(|| {
220 let now = ctx.now();
221 output_ref.metadata.process_time.end = now.into();
222 now
223 });
224 guard.ready_at = Some(end_time);
225 }
226 Err(error) => {
227 guard.ready_at = None;
228 guard.last_error = Some(error);
229 }
230 }
231 }
232 });
233 Ok(())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ComponentConfig;
    use crate::cutask::CuMsg;
    use crate::cutask::Freezable;
    use crate::input_msg;
    use crate::output_msg;
    use cu29_traits::CuResult;
    use rayon::ThreadPoolBuilder;
    use std::sync::OnceLock;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    // Channels used by `ControlledTask`. They are process-wide `OnceLock`s,
    // so only a single test may install them.
    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();

    /// Task that copies its input payload to its output.
    #[derive(Reflect)]
    struct TestTask {}

    impl Freezable for TestTask {}

    impl CuTask for TestTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _ctx: &CuContext,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.set_payload(*input.payload().unwrap());
            Ok(())
        }
    }

    /// Polls the async wrapper until the echoed payload shows up.
    #[test]
    fn test_lifecycle() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());

        let config = ComponentConfig::default();
        let context = CuContext::new_with_clock();
        let mut async_task: CuAsyncTask<TestTask, u32> =
            CuAsyncTask::new(Some(&config), (), tp).unwrap();
        let input = CuMsg::new(Some(42u32));
        let mut output = CuMsg::new(None);

        // Bound the polling so a regression fails the test instead of
        // hanging the whole test run.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            async_task.process(&context, &input, &mut output).unwrap();

            if let Some(val) = output.payload() {
                assert_eq!(*val, 42u32);
                break;
            }

            assert!(
                Instant::now() < deadline,
                "async task never produced its output"
            );
            std::thread::yield_now();
        }
    }

    /// Task whose completion and reported end time are driven by the test
    /// through `READY_RX` / `DONE_TX`.
    #[derive(Reflect)]
    struct ControlledTask;

    impl Freezable for ControlledTask {}

    impl CuTask for ControlledTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            ctx: &CuContext,
            _input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            // Block until the test tells us which end time to report.
            let rx = READY_RX
                .get()
                .expect("ready channel not set")
                .lock()
                .unwrap();
            let ready_time = rx
                .recv_timeout(Duration::from_secs(1))
                .expect("timed out waiting for ready signal");

            // Truncating cast: the test only sends small values.
            output.set_payload(ready_time.as_nanos() as u32);
            output.metadata.process_time.start = ctx.now().into();
            output.metadata.process_time.end = ready_time.into();

            if let Some(done_tx) = DONE_TX.get() {
                let _ = done_tx.send(());
            }
            Ok(())
        }
    }

    /// Waits (up to roughly 100 ms of sleeping) for the background run to
    /// clear the `processing` flag; panics if it never does.
    fn wait_until_async_idle<T, O>(async_task: &CuAsyncTask<T, O>)
    where
        T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
        O: CuMsgPayload + Send + 'static,
    {
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                return;
            }
            // Release the lock before sleeping so the worker can update it.
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("background task never became idle");
    }

    /// Resources handed to `ActionTask::new`.
    #[derive(Clone)]
    struct ActionTaskResources {
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        done: mpsc::Sender<()>,
    }

    /// Task scripted by the test: each run receives one action; `Some(v)`
    /// sets the payload to `v`, `None` leaves the output untouched.
    #[derive(Reflect)]
    #[reflect(no_field_bounds, from_reflect = false)]
    struct ActionTask {
        #[reflect(ignore)]
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        #[reflect(ignore)]
        done: mpsc::Sender<()>,
    }

    impl Freezable for ActionTask {}

    impl CuTask for ActionTask {
        type Resources<'r> = ActionTaskResources;
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            let _ = config;
            Ok(Self {
                actions: resources.actions,
                done: resources.done,
            })
        }

        fn process(
            &mut self,
            _ctx: &CuContext,
            _input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            let action = self
                .actions
                .lock()
                .unwrap()
                .recv_timeout(Duration::from_secs(1))
                .expect("timed out waiting for action");
            if let Some(value) = action {
                output.set_payload(value);
            }
            // Signal the test that this run's work is done.
            let _ = self.done.send(());
            Ok(())
        }
    }

    /// While the worker is busy, polling must overwrite whatever is in the
    /// caller's output with an empty message.
    #[test]
    fn background_clears_output_while_processing() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // First poll starts the worker, which blocks waiting for an action.
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Plant a stale payload; the next poll must clear it.
        output.set_payload(999);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background poll should clear stale output while the worker is still running"
        );

        // Let the worker finish so the pool can shut down cleanly.
        action_tx.send(Some(7)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background worker never finished");
    }

    /// A run that produces no payload must not cause the previous run's
    /// payload to be emitted a second time.
    #[test]
    fn background_empty_run_does_not_reemit_previous_payload() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let some_input = CuMsg::new(Some(1u32));
        let no_input = CuMsg::new(None::<u32>);
        let mut output = CuMsg::new(None);

        // Run 1 produces 42.
        action_tx.send(Some(42)).unwrap();
        async_task
            .process(&context, &some_input, &mut output)
            .expect("failed to start first background run");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first background run never finished");
        wait_until_async_idle(&async_task);

        // This poll surfaces run 1's result and starts run 2, which produces
        // nothing.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to start empty background run");
        assert_eq!(output.payload(), Some(&42));
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("empty background run never finished");
        wait_until_async_idle(&async_task);

        // This poll surfaces run 2's (empty) result; 42 must not reappear.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to poll after empty background run");
        assert!(
            output.payload().is_none(),
            "background task re-emitted the previous payload after an empty run"
        );
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("cleanup background run never finished");
    }

    /// The buffered output must stay hidden until the mock clock reaches the
    /// end time the background run recorded.
    #[test]
    fn background_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();

        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        READY_RX
            .set(Arc::new(Mutex::new(ready_rx)))
            .expect("ready channel already set");
        DONE_TX
            .set(done_tx)
            .expect("completion channel already set");

        let mut async_task: CuAsyncTask<ControlledTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // t = 0: starts the background run; nothing to emit yet.
        clock_mock.set_value(0);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // t = 10: the run is still blocked, so the poll emits nothing.
        clock_mock.set_value(10);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Let the run finish and report an end time of 30.
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background task never finished");

        // The worker signals `done` before it updates the shared state, so
        // poll until the ready time has actually been recorded.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background task finished without recording ready_at"
        );

        // Move the clock back before the recorded ready time: still hidden.
        clock_mock.set_value(20);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "Output surfaced before recorded ready time"
        );

        // At the ready time the result is emitted (and a new run starts).
        clock_mock.set_value(30);
        async_task.process(&context, &input, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // Unblock the run started by the last poll so it can finish.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
}