Skip to main content

cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use crate::reflect::{Reflect, TypePath};
4use cu29_clock::{CuTime, RobotClock};
5use cu29_traits::{CuError, CuResult};
6use rayon::ThreadPool;
7use std::sync::{Arc, Mutex};
8
9struct AsyncState {
10    processing: bool,
11    ready_at: Option<CuTime>,
12    last_error: Option<CuError>,
13}
14
15fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
16    let mut guard = match state.lock() {
17        Ok(guard) => guard,
18        Err(poison) => poison.into_inner(),
19    };
20    guard.processing = false;
21    guard.ready_at = None;
22    guard.last_error = Some(error);
23}
24
25#[derive(Reflect)]
26#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
27pub struct CuAsyncTask<T, O>
28where
29    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
30    O: CuMsgPayload + Send + 'static,
31{
32    #[reflect(ignore)]
33    task: Arc<Mutex<T>>,
34    #[reflect(ignore)]
35    output: Arc<Mutex<CuMsg<O>>>,
36    #[reflect(ignore)]
37    state: Arc<Mutex<AsyncState>>,
38    #[reflect(ignore)]
39    tp: Arc<ThreadPool>,
40}
41
42impl<T, O> TypePath for CuAsyncTask<T, O>
43where
44    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
45    O: CuMsgPayload + Send + 'static,
46{
47    fn type_path() -> &'static str {
48        "cu29_runtime::cuasynctask::CuAsyncTask"
49    }
50
51    fn short_type_path() -> &'static str {
52        "CuAsyncTask"
53    }
54
55    fn type_ident() -> Option<&'static str> {
56        Some("CuAsyncTask")
57    }
58
59    fn crate_name() -> Option<&'static str> {
60        Some("cu29_runtime")
61    }
62
63    fn module_path() -> Option<&'static str> {
64        Some("cuasynctask")
65    }
66}
67
68/// Resource bundle required by a backgrounded task.
69pub struct CuAsyncTaskResources<'r, T: CuTask> {
70    pub inner: T::Resources<'r>,
71    pub threadpool: Arc<ThreadPool>,
72}
73
74impl<T, O> CuAsyncTask<T, O>
75where
76    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
77    O: CuMsgPayload + Send + 'static,
78{
79    #[allow(unused)]
80    pub fn new(
81        config: Option<&ComponentConfig>,
82        resources: T::Resources<'_>,
83        tp: Arc<ThreadPool>,
84    ) -> CuResult<Self> {
85        let task = Arc::new(Mutex::new(T::new(config, resources)?));
86        let output = Arc::new(Mutex::new(CuMsg::default()));
87        Ok(Self {
88            task,
89            output,
90            state: Arc::new(Mutex::new(AsyncState {
91                processing: false,
92                ready_at: None,
93                last_error: None,
94            })),
95            tp,
96        })
97    }
98}
99
100impl<T, O> Freezable for CuAsyncTask<T, O>
101where
102    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
103    O: CuMsgPayload + Send + 'static,
104{
105}
106
107impl<T, I, O> CuTask for CuAsyncTask<T, O>
108where
109    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
110    I: CuMsgPayload + Send + Sync + 'static,
111    O: CuMsgPayload + Send + 'static,
112{
113    type Resources<'r> = CuAsyncTaskResources<'r, T>;
114    type Input<'m> = T::Input<'m>;
115    type Output<'m> = T::Output<'m>;
116
117    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
118    where
119        Self: Sized,
120    {
121        let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
122        let output = Arc::new(Mutex::new(CuMsg::default()));
123        Ok(Self {
124            task,
125            output,
126            state: Arc::new(Mutex::new(AsyncState {
127                processing: false,
128                ready_at: None,
129                last_error: None,
130            })),
131            tp: resources.threadpool,
132        })
133    }
134
135    fn process<'i, 'o>(
136        &mut self,
137        clock: &RobotClock,
138        input: &Self::Input<'i>,
139        real_output: &mut Self::Output<'o>,
140    ) -> CuResult<()> {
141        {
142            let mut state = self.state.lock().map_err(|_| {
143                CuError::from("Async task state mutex poisoned while scheduling background work")
144            })?;
145            if let Some(error) = state.last_error.take() {
146                return Err(error);
147            }
148            if state.processing {
149                // background task still running
150                return Ok(());
151            }
152
153            if let Some(ready_at) = state.ready_at
154                && clock.now() < ready_at
155            {
156                // result not yet allowed to surface based on recorded completion time
157                return Ok(());
158            }
159
160            // mark as processing before spawning the next job
161            state.processing = true;
162            state.ready_at = None;
163        }
164
165        // clone the last finished output (if any) as the visible result for this polling round
166        let buffered_output = self.output.lock().map_err(|_| {
167            let error = CuError::from("Async task output mutex poisoned");
168            record_async_error(&self.state, error.clone());
169            error
170        })?;
171        *real_output = buffered_output.clone();
172
173        // immediately requeue a task based on the new input
174        self.tp.spawn_fifo({
175            let clock = clock.clone();
176            let input = (*input).clone();
177            let output = self.output.clone();
178            let task = self.task.clone();
179            let state = self.state.clone();
180            move || {
181                let input_ref: &CuMsg<I> = &input;
182                let mut output_guard = match output.lock() {
183                    Ok(guard) => guard,
184                    Err(_) => {
185                        record_async_error(
186                            &state,
187                            CuError::from("Async task output mutex poisoned"),
188                        );
189                        return;
190                    }
191                };
192                let output_ref: &mut CuMsg<O> = &mut output_guard;
193
194                // Track the actual processing interval so replay can honor it.
195                if output_ref.metadata.process_time.start.is_none() {
196                    output_ref.metadata.process_time.start = clock.now().into();
197                }
198                let task_result = match task.lock() {
199                    Ok(mut task_guard) => task_guard.process(&clock, input_ref, output_ref),
200                    Err(poison) => Err(CuError::from(format!(
201                        "Async task mutex poisoned: {poison}"
202                    ))),
203                };
204
205                let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
206                guard.processing = false;
207
208                match task_result {
209                    Ok(()) => {
210                        let end_from_metadata: Option<CuTime> =
211                            output_ref.metadata.process_time.end.into();
212                        let end_time = end_from_metadata.unwrap_or_else(|| {
213                            let now = clock.now();
214                            output_ref.metadata.process_time.end = now.into();
215                            now
216                        });
217                        guard.ready_at = Some(end_time);
218                    }
219                    Err(error) => {
220                        guard.ready_at = None;
221                        guard.last_error = Some(error);
222                    }
223                }
224            }
225        });
226        Ok(())
227    }
228}
229
230#[cfg(test)]
231mod tests {
232    use super::*;
233    use crate::config::ComponentConfig;
234    use crate::cutask::CuMsg;
235    use crate::cutask::Freezable;
236    use crate::input_msg;
237    use crate::output_msg;
238    use cu29_clock::RobotClock;
239    use cu29_traits::CuResult;
240    use rayon::ThreadPoolBuilder;
241    use std::borrow::BorrowMut;
242    use std::sync::OnceLock;
243    use std::sync::mpsc;
244    use std::time::Duration;
245
246    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
247    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
248    #[derive(Reflect)]
249    struct TestTask {}
250
251    impl Freezable for TestTask {}
252
253    impl CuTask for TestTask {
254        type Resources<'r> = ();
255        type Input<'m> = input_msg!(u32);
256        type Output<'m> = output_msg!(u32);
257
258        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
259        where
260            Self: Sized,
261        {
262            Ok(Self {})
263        }
264
265        fn process(
266            &mut self,
267            _clock: &RobotClock,
268            input: &Self::Input<'_>,
269            output: &mut Self::Output<'_>,
270        ) -> CuResult<()> {
271            output.borrow_mut().set_payload(*input.payload().unwrap());
272            Ok(())
273        }
274    }
275
276    #[test]
277    fn test_lifecycle() {
278        let tp = Arc::new(
279            rayon::ThreadPoolBuilder::new()
280                .num_threads(1)
281                .build()
282                .unwrap(),
283        );
284
285        let config = ComponentConfig::default();
286        let clock = RobotClock::default();
287        let mut async_task: CuAsyncTask<TestTask, u32> =
288            CuAsyncTask::new(Some(&config), (), tp).unwrap();
289        let input = CuMsg::new(Some(42u32));
290        let mut output = CuMsg::new(None);
291
292        loop {
293            {
294                let output_ref: &mut CuMsg<u32> = &mut output;
295                async_task.process(&clock, &input, output_ref).unwrap();
296            }
297
298            if let Some(val) = output.payload() {
299                assert_eq!(*val, 42u32);
300                break;
301            }
302        }
303    }
304
305    #[derive(Reflect)]
306    struct ControlledTask;
307
308    impl Freezable for ControlledTask {}
309
310    impl CuTask for ControlledTask {
311        type Resources<'r> = ();
312        type Input<'m> = input_msg!(u32);
313        type Output<'m> = output_msg!(u32);
314
315        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
316        where
317            Self: Sized,
318        {
319            Ok(Self {})
320        }
321
322        fn process(
323            &mut self,
324            clock: &RobotClock,
325            _input: &Self::Input<'_>,
326            output: &mut Self::Output<'_>,
327        ) -> CuResult<()> {
328            let rx = READY_RX
329                .get()
330                .expect("ready channel not set")
331                .lock()
332                .unwrap();
333            let ready_time = rx
334                .recv_timeout(Duration::from_secs(1))
335                .expect("timed out waiting for ready signal");
336
337            output.set_payload(ready_time.as_nanos() as u32);
338            output.metadata.process_time.start = clock.now().into();
339            output.metadata.process_time.end = ready_time.into();
340
341            if let Some(done_tx) = DONE_TX.get() {
342                let _ = done_tx.send(());
343            }
344            Ok(())
345        }
346    }
347
348    #[test]
349    fn background_respects_recorded_ready_time() {
350        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
351        let (clock, clock_mock) = RobotClock::mock();
352
353        // Install the control channels for the task.
354        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
355        let (done_tx, done_rx) = mpsc::channel::<()>();
356        READY_RX
357            .set(Arc::new(Mutex::new(ready_rx)))
358            .expect("ready channel already set");
359        DONE_TX
360            .set(done_tx)
361            .expect("completion channel already set");
362
363        let mut async_task: CuAsyncTask<ControlledTask, u32> =
364            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
365        let input = CuMsg::new(Some(1u32));
366        let mut output = CuMsg::new(None);
367
368        // Copperlist 0: kick off processing, nothing ready yet.
369        clock_mock.set_value(0);
370        async_task.process(&clock, &input, &mut output).unwrap();
371        assert!(output.payload().is_none());
372
373        // Copperlist 1 at time 10: still running in the background.
374        clock_mock.set_value(10);
375        async_task.process(&clock, &input, &mut output).unwrap();
376        assert!(output.payload().is_none());
377
378        // The background thread finishes at time 30 (recorded in metadata).
379        clock_mock.set_value(30);
380        ready_tx.send(CuTime::from(30u64)).unwrap();
381        done_rx
382            .recv_timeout(Duration::from_secs(1))
383            .expect("background task never finished");
384        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
385        let mut ready_at_recorded = None;
386        for _ in 0..100 {
387            let state = async_task.state.lock().unwrap();
388            if !state.processing {
389                ready_at_recorded = state.ready_at;
390                if ready_at_recorded.is_some() {
391                    break;
392                }
393            }
394            drop(state);
395            std::thread::sleep(Duration::from_millis(1));
396        }
397        assert!(
398            ready_at_recorded.is_some(),
399            "background task finished without recording ready_at"
400        );
401
402        // Replay earlier than the recorded end time: the output should be held back.
403        clock_mock.set_value(20);
404        async_task.process(&clock, &input, &mut output).unwrap();
405        assert!(
406            output.payload().is_none(),
407            "Output surfaced before recorded ready time"
408        );
409
410        // Once the mock clock reaches the recorded end time, the result is released.
411        clock_mock.set_value(30);
412        async_task.process(&clock, &input, &mut output).unwrap();
413        assert_eq!(output.payload(), Some(&30u32));
414
415        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
416        ready_tx.send(CuTime::from(40u64)).unwrap();
417        let _ = done_rx.recv_timeout(Duration::from_secs(1));
418    }
419}