1use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18 COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19 ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
/// Bundle of bounds that every message payload type has to satisfy when the
/// `reflect` feature is enabled.
///
/// The trait has no items of its own; it only names the combination of
/// default-construction, debugging, cloning, bincode and serde support, and
/// reflection (`Reflect` + `TypePath`) required of a payload.
#[cfg(feature = "reflect")]
pub trait CuMsgPayload
where
    Self: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + TypePath
        + Sized,
{
}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46 Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
/// Marker trait for the types accepted as a task's `Input`.
///
/// It carries no methods. In this file it is implemented for `()`, for
/// `CuMsg<T>`, for `&CuMsg<T>`, and for tuples of `&CuMsg<_>` references.
pub trait CuMsgPack {}
51
/// Blanket implementation (`reflect` feature enabled): any type meeting all
/// of the payload bounds is automatically a `CuMsgPayload`.
#[cfg(feature = "reflect")]
impl<T> CuMsgPayload for T
where
    // Basic value semantics.
    T: Default + Debug + Clone,
    // bincode support.
    T: Encode + Decode<()>,
    // serde support.
    T: Serialize + DeserializeOwned,
    // Reflection support.
    T: Reflect + TypePath,
    T: Sized,
{
}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70 T: Default
71 + Debug
72 + Clone
73 + Encode
74 + Decode<()>
75 + Serialize
76 + DeserializeOwned
77 + Reflect
78 + Sized
79{
80}
81
/// Implements [`CuMsgPack`] for a tuple of `&CuMsg<_>` references, one tuple
/// element per identifier passed in.
///
/// Each identifier becomes a generic type parameter bounded by
/// [`CuMsgPayload`]. The reference lifetimes are left anonymous, so the impl
/// header declares no lifetime parameter.
macro_rules! impl_cu_msg_pack {
    ($($name:ident),+) => {
        impl<$($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
        where
            $($name: CuMsgPayload),+
        {}
    };
}
90
/// Invokes `impl_cu_msg_pack!` for every prefix of the given identifier list,
/// starting with the first two identifiers and growing by one each step.
///
/// For `impl_cu_msg_pack_up_to!(A, B, C)` this expands to
/// `impl_cu_msg_pack!(A, B)` followed by `impl_cu_msg_pack!(A, B, C)`.
/// At least two identifiers are required by the entry arm.
macro_rules! impl_cu_msg_pack_up_to {
    // Entry point: emit the pair impl, then start accumulating the rest.
    ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
        impl_cu_msg_pack!($first, $second);
        impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
    };
    // Nothing left to append: recursion ends.
    (@accumulate ($($acc:ident),+);) => {};
    // Append the next identifier, emit the impl for the longer tuple, recurse.
    (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
        impl_cu_msg_pack!($($acc),+, $next);
        impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
    };
}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
/// Expands to a message input type.
///
/// * `input_msg!('a, A, B, ...)` (a lifetime followed by two or more types)
///   expands to the tuple `(&'a CuMsg<A>, &'a CuMsg<B>, ...)`.
/// * `input_msg!(T)` expands to `CuMsg<T>`.
///
/// The expansion names `CuMsg` without a path, so `CuMsg` must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! input_msg {
    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
        ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
    };
    ($ty:ty) => {
        CuMsg<$ty>
    };
}
122
/// Expands to a message output type.
///
/// * `output_msg!('a, A, B, ...)` and `output_msg!(A, B, ...)` (two or more
///   types) both expand to the tuple `(CuMsg<A>, CuMsg<B>, ...)`; the
///   lifetime in the first form is accepted but not used in the expansion.
/// * `output_msg!(T)` expands to `CuMsg<T>`.
///
/// The expansion names `CuMsg` without a path, so `CuMsg` must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! output_msg {
    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    ($first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    ($ty:ty) => {
        CuMsg<$ty>
    };
}
136
137pub trait CuSingleOutputMsg {
140 type Payload: CuMsgPayload;
141}
142
143impl<T: CuMsgPayload> CuSingleOutputMsg for CuMsg<T> {
144 type Payload = T;
145}
146
/// Metadata attached to a `CuMsg`.
///
/// The bincode and serde representations are derived, so the order of the
/// fields below is part of the encoded format.
#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
#[reflect(opaque, from_reflect = false, no_field_bounds)]
pub struct CuMsgMetadata {
    /// Processing time range; its `start` and `end` are what `Display` prints.
    pub process_time: PartialCuTimeRange,
    /// Status text; see `set_status`.
    pub status_txt: CuCompactString,
    /// Optional origin of the message; see `set_origin` / `clear_origin`.
    pub origin: Option<CuMsgOrigin>,
}
159
160impl Metadata for CuMsgMetadata {}
161
162impl CuMsgMetadata {
163 pub fn set_status(&mut self, status: impl ToCompactString) {
164 self.status_txt = CuCompactString(status.to_compact_string());
165 }
166
167 pub fn set_origin(&mut self, origin: CuMsgOrigin) {
168 self.origin = Some(origin);
169 }
170
171 pub fn clear_origin(&mut self) {
172 self.origin = None;
173 }
174}
175
176impl CuMsgMetadataTrait for CuMsgMetadata {
177 fn process_time(&self) -> PartialCuTimeRange {
178 self.process_time
179 }
180
181 fn status_txt(&self) -> &CuCompactString {
182 &self.status_txt
183 }
184
185 fn origin(&self) -> Option<&CuMsgOrigin> {
186 self.origin.as_ref()
187 }
188}
189
190impl Display for CuMsgMetadata {
191 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
192 write!(
193 f,
194 "process_time start: {}, process_time end: {}",
195 self.process_time.start, self.process_time.end
196 )
197 }
198}
199
/// An optional payload of type `T` together with a `Tov` and metadata of
/// type `M`.
///
/// `Decode` is derived here while `Encode` is implemented by hand elsewhere
/// in this file, so the field order below must stay in step with that
/// hand-written encoder.
#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
#[reflect(opaque, from_reflect = false, no_field_bounds)]
#[serde(bound(
    serialize = "T: Serialize, M: Serialize",
    deserialize = "T: DeserializeOwned, M: DeserializeOwned"
))]
pub struct CuStampedData<T, M>
where
    T: CuMsgPayload,
    M: Metadata,
{
    /// The payload, if any. Private: use `payload`, `payload_mut`,
    /// `set_payload` and `clear_payload`.
    payload: Option<T>,

    /// The `Tov` stamped on this data.
    pub tov: Tov,

    /// Metadata travelling with the payload.
    pub metadata: M,
}
222
223impl<T, M> Encode for CuStampedData<T, M>
224where
225 T: CuMsgPayload,
226 M: Metadata,
227{
228 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
229 match &self.payload {
230 None => {
231 0u8.encode(encoder)?;
232 }
233 Some(payload) => {
234 1u8.encode(encoder)?;
235 let encoded_start = cu29_traits::observed_encode_bytes();
236 let handle_start = crate::monitoring::current_payload_handle_bytes();
237 payload.encode(encoder)?;
238 let encoded_bytes =
239 cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
240 let handle_bytes =
241 crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
242 crate::monitoring::record_current_slot_payload_io_stats(
243 core::mem::size_of::<T>(),
244 encoded_bytes,
245 handle_bytes,
246 );
247 }
248 }
249 self.tov.encode(encoder)?;
250 self.metadata.encode(encoder)?;
251 Ok(())
252 }
253}
254
255impl Default for CuMsgMetadata {
256 fn default() -> Self {
257 CuMsgMetadata {
258 process_time: PartialCuTimeRange::default(),
259 status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
260 origin: None,
261 }
262 }
263}
264
265impl<T, M> CuStampedData<T, M>
266where
267 T: CuMsgPayload,
268 M: Metadata,
269{
270 pub(crate) fn from_parts(payload: Option<T>, tov: Tov, metadata: M) -> Self {
271 CuStampedData {
272 payload,
273 tov,
274 metadata,
275 }
276 }
277
278 pub fn new(payload: Option<T>) -> Self {
279 Self::from_parts(payload, Tov::default(), M::default())
280 }
281 pub fn payload(&self) -> Option<&T> {
282 self.payload.as_ref()
283 }
284
285 pub fn set_payload(&mut self, payload: T) {
286 self.payload = Some(payload);
287 }
288
289 pub fn clear_payload(&mut self) {
290 self.payload = None;
291 }
292
293 pub fn payload_mut(&mut self) -> &mut Option<T> {
294 &mut self.payload
295 }
296}
297
298impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
299where
300 T: CuMsgPayload,
301 M: CuMsgMetadataTrait + Metadata,
302{
303 fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
304 self.payload
305 .as_ref()
306 .map(|p| p as &dyn erased_serde::Serialize)
307 }
308
309 #[cfg(feature = "reflect")]
310 fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
311 self.payload
312 .as_ref()
313 .map(|p| p as &dyn cu29_traits::Reflect)
314 }
315
316 fn tov(&self) -> Tov {
317 self.tov
318 }
319
320 fn metadata(&self) -> &dyn CuMsgMetadataTrait {
321 &self.metadata
322 }
323}
324
/// A `CuStampedData` whose metadata type is `CuMsgMetadata`.
pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
328
329impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
330 pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
337 unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
339 }
340
341 pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
348 unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
350 }
351}
352
353impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
354 fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
355 CuError::from(format!(
356 "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
357 type_name::<T>(),
358 type_name::<U>()
359 ))
360 }
361
362 pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
364 if TypeId::of::<T>() == TypeId::of::<U>() {
365 Ok(unsafe { self.assume_payload::<U>() })
367 } else {
368 Err(Self::downcast_err::<U>())
369 }
370 }
371
372 pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
374 if TypeId::of::<T>() == TypeId::of::<U>() {
375 Ok(unsafe { self.assume_payload_mut::<U>() })
377 } else {
378 Err(Self::downcast_err::<U>())
379 }
380 }
381}
382
383pub trait Freezable {
386 fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
390 Encode::encode(&(), encoder) }
392
393 fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
396 Ok(())
397 }
398}
399
400pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
403
404impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
405 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
406 self.0.freeze(encoder)
407 }
408}
409
/// A task that has an output but no input.
///
/// Implementors must also be `Freezable` and `Reflect`. Only `new` and
/// `process` are required; every other method defaults to a no-op that
/// returns `Ok(())`.
pub trait CuSrcTask: Freezable + Reflect {
    /// Type handed to `process` to be filled in.
    type Output<'m>: CuMsgPayload;
    /// Type of the resources handed to `new`.
    type Resources<'r>;

    /// Builds the task from an optional component configuration and its
    /// resources.
    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once before the first `process` — confirm against the runtime.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called before each `process` — confirm against the runtime.
    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Produces output by writing into `new_msg`.
    fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called after each `process` — confirm against the runtime.
    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once when the task is shut down — confirm against the runtime.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
454
/// A task that has both an input and an output.
///
/// Implementors must also be `Freezable` and `Reflect`. Only `new` and
/// `process` are required; every other method defaults to a no-op that
/// returns `Ok(())`.
pub trait CuTask: Freezable + Reflect {
    /// Input pack type read by `process`.
    type Input<'m>: CuMsgPack;
    /// Type handed to `process` to be filled in.
    type Output<'m>: CuMsgPayload;
    /// Type of the resources handed to `new`.
    type Resources<'r>;

    /// Builds the task from an optional component configuration and its
    /// resources.
    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once before the first `process` — confirm against the runtime.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called before each `process` — confirm against the runtime.
    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Reads `input` and writes the result into `output`.
    fn process<'i, 'o>(
        &mut self,
        _ctx: &CuContext,
        input: &Self::Input<'i>,
        output: &mut Self::Output<'o>,
    ) -> CuResult<()>;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called after each `process` — confirm against the runtime.
    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once when the task is shut down — confirm against the runtime.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
502
/// A task that has an input but no output.
///
/// Implementors must also be `Freezable` and `Reflect`. Only `new` and
/// `process` are required; every other method defaults to a no-op that
/// returns `Ok(())`.
pub trait CuSinkTask: Freezable + Reflect {
    /// Input pack type read by `process`.
    type Input<'m>: CuMsgPack;
    /// Type of the resources handed to `new`.
    type Resources<'r>;

    /// Builds the task from an optional component configuration and its
    /// resources.
    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once before the first `process` — confirm against the runtime.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called before each `process` — confirm against the runtime.
    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Consumes `input`.
    fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called after each `process` — confirm against the runtime.
    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called once when the task is shut down — confirm against the runtime.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
544
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::{config, decode_from_slice, encode_to_vec};

    /// A `CuCompactString` must survive a bincode encode/decode round trip
    /// unchanged.
    #[test]
    fn test_cucompactstr_encode_decode() {
        let bincode_cfg = config::standard();
        let original = CuCompactString(CompactString::from("hello"));

        let bytes = encode_to_vec(&original, bincode_cfg).expect("Encoding failed");
        let (roundtripped, _consumed): (CuCompactString, usize) =
            decode_from_slice(&bytes, bincode_cfg).expect("Decoding failed");

        assert_eq!(original.0, roundtripped.0);
    }
}