Skip to main content

cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
10struct AsyncState {
11    processing: bool,
12    ready_at: Option<CuTime>,
13    last_error: Option<CuError>,
14}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17    let mut guard = match state.lock() {
18        Ok(guard) => guard,
19        Err(poison) => poison.into_inner(),
20    };
21    guard.processing = false;
22    guard.ready_at = None;
23    guard.last_error = Some(error);
24}
25
26#[derive(Reflect)]
27#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
28pub struct CuAsyncTask<T, O>
29where
30    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
31    O: CuMsgPayload + Send + 'static,
32{
33    #[reflect(ignore)]
34    task: Arc<Mutex<T>>,
35    #[reflect(ignore)]
36    output: Arc<Mutex<CuMsg<O>>>,
37    #[reflect(ignore)]
38    state: Arc<Mutex<AsyncState>>,
39    #[reflect(ignore)]
40    tp: Arc<ThreadPool>,
41}
42
43impl<T, O> TypePath for CuAsyncTask<T, O>
44where
45    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
46    O: CuMsgPayload + Send + 'static,
47{
48    fn type_path() -> &'static str {
49        "cu29_runtime::cuasynctask::CuAsyncTask"
50    }
51
52    fn short_type_path() -> &'static str {
53        "CuAsyncTask"
54    }
55
56    fn type_ident() -> Option<&'static str> {
57        Some("CuAsyncTask")
58    }
59
60    fn crate_name() -> Option<&'static str> {
61        Some("cu29_runtime")
62    }
63
64    fn module_path() -> Option<&'static str> {
65        Some("cuasynctask")
66    }
67}
68
69/// Resource bundle required by a backgrounded task.
70pub struct CuAsyncTaskResources<'r, T: CuTask> {
71    pub inner: T::Resources<'r>,
72    pub threadpool: Arc<ThreadPool>,
73}
74
75impl<T, O> CuAsyncTask<T, O>
76where
77    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
78    O: CuMsgPayload + Send + 'static,
79{
80    #[allow(unused)]
81    pub fn new(
82        config: Option<&ComponentConfig>,
83        resources: T::Resources<'_>,
84        tp: Arc<ThreadPool>,
85    ) -> CuResult<Self> {
86        let task = Arc::new(Mutex::new(T::new(config, resources)?));
87        let output = Arc::new(Mutex::new(CuMsg::default()));
88        Ok(Self {
89            task,
90            output,
91            state: Arc::new(Mutex::new(AsyncState {
92                processing: false,
93                ready_at: None,
94                last_error: None,
95            })),
96            tp,
97        })
98    }
99}
100
101impl<T, O> Freezable for CuAsyncTask<T, O>
102where
103    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
104    O: CuMsgPayload + Send + 'static,
105{
106}
107
108impl<T, I, O> CuTask for CuAsyncTask<T, O>
109where
110    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
111    I: CuMsgPayload + Send + Sync + 'static,
112    O: CuMsgPayload + Send + 'static,
113{
114    type Resources<'r> = CuAsyncTaskResources<'r, T>;
115    type Input<'m> = T::Input<'m>;
116    type Output<'m> = T::Output<'m>;
117
118    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
119    where
120        Self: Sized,
121    {
122        let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
123        let output = Arc::new(Mutex::new(CuMsg::default()));
124        Ok(Self {
125            task,
126            output,
127            state: Arc::new(Mutex::new(AsyncState {
128                processing: false,
129                ready_at: None,
130                last_error: None,
131            })),
132            tp: resources.threadpool,
133        })
134    }
135
136    fn process<'i, 'o>(
137        &mut self,
138        ctx: &CuContext,
139        input: &Self::Input<'i>,
140        real_output: &mut Self::Output<'o>,
141    ) -> CuResult<()> {
142        {
143            let mut state = self.state.lock().map_err(|_| {
144                CuError::from("Async task state mutex poisoned while scheduling background work")
145            })?;
146            if let Some(error) = state.last_error.take() {
147                return Err(error);
148            }
149            if state.processing {
150                // background task still running
151                *real_output = CuMsg::default();
152                return Ok(());
153            }
154
155            if let Some(ready_at) = state.ready_at
156                && ctx.now() < ready_at
157            {
158                // result not yet allowed to surface based on recorded completion time
159                *real_output = CuMsg::default();
160                return Ok(());
161            }
162
163            // mark as processing before spawning the next job
164            state.processing = true;
165            state.ready_at = None;
166        }
167
168        // clone the last finished output (if any) as the visible result for this polling round
169        let buffered_output = self.output.lock().map_err(|_| {
170            let error = CuError::from("Async task output mutex poisoned");
171            record_async_error(&self.state, error.clone());
172            error
173        })?;
174        *real_output = buffered_output.clone();
175
176        // immediately requeue a task based on the new input
177        self.tp.spawn_fifo({
178            let ctx = ctx.clone();
179            let input = (*input).clone();
180            let output = self.output.clone();
181            let task = self.task.clone();
182            let state = self.state.clone();
183            move || {
184                let input_ref: &CuMsg<I> = &input;
185                let mut output_guard = match output.lock() {
186                    Ok(guard) => guard,
187                    Err(_) => {
188                        record_async_error(
189                            &state,
190                            CuError::from("Async task output mutex poisoned"),
191                        );
192                        return;
193                    }
194                };
195                let output_ref: &mut CuMsg<O> = &mut output_guard;
196
197                // Each async run starts from an empty output so a task that
198                // chooses not to publish does not leak the previous payload.
199                *output_ref = CuMsg::default();
200
201                // Track the actual processing interval so replay can honor it.
202                if output_ref.metadata.process_time.start.is_none() {
203                    output_ref.metadata.process_time.start = ctx.now().into();
204                }
205                let task_result = match task.lock() {
206                    Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
207                    Err(poison) => Err(CuError::from(format!(
208                        "Async task mutex poisoned: {poison}"
209                    ))),
210                };
211
212                let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
213                guard.processing = false;
214
215                match task_result {
216                    Ok(()) => {
217                        let end_from_metadata: Option<CuTime> =
218                            output_ref.metadata.process_time.end.into();
219                        let end_time = end_from_metadata.unwrap_or_else(|| {
220                            let now = ctx.now();
221                            output_ref.metadata.process_time.end = now.into();
222                            now
223                        });
224                        guard.ready_at = Some(end_time);
225                    }
226                    Err(error) => {
227                        guard.ready_at = None;
228                        guard.last_error = Some(error);
229                    }
230                }
231            }
232        });
233        Ok(())
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240    use crate::config::ComponentConfig;
241    use crate::cutask::CuMsg;
242    use crate::cutask::Freezable;
243    use crate::input_msg;
244    use crate::output_msg;
245    use cu29_traits::CuResult;
246    use rayon::ThreadPoolBuilder;
247    use std::borrow::BorrowMut;
248    use std::sync::OnceLock;
249    use std::sync::mpsc;
250    use std::time::Duration;
251
252    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
253    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
254    #[derive(Reflect)]
255    struct TestTask {}
256
257    impl Freezable for TestTask {}
258
259    impl CuTask for TestTask {
260        type Resources<'r> = ();
261        type Input<'m> = input_msg!(u32);
262        type Output<'m> = output_msg!(u32);
263
264        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
265        where
266            Self: Sized,
267        {
268            Ok(Self {})
269        }
270
271        fn process(
272            &mut self,
273            _ctx: &CuContext,
274            input: &Self::Input<'_>,
275            output: &mut Self::Output<'_>,
276        ) -> CuResult<()> {
277            output.borrow_mut().set_payload(*input.payload().unwrap());
278            Ok(())
279        }
280    }
281
282    #[test]
283    fn test_lifecycle() {
284        let tp = Arc::new(
285            rayon::ThreadPoolBuilder::new()
286                .num_threads(1)
287                .build()
288                .unwrap(),
289        );
290
291        let config = ComponentConfig::default();
292        let context = CuContext::new_with_clock();
293        let mut async_task: CuAsyncTask<TestTask, u32> =
294            CuAsyncTask::new(Some(&config), (), tp).unwrap();
295        let input = CuMsg::new(Some(42u32));
296        let mut output = CuMsg::new(None);
297
298        loop {
299            {
300                let output_ref: &mut CuMsg<u32> = &mut output;
301                async_task.process(&context, &input, output_ref).unwrap();
302            }
303
304            if let Some(val) = output.payload() {
305                assert_eq!(*val, 42u32);
306                break;
307            }
308        }
309    }
310
311    #[derive(Reflect)]
312    struct ControlledTask;
313
314    impl Freezable for ControlledTask {}
315
316    impl CuTask for ControlledTask {
317        type Resources<'r> = ();
318        type Input<'m> = input_msg!(u32);
319        type Output<'m> = output_msg!(u32);
320
321        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
322        where
323            Self: Sized,
324        {
325            Ok(Self {})
326        }
327
328        fn process(
329            &mut self,
330            ctx: &CuContext,
331            _input: &Self::Input<'_>,
332            output: &mut Self::Output<'_>,
333        ) -> CuResult<()> {
334            let rx = READY_RX
335                .get()
336                .expect("ready channel not set")
337                .lock()
338                .unwrap();
339            let ready_time = rx
340                .recv_timeout(Duration::from_secs(1))
341                .expect("timed out waiting for ready signal");
342
343            output.set_payload(ready_time.as_nanos() as u32);
344            output.metadata.process_time.start = ctx.now().into();
345            output.metadata.process_time.end = ready_time.into();
346
347            if let Some(done_tx) = DONE_TX.get() {
348                let _ = done_tx.send(());
349            }
350            Ok(())
351        }
352    }
353
354    fn wait_until_async_idle<T, O>(async_task: &CuAsyncTask<T, O>)
355    where
356        T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
357        O: CuMsgPayload + Send + 'static,
358    {
359        for _ in 0..100 {
360            let state = async_task.state.lock().unwrap();
361            if !state.processing {
362                return;
363            }
364            drop(state);
365            std::thread::sleep(Duration::from_millis(1));
366        }
367        panic!("background task never became idle");
368    }
369
370    #[derive(Clone)]
371    struct ActionTaskResources {
372        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
373        done: mpsc::Sender<()>,
374    }
375
376    #[derive(Reflect)]
377    #[reflect(no_field_bounds, from_reflect = false)]
378    struct ActionTask {
379        #[reflect(ignore)]
380        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
381        #[reflect(ignore)]
382        done: mpsc::Sender<()>,
383    }
384
385    impl Freezable for ActionTask {}
386
387    impl CuTask for ActionTask {
388        type Resources<'r> = ActionTaskResources;
389        type Input<'m> = input_msg!(u32);
390        type Output<'m> = output_msg!(u32);
391
392        fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
393        where
394            Self: Sized,
395        {
396            let _ = config;
397            Ok(Self {
398                actions: resources.actions,
399                done: resources.done,
400            })
401        }
402
403        fn process(
404            &mut self,
405            _ctx: &CuContext,
406            _input: &Self::Input<'_>,
407            output: &mut Self::Output<'_>,
408        ) -> CuResult<()> {
409            let action = self
410                .actions
411                .lock()
412                .unwrap()
413                .recv_timeout(Duration::from_secs(1))
414                .expect("timed out waiting for action");
415            if let Some(value) = action {
416                output.set_payload(value);
417            }
418            let _ = self.done.send(());
419            Ok(())
420        }
421    }
422
423    #[test]
424    fn background_clears_output_while_processing() {
425        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
426        let context = CuContext::new_with_clock();
427        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
428        let (done_tx, done_rx) = mpsc::channel::<()>();
429        let resources = ActionTaskResources {
430            actions: Arc::new(Mutex::new(action_rx)),
431            done: done_tx,
432        };
433
434        let mut async_task: CuAsyncTask<ActionTask, u32> =
435            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
436        let input = CuMsg::new(Some(1u32));
437        let mut output = CuMsg::new(None);
438
439        async_task.process(&context, &input, &mut output).unwrap();
440        assert!(output.payload().is_none());
441
442        output.set_payload(999);
443        async_task.process(&context, &input, &mut output).unwrap();
444        assert!(
445            output.payload().is_none(),
446            "background poll should clear stale output while the worker is still running"
447        );
448
449        action_tx.send(Some(7)).unwrap();
450        done_rx
451            .recv_timeout(Duration::from_secs(1))
452            .expect("background worker never finished");
453    }
454
455    #[test]
456    fn background_empty_run_does_not_reemit_previous_payload() {
457        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
458        let context = CuContext::new_with_clock();
459        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
460        let (done_tx, done_rx) = mpsc::channel::<()>();
461        let resources = ActionTaskResources {
462            actions: Arc::new(Mutex::new(action_rx)),
463            done: done_tx,
464        };
465
466        let mut async_task: CuAsyncTask<ActionTask, u32> =
467            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
468        let some_input = CuMsg::new(Some(1u32));
469        let no_input = CuMsg::new(None::<u32>);
470        let mut output = CuMsg::new(None);
471
472        action_tx.send(Some(42)).unwrap();
473        async_task
474            .process(&context, &some_input, &mut output)
475            .expect("failed to start first background run");
476        done_rx
477            .recv_timeout(Duration::from_secs(1))
478            .expect("first background run never finished");
479        wait_until_async_idle(&async_task);
480
481        action_tx.send(None).unwrap();
482        async_task
483            .process(&context, &no_input, &mut output)
484            .expect("failed to start empty background run");
485        assert_eq!(output.payload(), Some(&42));
486        done_rx
487            .recv_timeout(Duration::from_secs(1))
488            .expect("empty background run never finished");
489        wait_until_async_idle(&async_task);
490
491        action_tx.send(None).unwrap();
492        async_task
493            .process(&context, &no_input, &mut output)
494            .expect("failed to poll after empty background run");
495        assert!(
496            output.payload().is_none(),
497            "background task re-emitted the previous payload after an empty run"
498        );
499        done_rx
500            .recv_timeout(Duration::from_secs(1))
501            .expect("cleanup background run never finished");
502    }
503
504    #[test]
505    fn background_respects_recorded_ready_time() {
506        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
507        let (context, clock_mock) = CuContext::new_mock_clock();
508
509        // Install the control channels for the task.
510        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
511        let (done_tx, done_rx) = mpsc::channel::<()>();
512        READY_RX
513            .set(Arc::new(Mutex::new(ready_rx)))
514            .expect("ready channel already set");
515        DONE_TX
516            .set(done_tx)
517            .expect("completion channel already set");
518
519        let mut async_task: CuAsyncTask<ControlledTask, u32> =
520            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
521        let input = CuMsg::new(Some(1u32));
522        let mut output = CuMsg::new(None);
523
524        // Copperlist 0: kick off processing, nothing ready yet.
525        clock_mock.set_value(0);
526        async_task.process(&context, &input, &mut output).unwrap();
527        assert!(output.payload().is_none());
528
529        // Copperlist 1 at time 10: still running in the background.
530        clock_mock.set_value(10);
531        async_task.process(&context, &input, &mut output).unwrap();
532        assert!(output.payload().is_none());
533
534        // The background thread finishes at time 30 (recorded in metadata).
535        clock_mock.set_value(30);
536        ready_tx.send(CuTime::from(30u64)).unwrap();
537        done_rx
538            .recv_timeout(Duration::from_secs(1))
539            .expect("background task never finished");
540        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
541        let mut ready_at_recorded = None;
542        for _ in 0..100 {
543            let state = async_task.state.lock().unwrap();
544            if !state.processing {
545                ready_at_recorded = state.ready_at;
546                if ready_at_recorded.is_some() {
547                    break;
548                }
549            }
550            drop(state);
551            std::thread::sleep(Duration::from_millis(1));
552        }
553        assert!(
554            ready_at_recorded.is_some(),
555            "background task finished without recording ready_at"
556        );
557
558        // Replay earlier than the recorded end time: the output should be held back.
559        clock_mock.set_value(20);
560        async_task.process(&context, &input, &mut output).unwrap();
561        assert!(
562            output.payload().is_none(),
563            "Output surfaced before recorded ready time"
564        );
565
566        // Once the mock clock reaches the recorded end time, the result is released.
567        clock_mock.set_value(30);
568        async_task.process(&context, &input, &mut output).unwrap();
569        assert_eq!(output.payload(), Some(&30u32));
570
571        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
572        ready_tx.send(CuTime::from(40u64)).unwrap();
573        let _ = done_rx.recv_timeout(Duration::from_secs(1));
574    }
575}