cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use cu29_clock::{CuTime, RobotClock};
4use cu29_traits::CuResult;
5use rayon::ThreadPool;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8struct AsyncState {
9    processing: bool,
10    ready_at: Option<CuTime>,
11}
12
13pub struct CuAsyncTask<T, O>
14where
15    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
16    O: CuMsgPayload + Send + 'static,
17{
18    task: Arc<Mutex<T>>,
19    output: Arc<Mutex<CuMsg<O>>>,
20    state: Arc<Mutex<AsyncState>>,
21    tp: Arc<ThreadPool>,
22}
23
24/// Resource bundle required by a backgrounded task.
25pub struct CuAsyncTaskResources<'r, T: CuTask> {
26    pub inner: T::Resources<'r>,
27    pub threadpool: Arc<ThreadPool>,
28}
29
30impl<T, O> CuAsyncTask<T, O>
31where
32    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
33    O: CuMsgPayload + Send + 'static,
34{
35    #[allow(unused)]
36    pub fn new(
37        config: Option<&ComponentConfig>,
38        resources: T::Resources<'_>,
39        tp: Arc<ThreadPool>,
40    ) -> CuResult<Self> {
41        let task = Arc::new(Mutex::new(T::new(config, resources)?));
42        let output = Arc::new(Mutex::new(CuMsg::default()));
43        Ok(Self {
44            task,
45            output,
46            state: Arc::new(Mutex::new(AsyncState {
47                processing: false,
48                ready_at: None,
49            })),
50            tp,
51        })
52    }
53}
54
55impl<T, O> Freezable for CuAsyncTask<T, O>
56where
57    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
58    O: CuMsgPayload + Send + 'static,
59{
60}
61
62impl<T, I, O> CuTask for CuAsyncTask<T, O>
63where
64    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
65    I: CuMsgPayload + Send + Sync + 'static,
66    O: CuMsgPayload + Send + 'static,
67{
68    type Resources<'r> = CuAsyncTaskResources<'r, T>;
69    type Input<'m> = T::Input<'m>;
70    type Output<'m> = T::Output<'m>;
71
72    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
73    where
74        Self: Sized,
75    {
76        let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
77        let output = Arc::new(Mutex::new(CuMsg::default()));
78        Ok(Self {
79            task,
80            output,
81            state: Arc::new(Mutex::new(AsyncState {
82                processing: false,
83                ready_at: None,
84            })),
85            tp: resources.threadpool,
86        })
87    }
88
89    fn process<'i, 'o>(
90        &mut self,
91        clock: &RobotClock,
92        input: &Self::Input<'i>,
93        real_output: &mut Self::Output<'o>,
94    ) -> CuResult<()> {
95        {
96            let mut state = self.state.lock().unwrap();
97            if state.processing {
98                // background task still running
99                return Ok(());
100            }
101
102            if let Some(ready_at) = state.ready_at
103                && clock.now() < ready_at
104            {
105                // result not yet allowed to surface based on recorded completion time
106                return Ok(());
107            }
108
109            // mark as processing before spawning the next job
110            state.processing = true;
111            state.ready_at = None;
112        }
113
114        // clone the last finished output (if any) as the visible result for this polling round
115        let buffered_output = self.output.lock().unwrap();
116        *real_output = buffered_output.clone();
117
118        // immediately requeue a task based on the new input
119        self.tp.spawn_fifo({
120            let clock = clock.clone();
121            let input = (*input).clone();
122            let output = self.output.clone();
123            let task = self.task.clone();
124            let state = self.state.clone();
125            move || {
126                let input_ref: &CuMsg<I> = &input;
127                let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
128
129                // Safety: because copied the input and output, their lifetime are bound to the task and we control its lifetime.
130                let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
131                let output_ref: &mut MutexGuard<CuMsg<O>> =
132                    unsafe { std::mem::transmute(&mut output) };
133
134                // Track the actual processing interval so replay can honor it.
135                if output_ref.metadata.process_time.start.is_none() {
136                    output_ref.metadata.process_time.start = clock.now().into();
137                }
138                task.lock()
139                    .unwrap()
140                    .process(&clock, input_ref, output_ref)
141                    .unwrap();
142                let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
143                let end_time = end_from_metadata.unwrap_or_else(|| {
144                    let now = clock.now();
145                    output_ref.metadata.process_time.end = now.into();
146                    now
147                });
148
149                let mut guard = state.lock().unwrap();
150                guard.processing = false; // Mark processing as done
151                guard.ready_at = Some(end_time);
152            }
153        });
154        Ok(())
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use crate::config::ComponentConfig;
162    use crate::cutask::CuMsg;
163    use crate::cutask::Freezable;
164    use crate::input_msg;
165    use crate::output_msg;
166    use cu29_clock::RobotClock;
167    use cu29_traits::CuResult;
168    use rayon::ThreadPoolBuilder;
169    use std::borrow::BorrowMut;
170    use std::sync::OnceLock;
171    use std::sync::mpsc;
172    use std::time::Duration;
173
174    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
175    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
176    struct TestTask {}
177
178    impl Freezable for TestTask {}
179
180    impl CuTask for TestTask {
181        type Resources<'r> = ();
182        type Input<'m> = input_msg!(u32);
183        type Output<'m> = output_msg!(u32);
184
185        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
186        where
187            Self: Sized,
188        {
189            Ok(Self {})
190        }
191
192        fn process(
193            &mut self,
194            _clock: &RobotClock,
195            input: &Self::Input<'_>,
196            output: &mut Self::Output<'_>,
197        ) -> CuResult<()> {
198            output.borrow_mut().set_payload(*input.payload().unwrap());
199            Ok(())
200        }
201    }
202
203    #[test]
204    fn test_lifecycle() {
205        let tp = Arc::new(
206            rayon::ThreadPoolBuilder::new()
207                .num_threads(1)
208                .build()
209                .unwrap(),
210        );
211
212        let config = ComponentConfig::default();
213        let clock = RobotClock::default();
214        let mut async_task: CuAsyncTask<TestTask, u32> =
215            CuAsyncTask::new(Some(&config), (), tp).unwrap();
216        let input = CuMsg::new(Some(42u32));
217        let mut output = CuMsg::new(None);
218
219        loop {
220            {
221                let output_ref: &mut CuMsg<u32> = &mut output;
222                async_task.process(&clock, &input, output_ref).unwrap();
223            }
224
225            if let Some(val) = output.payload() {
226                assert_eq!(*val, 42u32);
227                break;
228            }
229        }
230    }
231
232    struct ControlledTask;
233
234    impl Freezable for ControlledTask {}
235
236    impl CuTask for ControlledTask {
237        type Resources<'r> = ();
238        type Input<'m> = input_msg!(u32);
239        type Output<'m> = output_msg!(u32);
240
241        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
242        where
243            Self: Sized,
244        {
245            Ok(Self {})
246        }
247
248        fn process(
249            &mut self,
250            clock: &RobotClock,
251            _input: &Self::Input<'_>,
252            output: &mut Self::Output<'_>,
253        ) -> CuResult<()> {
254            let rx = READY_RX
255                .get()
256                .expect("ready channel not set")
257                .lock()
258                .unwrap();
259            let ready_time = rx
260                .recv_timeout(Duration::from_secs(1))
261                .expect("timed out waiting for ready signal");
262
263            output.set_payload(ready_time.as_nanos() as u32);
264            output.metadata.process_time.start = clock.now().into();
265            output.metadata.process_time.end = ready_time.into();
266
267            if let Some(done_tx) = DONE_TX.get() {
268                let _ = done_tx.send(());
269            }
270            Ok(())
271        }
272    }
273
274    #[test]
275    fn background_respects_recorded_ready_time() {
276        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
277        let (clock, clock_mock) = RobotClock::mock();
278
279        // Install the control channels for the task.
280        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
281        let (done_tx, done_rx) = mpsc::channel::<()>();
282        READY_RX
283            .set(Arc::new(Mutex::new(ready_rx)))
284            .expect("ready channel already set");
285        DONE_TX
286            .set(done_tx)
287            .expect("completion channel already set");
288
289        let mut async_task: CuAsyncTask<ControlledTask, u32> =
290            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
291        let input = CuMsg::new(Some(1u32));
292        let mut output = CuMsg::new(None);
293
294        // Copperlist 0: kick off processing, nothing ready yet.
295        clock_mock.set_value(0);
296        async_task.process(&clock, &input, &mut output).unwrap();
297        assert!(output.payload().is_none());
298
299        // Copperlist 1 at time 10: still running in the background.
300        clock_mock.set_value(10);
301        async_task.process(&clock, &input, &mut output).unwrap();
302        assert!(output.payload().is_none());
303
304        // The background thread finishes at time 30 (recorded in metadata).
305        clock_mock.set_value(30);
306        ready_tx.send(CuTime::from(30u64)).unwrap();
307        done_rx
308            .recv_timeout(Duration::from_secs(1))
309            .expect("background task never finished");
310        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
311        let mut ready_at_recorded = None;
312        for _ in 0..100 {
313            let state = async_task.state.lock().unwrap();
314            if !state.processing {
315                ready_at_recorded = state.ready_at;
316                if ready_at_recorded.is_some() {
317                    break;
318                }
319            }
320            drop(state);
321            std::thread::sleep(Duration::from_millis(1));
322        }
323        assert!(
324            ready_at_recorded.is_some(),
325            "background task finished without recording ready_at"
326        );
327
328        // Replay earlier than the recorded end time: the output should be held back.
329        clock_mock.set_value(20);
330        async_task.process(&clock, &input, &mut output).unwrap();
331        assert!(
332            output.payload().is_none(),
333            "Output surfaced before recorded ready time"
334        );
335
336        // Once the mock clock reaches the recorded end time, the result is released.
337        clock_mock.set_value(30);
338        async_task.process(&clock, &input, &mut output).unwrap();
339        assert_eq!(output.payload(), Some(&30u32));
340
341        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
342        ready_tx.send(CuTime::from(40u64)).unwrap();
343        let _ = done_rx.recv_timeout(Duration::from_secs(1));
344    }
345}