1use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18 COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19 ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
27#[cfg(feature = "reflect")]
30pub trait CuMsgPayload:
31 Default
32 + Debug
33 + Clone
34 + Encode
35 + Decode<()>
36 + Serialize
37 + DeserializeOwned
38 + Reflect
39 + TypePath
40 + Sized
41{
42}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46 Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
50pub trait CuMsgPack {}
51
52#[cfg(feature = "reflect")]
54impl<T> CuMsgPayload for T where
55 T: Default
56 + Debug
57 + Clone
58 + Encode
59 + Decode<()>
60 + Serialize
61 + DeserializeOwned
62 + Reflect
63 + TypePath
64 + Sized
65{
66}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70 T: Default
71 + Debug
72 + Clone
73 + Encode
74 + Decode<()>
75 + Serialize
76 + DeserializeOwned
77 + Reflect
78 + Sized
79{
80}
81
82macro_rules! impl_cu_msg_pack {
83 ($($name:ident),+) => {
84 impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
85 where
86 $($name: CuMsgPayload),+
87 {}
88 };
89}
90
91macro_rules! impl_cu_msg_pack_up_to {
92 ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
93 impl_cu_msg_pack!($first, $second);
94 impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
95 };
96 (@accumulate ($($acc:ident),+);) => {};
97 (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
98 impl_cu_msg_pack!($($acc),+, $next);
99 impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
100 };
101}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
111#[macro_export]
114macro_rules! input_msg {
115 ($lt:lifetime, $first:ty, $($rest:ty),+) => {
116 ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
117 };
118 ($lt:lifetime, $ty:ty) => {
119 CuMsg<$ty> };
121 ($ty:ty) => {
122 CuMsg<$ty>
123 };
124}
125
126#[macro_export]
128macro_rules! output_msg {
129 ($lt:lifetime, $first:ty, $($rest:ty),+) => {
130 ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
131 };
132 ($first:ty, $($rest:ty),+) => {
133 ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
134 };
135 ($ty:ty) => {
136 CuMsg<$ty>
137 };
138 ($lt:lifetime, $ty:ty) => {
139 CuMsg<$ty> };
141}
142
143#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
145#[reflect(opaque, from_reflect = false, no_field_bounds)]
146pub struct CuMsgMetadata {
147 pub process_time: PartialCuTimeRange,
149 pub status_txt: CuCompactString,
152 pub origin: Option<CuMsgOrigin>,
154}
155
156impl Metadata for CuMsgMetadata {}
157
158impl CuMsgMetadata {
159 pub fn set_status(&mut self, status: impl ToCompactString) {
160 self.status_txt = CuCompactString(status.to_compact_string());
161 }
162
163 pub fn set_origin(&mut self, origin: CuMsgOrigin) {
164 self.origin = Some(origin);
165 }
166
167 pub fn clear_origin(&mut self) {
168 self.origin = None;
169 }
170}
171
172impl CuMsgMetadataTrait for CuMsgMetadata {
173 fn process_time(&self) -> PartialCuTimeRange {
174 self.process_time
175 }
176
177 fn status_txt(&self) -> &CuCompactString {
178 &self.status_txt
179 }
180
181 fn origin(&self) -> Option<&CuMsgOrigin> {
182 self.origin.as_ref()
183 }
184}
185
186impl Display for CuMsgMetadata {
187 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
188 write!(
189 f,
190 "process_time start: {}, process_time end: {}",
191 self.process_time.start, self.process_time.end
192 )
193 }
194}
195
196#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
198#[reflect(opaque, from_reflect = false, no_field_bounds)]
199#[serde(bound(
200 serialize = "T: Serialize, M: Serialize",
201 deserialize = "T: DeserializeOwned, M: DeserializeOwned"
202))]
203pub struct CuStampedData<T, M>
204where
205 T: CuMsgPayload,
206 M: Metadata,
207{
208 payload: Option<T>,
210
211 pub tov: Tov,
214
215 pub metadata: M,
217}
218
219impl<T, M> Encode for CuStampedData<T, M>
220where
221 T: CuMsgPayload,
222 M: Metadata,
223{
224 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
225 match &self.payload {
226 None => {
227 0u8.encode(encoder)?;
228 }
229 Some(payload) => {
230 1u8.encode(encoder)?;
231 let encoded_start = cu29_traits::observed_encode_bytes();
232 let handle_start = crate::monitoring::current_payload_handle_bytes();
233 payload.encode(encoder)?;
234 let encoded_bytes =
235 cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
236 let handle_bytes =
237 crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
238 crate::monitoring::record_current_slot_payload_io_stats(
239 core::mem::size_of::<T>(),
240 encoded_bytes,
241 handle_bytes,
242 );
243 }
244 }
245 self.tov.encode(encoder)?;
246 self.metadata.encode(encoder)?;
247 Ok(())
248 }
249}
250
251impl Default for CuMsgMetadata {
252 fn default() -> Self {
253 CuMsgMetadata {
254 process_time: PartialCuTimeRange::default(),
255 status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
256 origin: None,
257 }
258 }
259}
260
261impl<T, M> CuStampedData<T, M>
262where
263 T: CuMsgPayload,
264 M: Metadata,
265{
266 pub fn new(payload: Option<T>) -> Self {
267 CuStampedData {
268 payload,
269 tov: Tov::default(),
270 metadata: M::default(),
271 }
272 }
273 pub fn payload(&self) -> Option<&T> {
274 self.payload.as_ref()
275 }
276
277 pub fn set_payload(&mut self, payload: T) {
278 self.payload = Some(payload);
279 }
280
281 pub fn clear_payload(&mut self) {
282 self.payload = None;
283 }
284
285 pub fn payload_mut(&mut self) -> &mut Option<T> {
286 &mut self.payload
287 }
288}
289
290impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
291where
292 T: CuMsgPayload,
293 M: CuMsgMetadataTrait + Metadata,
294{
295 fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
296 self.payload
297 .as_ref()
298 .map(|p| p as &dyn erased_serde::Serialize)
299 }
300
301 #[cfg(feature = "reflect")]
302 fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
303 self.payload
304 .as_ref()
305 .map(|p| p as &dyn cu29_traits::Reflect)
306 }
307
308 fn tov(&self) -> Tov {
309 self.tov
310 }
311
312 fn metadata(&self) -> &dyn CuMsgMetadataTrait {
313 &self.metadata
314 }
315}
316
317pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
320
321impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
322 pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
329 unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
331 }
332
333 pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
340 unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
342 }
343}
344
345impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
346 fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
347 CuError::from(format!(
348 "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
349 type_name::<T>(),
350 type_name::<U>()
351 ))
352 }
353
354 pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
356 if TypeId::of::<T>() == TypeId::of::<U>() {
357 Ok(unsafe { self.assume_payload::<U>() })
359 } else {
360 Err(Self::downcast_err::<U>())
361 }
362 }
363
364 pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
366 if TypeId::of::<T>() == TypeId::of::<U>() {
367 Ok(unsafe { self.assume_payload_mut::<U>() })
369 } else {
370 Err(Self::downcast_err::<U>())
371 }
372 }
373}
374
375pub trait Freezable {
378 fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
382 Encode::encode(&(), encoder) }
384
385 fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
388 Ok(())
389 }
390}
391
392pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
395
396impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
397 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
398 self.0.freeze(encoder)
399 }
400}
401
402pub trait CuSrcTask: Freezable + Reflect {
407 type Output<'m>: CuMsgPayload;
408 type Resources<'r>;
410
411 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
414 where
415 Self: Sized;
416
417 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
419 Ok(())
420 }
421
422 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
426 Ok(())
427 }
428
429 fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
433
434 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
438 Ok(())
439 }
440
441 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
443 Ok(())
444 }
445}
446
447pub trait CuTask: Freezable + Reflect {
449 type Input<'m>: CuMsgPack;
450 type Output<'m>: CuMsgPayload;
451 type Resources<'r>;
453
454 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
457 where
458 Self: Sized;
459
460 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
462 Ok(())
463 }
464
465 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
469 Ok(())
470 }
471
472 fn process<'i, 'o>(
476 &mut self,
477 _ctx: &CuContext,
478 input: &Self::Input<'i>,
479 output: &mut Self::Output<'o>,
480 ) -> CuResult<()>;
481
482 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
486 Ok(())
487 }
488
489 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
491 Ok(())
492 }
493}
494
495pub trait CuSinkTask: Freezable + Reflect {
497 type Input<'m>: CuMsgPack;
498 type Resources<'r>;
500
501 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
504 where
505 Self: Sized;
506
507 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
509 Ok(())
510 }
511
512 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
516 Ok(())
517 }
518
519 fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
523
524 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
528 Ok(())
529 }
530
531 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
533 Ok(())
534 }
535}
536
537#[cfg(test)]
538mod tests {
539 use super::*;
540 use bincode::{config, decode_from_slice, encode_to_vec};
541
542 #[test]
543 fn test_cucompactstr_encode_decode() {
544 let cstr = CuCompactString(CompactString::from("hello"));
545 let config = config::standard();
546 let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
547 let (decoded, _): (CuCompactString, usize) =
548 decode_from_slice(&encoded, config).expect("Decoding failed");
549 assert_eq!(cstr.0, decoded.0);
550 }
551}