Skip to main content

cu29_runtime/
cutask.rs

1//! This module contains all the main definition of the traits you need to implement
2//! or interact with to create a Copper task.
3
4use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18    COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19    ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
27/// The state of a task.
28// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
29#[cfg(feature = "reflect")]
30pub trait CuMsgPayload:
31    Default
32    + Debug
33    + Clone
34    + Encode
35    + Decode<()>
36    + Serialize
37    + DeserializeOwned
38    + Reflect
39    + TypePath
40    + Sized
41{
42}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46    Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
50pub trait CuMsgPack {}
51
52// Also anything that follows this contract can be a payload (blanket implementation)
53#[cfg(feature = "reflect")]
54impl<T> CuMsgPayload for T where
55    T: Default
56        + Debug
57        + Clone
58        + Encode
59        + Decode<()>
60        + Serialize
61        + DeserializeOwned
62        + Reflect
63        + TypePath
64        + Sized
65{
66}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70    T: Default
71        + Debug
72        + Clone
73        + Encode
74        + Decode<()>
75        + Serialize
76        + DeserializeOwned
77        + Reflect
78        + Sized
79{
80}
81
82macro_rules! impl_cu_msg_pack {
83    ($($name:ident),+) => {
84        impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
85        where
86            $($name: CuMsgPayload),+
87        {}
88    };
89}
90
91macro_rules! impl_cu_msg_pack_up_to {
92    ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
93        impl_cu_msg_pack!($first, $second);
94        impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
95    };
96    (@accumulate ($($acc:ident),+);) => {};
97    (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
98        impl_cu_msg_pack!($($acc),+, $next);
99        impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
100    };
101}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108// Apply the macro to generate implementations for tuple sizes up to 12.
109impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
111// A convenience macro to get from a payload or a list of payloads to a proper CuMsg or CuMsgPack
112// declaration for your tasks used for input messages.
113#[macro_export]
114macro_rules! input_msg {
115    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
116        ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
117    };
118    ($ty:ty) => {
119        CuMsg<$ty>
120    };
121}
122
123// A convenience macro to get from a payload to a proper CuMsg used as output.
124#[macro_export]
125macro_rules! output_msg {
126    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
127        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
128    };
129    ($first:ty, $($rest:ty),+) => {
130        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
131    };
132    ($ty:ty) => {
133        CuMsg<$ty>
134    };
135}
136
137/// Helper trait used by codegen when Copper needs to treat a task output as a
138/// single message slot without relying on config-declared output edges.
139pub trait CuSingleOutputMsg {
140    type Payload: CuMsgPayload;
141}
142
143impl<T: CuMsgPayload> CuSingleOutputMsg for CuMsg<T> {
144    type Payload = T;
145}
146
147/// CuMsgMetadata is a structure that contains metadata common to all CuStampedDataSet.
148#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
149#[reflect(opaque, from_reflect = false, no_field_bounds)]
150pub struct CuMsgMetadata {
151    /// The time range used for the processing of this message
152    pub process_time: PartialCuTimeRange,
153    /// A small string for real time feedback purposes.
154    /// This is useful for to display on the field when the tasks are operating correctly.
155    pub status_txt: CuCompactString,
156    /// Remote Copper provenance captured on receive, when available.
157    pub origin: Option<CuMsgOrigin>,
158}
159
160impl Metadata for CuMsgMetadata {}
161
162impl CuMsgMetadata {
163    pub fn set_status(&mut self, status: impl ToCompactString) {
164        self.status_txt = CuCompactString(status.to_compact_string());
165    }
166
167    pub fn set_origin(&mut self, origin: CuMsgOrigin) {
168        self.origin = Some(origin);
169    }
170
171    pub fn clear_origin(&mut self) {
172        self.origin = None;
173    }
174}
175
176impl CuMsgMetadataTrait for CuMsgMetadata {
177    fn process_time(&self) -> PartialCuTimeRange {
178        self.process_time
179    }
180
181    fn status_txt(&self) -> &CuCompactString {
182        &self.status_txt
183    }
184
185    fn origin(&self) -> Option<&CuMsgOrigin> {
186        self.origin.as_ref()
187    }
188}
189
190impl Display for CuMsgMetadata {
191    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
192        write!(
193            f,
194            "process_time start: {}, process_time end: {}",
195            self.process_time.start, self.process_time.end
196        )
197    }
198}
199
200/// CuMsg is the envelope holding the msg payload and the metadata between tasks.
201#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
202#[reflect(opaque, from_reflect = false, no_field_bounds)]
203#[serde(bound(
204    serialize = "T: Serialize, M: Serialize",
205    deserialize = "T: DeserializeOwned, M: DeserializeOwned"
206))]
207pub struct CuStampedData<T, M>
208where
209    T: CuMsgPayload,
210    M: Metadata,
211{
212    /// This payload is the actual data exchanged between tasks.
213    payload: Option<T>,
214
215    /// The time of validity of the message.
216    /// It can be undefined (None), one measure point or a range of measures (TimeRange).
217    pub tov: Tov,
218
219    /// This metadata is the data that is common to all messages.
220    pub metadata: M,
221}
222
223impl<T, M> Encode for CuStampedData<T, M>
224where
225    T: CuMsgPayload,
226    M: Metadata,
227{
228    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
229        match &self.payload {
230            None => {
231                0u8.encode(encoder)?;
232            }
233            Some(payload) => {
234                1u8.encode(encoder)?;
235                let encoded_start = cu29_traits::observed_encode_bytes();
236                let handle_start = crate::monitoring::current_payload_handle_bytes();
237                payload.encode(encoder)?;
238                let encoded_bytes =
239                    cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
240                let handle_bytes =
241                    crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
242                crate::monitoring::record_current_slot_payload_io_stats(
243                    core::mem::size_of::<T>(),
244                    encoded_bytes,
245                    handle_bytes,
246                );
247            }
248        }
249        self.tov.encode(encoder)?;
250        self.metadata.encode(encoder)?;
251        Ok(())
252    }
253}
254
255impl Default for CuMsgMetadata {
256    fn default() -> Self {
257        CuMsgMetadata {
258            process_time: PartialCuTimeRange::default(),
259            status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
260            origin: None,
261        }
262    }
263}
264
265impl<T, M> CuStampedData<T, M>
266where
267    T: CuMsgPayload,
268    M: Metadata,
269{
270    pub(crate) fn from_parts(payload: Option<T>, tov: Tov, metadata: M) -> Self {
271        CuStampedData {
272            payload,
273            tov,
274            metadata,
275        }
276    }
277
278    pub fn new(payload: Option<T>) -> Self {
279        Self::from_parts(payload, Tov::default(), M::default())
280    }
281    pub fn payload(&self) -> Option<&T> {
282        self.payload.as_ref()
283    }
284
285    pub fn set_payload(&mut self, payload: T) {
286        self.payload = Some(payload);
287    }
288
289    pub fn clear_payload(&mut self) {
290        self.payload = None;
291    }
292
293    pub fn payload_mut(&mut self) -> &mut Option<T> {
294        &mut self.payload
295    }
296}
297
298impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
299where
300    T: CuMsgPayload,
301    M: CuMsgMetadataTrait + Metadata,
302{
303    fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
304        self.payload
305            .as_ref()
306            .map(|p| p as &dyn erased_serde::Serialize)
307    }
308
309    #[cfg(feature = "reflect")]
310    fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
311        self.payload
312            .as_ref()
313            .map(|p| p as &dyn cu29_traits::Reflect)
314    }
315
316    fn tov(&self) -> Tov {
317        self.tov
318    }
319
320    fn metadata(&self) -> &dyn CuMsgMetadataTrait {
321        &self.metadata
322    }
323}
324
325/// This is the robotics message type for Copper with the correct Metadata type
326/// that will be used by the runtime.
327pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
328
329impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
330    /// Reinterprets the payload type carried by this message.
331    ///
332    /// # Safety
333    ///
334    /// The caller must guarantee that the message really contains a payload of type `U`. Failing
335    /// to do so is undefined behaviour.
336    pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
337        // SAFETY: Caller guarantees that the underlying payload is of type U.
338        unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
339    }
340
341    /// Mutable variant of [`assume_payload`](Self::assume_payload).
342    ///
343    /// # Safety
344    ///
345    /// The caller must guarantee that mutating the returned message is sound for the actual
346    /// payload type stored in the buffer.
347    pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
348        // SAFETY: Caller guarantees that the underlying payload is of type U.
349        unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
350    }
351}
352
353impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
354    fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
355        CuError::from(format!(
356            "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
357            type_name::<T>(),
358            type_name::<U>()
359        ))
360    }
361
362    /// Attempts to view this message as carrying payload `U`.
363    pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
364        if TypeId::of::<T>() == TypeId::of::<U>() {
365            // SAFETY: We just proved that T == U.
366            Ok(unsafe { self.assume_payload::<U>() })
367        } else {
368            Err(Self::downcast_err::<U>())
369        }
370    }
371
372    /// Mutable variant of [`downcast_ref`](Self::downcast_ref).
373    pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
374        if TypeId::of::<T>() == TypeId::of::<U>() {
375            // SAFETY: We just proved that T == U.
376            Ok(unsafe { self.assume_payload_mut::<U>() })
377        } else {
378            Err(Self::downcast_err::<U>())
379        }
380    }
381}
382
383/// The internal state of a task needs to be serializable
384/// so the framework can take a snapshot of the task graph.
385pub trait Freezable {
386    /// This method is called by the framework when it wants to save the task state.
387    /// The default implementation is to encode nothing (stateless).
388    /// If you have a state, you need to implement this method.
389    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
390        Encode::encode(&(), encoder) // default is stateless
391    }
392
393    /// This method is called by the framework when it wants to restore the task to a specific state.
394    /// Here it is similar to Decode but the framework will give you a new instance of the task (the new method will be called)
395    fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
396        Ok(())
397    }
398}
399
400/// Bincode Adapter for Freezable tasks
401/// This allows the use of the bincode API directly to freeze and thaw tasks.
402pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
403
404impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
405    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
406        self.0.freeze(encoder)
407    }
408}
409
410/// A Src Task is a task that only produces messages. For example drivers for sensors are Src Tasks.
411/// They are in push mode from the runtime.
412/// To set the frequency of the pulls and align them to any hw, see the runtime configuration.
413/// Note: A source has the privilege to have a clock passed to it vs a frozen clock.
414pub trait CuSrcTask: Freezable + Reflect {
415    type Output<'m>: CuMsgPayload;
416    /// Resources required by the task.
417    type Resources<'r>;
418
419    /// Here you need to initialize everything your task will need for the duration of its lifetime.
420    /// The config allows you to access the configuration of the task.
421    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
422    where
423        Self: Sized;
424
425    /// Start is called between the creation of the task and the first call to pre/process.
426    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
427        Ok(())
428    }
429
430    /// This is a method called by the runtime before "process". This is a kind of best effort,
431    /// as soon as possible call to give a chance for the task to do some work before to prepare
432    /// to make "process" as short as possible.
433    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
434        Ok(())
435    }
436
437    /// Process is the most critical execution of the task.
438    /// The goal will be to produce the output message as soon as possible.
439    /// Use preprocess to prepare the task to make this method as short as possible.
440    fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
441
442    /// This is a method called by the runtime after "process". It is best effort a chance for
443    /// the task to update some state after process is out of the way.
444    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
445    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
446        Ok(())
447    }
448
449    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
450    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
451        Ok(())
452    }
453}
454
455/// This is the most generic Task of copper. It is a "transform" task deriving an output from an input.
456pub trait CuTask: Freezable + Reflect {
457    type Input<'m>: CuMsgPack;
458    type Output<'m>: CuMsgPayload;
459    /// Resources required by the task.
460    type Resources<'r>;
461
462    /// Here you need to initialize everything your task will need for the duration of its lifetime.
463    /// The config allows you to access the configuration of the task.
464    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
465    where
466        Self: Sized;
467
468    /// Start is called between the creation of the task and the first call to pre/process.
469    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
470        Ok(())
471    }
472
473    /// This is a method called by the runtime before "process". This is a kind of best effort,
474    /// as soon as possible call to give a chance for the task to do some work before to prepare
475    /// to make "process" as short as possible.
476    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
477        Ok(())
478    }
479
480    /// Process is the most critical execution of the task.
481    /// The goal will be to produce the output message as soon as possible.
482    /// Use preprocess to prepare the task to make this method as short as possible.
483    fn process<'i, 'o>(
484        &mut self,
485        _ctx: &CuContext,
486        input: &Self::Input<'i>,
487        output: &mut Self::Output<'o>,
488    ) -> CuResult<()>;
489
490    /// This is a method called by the runtime after "process". It is best effort a chance for
491    /// the task to update some state after process is out of the way.
492    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
493    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
494        Ok(())
495    }
496
497    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
498    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
499        Ok(())
500    }
501}
502
503/// A Sink Task is a task that only consumes messages. For example drivers for actuators are Sink Tasks.
504pub trait CuSinkTask: Freezable + Reflect {
505    type Input<'m>: CuMsgPack;
506    /// Resources required by the task.
507    type Resources<'r>;
508
509    /// Here you need to initialize everything your task will need for the duration of its lifetime.
510    /// The config allows you to access the configuration of the task.
511    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
512    where
513        Self: Sized;
514
515    /// Start is called between the creation of the task and the first call to pre/process.
516    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
517        Ok(())
518    }
519
520    /// This is a method called by the runtime before "process". This is a kind of best effort,
521    /// as soon as possible call to give a chance for the task to do some work before to prepare
522    /// to make "process" as short as possible.
523    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
524        Ok(())
525    }
526
527    /// Process is the most critical execution of the task.
528    /// The goal will be to produce the output message as soon as possible.
529    /// Use preprocess to prepare the task to make this method as short as possible.
530    fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
531
532    /// This is a method called by the runtime after "process". It is best effort a chance for
533    /// the task to update some state after process is out of the way.
534    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
535    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
536        Ok(())
537    }
538
539    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
540    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
541        Ok(())
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use bincode::{config, decode_from_slice, encode_to_vec};
549
550    #[test]
551    fn test_cucompactstr_encode_decode() {
552        let cstr = CuCompactString(CompactString::from("hello"));
553        let config = config::standard();
554        let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
555        let (decoded, _): (CuCompactString, usize) =
556            decode_from_slice(&encoded, config).expect("Decoding failed");
557        assert_eq!(cstr.0, decoded.0);
558    }
559}