1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use cu29_clock::{CuTime, RobotClock};
4use cu29_traits::CuResult;
5use rayon::ThreadPool;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8struct AsyncState {
9 processing: bool,
10 ready_at: Option<CuTime>,
11}
12
13pub struct CuAsyncTask<T, O>
14where
15 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
16 O: CuMsgPayload + Send + 'static,
17{
18 task: Arc<Mutex<T>>,
19 output: Arc<Mutex<CuMsg<O>>>,
20 state: Arc<Mutex<AsyncState>>,
21 tp: Arc<ThreadPool>,
22}
23
24pub struct CuAsyncTaskResources<'r, T: CuTask> {
26 pub inner: T::Resources<'r>,
27 pub threadpool: Arc<ThreadPool>,
28}
29
30impl<T, O> CuAsyncTask<T, O>
31where
32 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
33 O: CuMsgPayload + Send + 'static,
34{
35 #[allow(unused)]
36 pub fn new(
37 config: Option<&ComponentConfig>,
38 resources: T::Resources<'_>,
39 tp: Arc<ThreadPool>,
40 ) -> CuResult<Self> {
41 let task = Arc::new(Mutex::new(T::new(config, resources)?));
42 let output = Arc::new(Mutex::new(CuMsg::default()));
43 Ok(Self {
44 task,
45 output,
46 state: Arc::new(Mutex::new(AsyncState {
47 processing: false,
48 ready_at: None,
49 })),
50 tp,
51 })
52 }
53}
54
55impl<T, O> Freezable for CuAsyncTask<T, O>
56where
57 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
58 O: CuMsgPayload + Send + 'static,
59{
60}
61
62impl<T, I, O> CuTask for CuAsyncTask<T, O>
63where
64 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
65 I: CuMsgPayload + Send + Sync + 'static,
66 O: CuMsgPayload + Send + 'static,
67{
68 type Resources<'r> = CuAsyncTaskResources<'r, T>;
69 type Input<'m> = T::Input<'m>;
70 type Output<'m> = T::Output<'m>;
71
72 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
73 where
74 Self: Sized,
75 {
76 let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
77 let output = Arc::new(Mutex::new(CuMsg::default()));
78 Ok(Self {
79 task,
80 output,
81 state: Arc::new(Mutex::new(AsyncState {
82 processing: false,
83 ready_at: None,
84 })),
85 tp: resources.threadpool,
86 })
87 }
88
89 fn process<'i, 'o>(
90 &mut self,
91 clock: &RobotClock,
92 input: &Self::Input<'i>,
93 real_output: &mut Self::Output<'o>,
94 ) -> CuResult<()> {
95 {
96 let mut state = self.state.lock().unwrap();
97 if state.processing {
98 return Ok(());
100 }
101
102 if let Some(ready_at) = state.ready_at
103 && clock.now() < ready_at
104 {
105 return Ok(());
107 }
108
109 state.processing = true;
111 state.ready_at = None;
112 }
113
114 let buffered_output = self.output.lock().unwrap();
116 *real_output = buffered_output.clone();
117
118 self.tp.spawn_fifo({
120 let clock = clock.clone();
121 let input = (*input).clone();
122 let output = self.output.clone();
123 let task = self.task.clone();
124 let state = self.state.clone();
125 move || {
126 let input_ref: &CuMsg<I> = &input;
127 let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
128
129 let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
131 let output_ref: &mut MutexGuard<CuMsg<O>> =
132 unsafe { std::mem::transmute(&mut output) };
133
134 if output_ref.metadata.process_time.start.is_none() {
136 output_ref.metadata.process_time.start = clock.now().into();
137 }
138 task.lock()
139 .unwrap()
140 .process(&clock, input_ref, output_ref)
141 .unwrap();
142 let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
143 let end_time = end_from_metadata.unwrap_or_else(|| {
144 let now = clock.now();
145 output_ref.metadata.process_time.end = now.into();
146 now
147 });
148
149 let mut guard = state.lock().unwrap();
150 guard.processing = false; guard.ready_at = Some(end_time);
152 }
153 });
154 Ok(())
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use crate::config::ComponentConfig;
162 use crate::cutask::CuMsg;
163 use crate::cutask::Freezable;
164 use crate::input_msg;
165 use crate::output_msg;
166 use cu29_clock::RobotClock;
167 use cu29_traits::CuResult;
168 use rayon::ThreadPoolBuilder;
169 use std::borrow::BorrowMut;
170 use std::sync::OnceLock;
171 use std::sync::mpsc;
172 use std::time::Duration;
173
174 static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
175 static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
176 struct TestTask {}
177
178 impl Freezable for TestTask {}
179
180 impl CuTask for TestTask {
181 type Resources<'r> = ();
182 type Input<'m> = input_msg!(u32);
183 type Output<'m> = output_msg!(u32);
184
185 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
186 where
187 Self: Sized,
188 {
189 Ok(Self {})
190 }
191
192 fn process(
193 &mut self,
194 _clock: &RobotClock,
195 input: &Self::Input<'_>,
196 output: &mut Self::Output<'_>,
197 ) -> CuResult<()> {
198 output.borrow_mut().set_payload(*input.payload().unwrap());
199 Ok(())
200 }
201 }
202
203 #[test]
204 fn test_lifecycle() {
205 let tp = Arc::new(
206 rayon::ThreadPoolBuilder::new()
207 .num_threads(1)
208 .build()
209 .unwrap(),
210 );
211
212 let config = ComponentConfig::default();
213 let clock = RobotClock::default();
214 let mut async_task: CuAsyncTask<TestTask, u32> =
215 CuAsyncTask::new(Some(&config), (), tp).unwrap();
216 let input = CuMsg::new(Some(42u32));
217 let mut output = CuMsg::new(None);
218
219 loop {
220 {
221 let output_ref: &mut CuMsg<u32> = &mut output;
222 async_task.process(&clock, &input, output_ref).unwrap();
223 }
224
225 if let Some(val) = output.payload() {
226 assert_eq!(*val, 42u32);
227 break;
228 }
229 }
230 }
231
232 struct ControlledTask;
233
234 impl Freezable for ControlledTask {}
235
236 impl CuTask for ControlledTask {
237 type Resources<'r> = ();
238 type Input<'m> = input_msg!(u32);
239 type Output<'m> = output_msg!(u32);
240
241 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
242 where
243 Self: Sized,
244 {
245 Ok(Self {})
246 }
247
248 fn process(
249 &mut self,
250 clock: &RobotClock,
251 _input: &Self::Input<'_>,
252 output: &mut Self::Output<'_>,
253 ) -> CuResult<()> {
254 let rx = READY_RX
255 .get()
256 .expect("ready channel not set")
257 .lock()
258 .unwrap();
259 let ready_time = rx
260 .recv_timeout(Duration::from_secs(1))
261 .expect("timed out waiting for ready signal");
262
263 output.set_payload(ready_time.as_nanos() as u32);
264 output.metadata.process_time.start = clock.now().into();
265 output.metadata.process_time.end = ready_time.into();
266
267 if let Some(done_tx) = DONE_TX.get() {
268 let _ = done_tx.send(());
269 }
270 Ok(())
271 }
272 }
273
274 #[test]
275 fn background_respects_recorded_ready_time() {
276 let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
277 let (clock, clock_mock) = RobotClock::mock();
278
279 let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
281 let (done_tx, done_rx) = mpsc::channel::<()>();
282 READY_RX
283 .set(Arc::new(Mutex::new(ready_rx)))
284 .expect("ready channel already set");
285 DONE_TX
286 .set(done_tx)
287 .expect("completion channel already set");
288
289 let mut async_task: CuAsyncTask<ControlledTask, u32> =
290 CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
291 let input = CuMsg::new(Some(1u32));
292 let mut output = CuMsg::new(None);
293
294 clock_mock.set_value(0);
296 async_task.process(&clock, &input, &mut output).unwrap();
297 assert!(output.payload().is_none());
298
299 clock_mock.set_value(10);
301 async_task.process(&clock, &input, &mut output).unwrap();
302 assert!(output.payload().is_none());
303
304 clock_mock.set_value(30);
306 ready_tx.send(CuTime::from(30u64)).unwrap();
307 done_rx
308 .recv_timeout(Duration::from_secs(1))
309 .expect("background task never finished");
310 let mut ready_at_recorded = None;
312 for _ in 0..100 {
313 let state = async_task.state.lock().unwrap();
314 if !state.processing {
315 ready_at_recorded = state.ready_at;
316 if ready_at_recorded.is_some() {
317 break;
318 }
319 }
320 drop(state);
321 std::thread::sleep(Duration::from_millis(1));
322 }
323 assert!(
324 ready_at_recorded.is_some(),
325 "background task finished without recording ready_at"
326 );
327
328 clock_mock.set_value(20);
330 async_task.process(&clock, &input, &mut output).unwrap();
331 assert!(
332 output.payload().is_none(),
333 "Output surfaced before recorded ready time"
334 );
335
336 clock_mock.set_value(30);
338 async_task.process(&clock, &input, &mut output).unwrap();
339 assert_eq!(output.payload(), Some(&30u32));
340
341 ready_tx.send(CuTime::from(40u64)).unwrap();
343 let _ = done_rx.recv_timeout(Duration::from_secs(1));
344 }
345}