Skip to main content

cu29_runtime/
cutask.rs

1//! This module contains all the main definition of the traits you need to implement
2//! or interact with to create a Copper task.
3
4use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18    COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19    ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
27/// The state of a task.
28// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
29#[cfg(feature = "reflect")]
30pub trait CuMsgPayload:
31    Default
32    + Debug
33    + Clone
34    + Encode
35    + Decode<()>
36    + Serialize
37    + DeserializeOwned
38    + Reflect
39    + TypePath
40    + Sized
41{
42}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46    Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
50pub trait CuMsgPack {}
51
52// Also anything that follows this contract can be a payload (blanket implementation)
53#[cfg(feature = "reflect")]
54impl<T> CuMsgPayload for T where
55    T: Default
56        + Debug
57        + Clone
58        + Encode
59        + Decode<()>
60        + Serialize
61        + DeserializeOwned
62        + Reflect
63        + TypePath
64        + Sized
65{
66}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70    T: Default
71        + Debug
72        + Clone
73        + Encode
74        + Decode<()>
75        + Serialize
76        + DeserializeOwned
77        + Reflect
78        + Sized
79{
80}
81
82macro_rules! impl_cu_msg_pack {
83    ($($name:ident),+) => {
84        impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
85        where
86            $($name: CuMsgPayload),+
87        {}
88    };
89}
90
91macro_rules! impl_cu_msg_pack_up_to {
92    ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
93        impl_cu_msg_pack!($first, $second);
94        impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
95    };
96    (@accumulate ($($acc:ident),+);) => {};
97    (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
98        impl_cu_msg_pack!($($acc),+, $next);
99        impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
100    };
101}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108// Apply the macro to generate implementations for tuple sizes up to 12.
109impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
111// A convenience macro to get from a payload or a list of payloads to a proper CuMsg or CuMsgPack
112// declaration for your tasks used for input messages.
113#[macro_export]
114macro_rules! input_msg {
115    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
116        ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
117    };
118    ($lt:lifetime, $ty:ty) => {
119        CuMsg<$ty>   // This is for backward compatibility
120    };
121    ($ty:ty) => {
122        CuMsg<$ty>
123    };
124}
125
126// A convenience macro to get from a payload to a proper CuMsg used as output.
127#[macro_export]
128macro_rules! output_msg {
129    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
130        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
131    };
132    ($first:ty, $($rest:ty),+) => {
133        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
134    };
135    ($ty:ty) => {
136        CuMsg<$ty>
137    };
138    ($lt:lifetime, $ty:ty) => {
139        CuMsg<$ty>  // This is for backward compatibility
140    };
141}
142
143/// CuMsgMetadata is a structure that contains metadata common to all CuStampedDataSet.
144#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
145#[reflect(opaque, from_reflect = false, no_field_bounds)]
146pub struct CuMsgMetadata {
147    /// The time range used for the processing of this message
148    pub process_time: PartialCuTimeRange,
149    /// A small string for real time feedback purposes.
150    /// This is useful for to display on the field when the tasks are operating correctly.
151    pub status_txt: CuCompactString,
152    /// Remote Copper provenance captured on receive, when available.
153    pub origin: Option<CuMsgOrigin>,
154}
155
156impl Metadata for CuMsgMetadata {}
157
158impl CuMsgMetadata {
159    pub fn set_status(&mut self, status: impl ToCompactString) {
160        self.status_txt = CuCompactString(status.to_compact_string());
161    }
162
163    pub fn set_origin(&mut self, origin: CuMsgOrigin) {
164        self.origin = Some(origin);
165    }
166
167    pub fn clear_origin(&mut self) {
168        self.origin = None;
169    }
170}
171
172impl CuMsgMetadataTrait for CuMsgMetadata {
173    fn process_time(&self) -> PartialCuTimeRange {
174        self.process_time
175    }
176
177    fn status_txt(&self) -> &CuCompactString {
178        &self.status_txt
179    }
180
181    fn origin(&self) -> Option<&CuMsgOrigin> {
182        self.origin.as_ref()
183    }
184}
185
186impl Display for CuMsgMetadata {
187    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
188        write!(
189            f,
190            "process_time start: {}, process_time end: {}",
191            self.process_time.start, self.process_time.end
192        )
193    }
194}
195
196/// CuMsg is the envelope holding the msg payload and the metadata between tasks.
197#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
198#[reflect(opaque, from_reflect = false, no_field_bounds)]
199#[serde(bound(
200    serialize = "T: Serialize, M: Serialize",
201    deserialize = "T: DeserializeOwned, M: DeserializeOwned"
202))]
203pub struct CuStampedData<T, M>
204where
205    T: CuMsgPayload,
206    M: Metadata,
207{
208    /// This payload is the actual data exchanged between tasks.
209    payload: Option<T>,
210
211    /// The time of validity of the message.
212    /// It can be undefined (None), one measure point or a range of measures (TimeRange).
213    pub tov: Tov,
214
215    /// This metadata is the data that is common to all messages.
216    pub metadata: M,
217}
218
219impl<T, M> Encode for CuStampedData<T, M>
220where
221    T: CuMsgPayload,
222    M: Metadata,
223{
224    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
225        match &self.payload {
226            None => {
227                0u8.encode(encoder)?;
228            }
229            Some(payload) => {
230                1u8.encode(encoder)?;
231                let encoded_start = cu29_traits::observed_encode_bytes();
232                let handle_start = crate::monitoring::current_payload_handle_bytes();
233                payload.encode(encoder)?;
234                let encoded_bytes =
235                    cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
236                let handle_bytes =
237                    crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
238                crate::monitoring::record_current_slot_payload_io_stats(
239                    core::mem::size_of::<T>(),
240                    encoded_bytes,
241                    handle_bytes,
242                );
243            }
244        }
245        self.tov.encode(encoder)?;
246        self.metadata.encode(encoder)?;
247        Ok(())
248    }
249}
250
251impl Default for CuMsgMetadata {
252    fn default() -> Self {
253        CuMsgMetadata {
254            process_time: PartialCuTimeRange::default(),
255            status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
256            origin: None,
257        }
258    }
259}
260
261impl<T, M> CuStampedData<T, M>
262where
263    T: CuMsgPayload,
264    M: Metadata,
265{
266    pub fn new(payload: Option<T>) -> Self {
267        CuStampedData {
268            payload,
269            tov: Tov::default(),
270            metadata: M::default(),
271        }
272    }
273    pub fn payload(&self) -> Option<&T> {
274        self.payload.as_ref()
275    }
276
277    pub fn set_payload(&mut self, payload: T) {
278        self.payload = Some(payload);
279    }
280
281    pub fn clear_payload(&mut self) {
282        self.payload = None;
283    }
284
285    pub fn payload_mut(&mut self) -> &mut Option<T> {
286        &mut self.payload
287    }
288}
289
290impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
291where
292    T: CuMsgPayload,
293    M: CuMsgMetadataTrait + Metadata,
294{
295    fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
296        self.payload
297            .as_ref()
298            .map(|p| p as &dyn erased_serde::Serialize)
299    }
300
301    #[cfg(feature = "reflect")]
302    fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
303        self.payload
304            .as_ref()
305            .map(|p| p as &dyn cu29_traits::Reflect)
306    }
307
308    fn tov(&self) -> Tov {
309        self.tov
310    }
311
312    fn metadata(&self) -> &dyn CuMsgMetadataTrait {
313        &self.metadata
314    }
315}
316
317/// This is the robotics message type for Copper with the correct Metadata type
318/// that will be used by the runtime.
319pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
320
321impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
322    /// Reinterprets the payload type carried by this message.
323    ///
324    /// # Safety
325    ///
326    /// The caller must guarantee that the message really contains a payload of type `U`. Failing
327    /// to do so is undefined behaviour.
328    pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
329        // SAFETY: Caller guarantees that the underlying payload is of type U.
330        unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
331    }
332
333    /// Mutable variant of [`assume_payload`](Self::assume_payload).
334    ///
335    /// # Safety
336    ///
337    /// The caller must guarantee that mutating the returned message is sound for the actual
338    /// payload type stored in the buffer.
339    pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
340        // SAFETY: Caller guarantees that the underlying payload is of type U.
341        unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
342    }
343}
344
345impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
346    fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
347        CuError::from(format!(
348            "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
349            type_name::<T>(),
350            type_name::<U>()
351        ))
352    }
353
354    /// Attempts to view this message as carrying payload `U`.
355    pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
356        if TypeId::of::<T>() == TypeId::of::<U>() {
357            // SAFETY: We just proved that T == U.
358            Ok(unsafe { self.assume_payload::<U>() })
359        } else {
360            Err(Self::downcast_err::<U>())
361        }
362    }
363
364    /// Mutable variant of [`downcast_ref`](Self::downcast_ref).
365    pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
366        if TypeId::of::<T>() == TypeId::of::<U>() {
367            // SAFETY: We just proved that T == U.
368            Ok(unsafe { self.assume_payload_mut::<U>() })
369        } else {
370            Err(Self::downcast_err::<U>())
371        }
372    }
373}
374
375/// The internal state of a task needs to be serializable
376/// so the framework can take a snapshot of the task graph.
377pub trait Freezable {
378    /// This method is called by the framework when it wants to save the task state.
379    /// The default implementation is to encode nothing (stateless).
380    /// If you have a state, you need to implement this method.
381    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
382        Encode::encode(&(), encoder) // default is stateless
383    }
384
385    /// This method is called by the framework when it wants to restore the task to a specific state.
386    /// Here it is similar to Decode but the framework will give you a new instance of the task (the new method will be called)
387    fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
388        Ok(())
389    }
390}
391
392/// Bincode Adapter for Freezable tasks
393/// This allows the use of the bincode API directly to freeze and thaw tasks.
394pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
395
396impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
397    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
398        self.0.freeze(encoder)
399    }
400}
401
402/// A Src Task is a task that only produces messages. For example drivers for sensors are Src Tasks.
403/// They are in push mode from the runtime.
404/// To set the frequency of the pulls and align them to any hw, see the runtime configuration.
405/// Note: A source has the privilege to have a clock passed to it vs a frozen clock.
406pub trait CuSrcTask: Freezable + Reflect {
407    type Output<'m>: CuMsgPayload;
408    /// Resources required by the task.
409    type Resources<'r>;
410
411    /// Here you need to initialize everything your task will need for the duration of its lifetime.
412    /// The config allows you to access the configuration of the task.
413    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
414    where
415        Self: Sized;
416
417    /// Start is called between the creation of the task and the first call to pre/process.
418    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
419        Ok(())
420    }
421
422    /// This is a method called by the runtime before "process". This is a kind of best effort,
423    /// as soon as possible call to give a chance for the task to do some work before to prepare
424    /// to make "process" as short as possible.
425    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
426        Ok(())
427    }
428
429    /// Process is the most critical execution of the task.
430    /// The goal will be to produce the output message as soon as possible.
431    /// Use preprocess to prepare the task to make this method as short as possible.
432    fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
433
434    /// This is a method called by the runtime after "process". It is best effort a chance for
435    /// the task to update some state after process is out of the way.
436    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
437    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
438        Ok(())
439    }
440
441    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
442    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
443        Ok(())
444    }
445}
446
447/// This is the most generic Task of copper. It is a "transform" task deriving an output from an input.
448pub trait CuTask: Freezable + Reflect {
449    type Input<'m>: CuMsgPack;
450    type Output<'m>: CuMsgPayload;
451    /// Resources required by the task.
452    type Resources<'r>;
453
454    /// Here you need to initialize everything your task will need for the duration of its lifetime.
455    /// The config allows you to access the configuration of the task.
456    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
457    where
458        Self: Sized;
459
460    /// Start is called between the creation of the task and the first call to pre/process.
461    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
462        Ok(())
463    }
464
465    /// This is a method called by the runtime before "process". This is a kind of best effort,
466    /// as soon as possible call to give a chance for the task to do some work before to prepare
467    /// to make "process" as short as possible.
468    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
469        Ok(())
470    }
471
472    /// Process is the most critical execution of the task.
473    /// The goal will be to produce the output message as soon as possible.
474    /// Use preprocess to prepare the task to make this method as short as possible.
475    fn process<'i, 'o>(
476        &mut self,
477        _ctx: &CuContext,
478        input: &Self::Input<'i>,
479        output: &mut Self::Output<'o>,
480    ) -> CuResult<()>;
481
482    /// This is a method called by the runtime after "process". It is best effort a chance for
483    /// the task to update some state after process is out of the way.
484    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
485    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
486        Ok(())
487    }
488
489    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
490    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
491        Ok(())
492    }
493}
494
495/// A Sink Task is a task that only consumes messages. For example drivers for actuators are Sink Tasks.
496pub trait CuSinkTask: Freezable + Reflect {
497    type Input<'m>: CuMsgPack;
498    /// Resources required by the task.
499    type Resources<'r>;
500
501    /// Here you need to initialize everything your task will need for the duration of its lifetime.
502    /// The config allows you to access the configuration of the task.
503    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
504    where
505        Self: Sized;
506
507    /// Start is called between the creation of the task and the first call to pre/process.
508    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
509        Ok(())
510    }
511
512    /// This is a method called by the runtime before "process". This is a kind of best effort,
513    /// as soon as possible call to give a chance for the task to do some work before to prepare
514    /// to make "process" as short as possible.
515    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
516        Ok(())
517    }
518
519    /// Process is the most critical execution of the task.
520    /// The goal will be to produce the output message as soon as possible.
521    /// Use preprocess to prepare the task to make this method as short as possible.
522    fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
523
524    /// This is a method called by the runtime after "process". It is best effort a chance for
525    /// the task to update some state after process is out of the way.
526    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
527    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
528        Ok(())
529    }
530
531    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
532    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
533        Ok(())
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540    use bincode::{config, decode_from_slice, encode_to_vec};
541
542    #[test]
543    fn test_cucompactstr_encode_decode() {
544        let cstr = CuCompactString(CompactString::from("hello"));
545        let config = config::standard();
546        let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
547        let (decoded, _): (CuCompactString, usize) =
548            decode_from_slice(&encoded, config).expect("Decoding failed");
549        assert_eq!(cstr.0, decoded.0);
550    }
551}