1use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18 COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuResult,
19 ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
/// Bundle of bounds a message payload type must satisfy when the `reflect` feature is on.
///
/// The trait declares no items of its own; it only groups the supertraits listed below:
/// default construction, `Debug`, `Clone`, bincode `Encode`/`Decode<()>`, serde
/// `Serialize`/`DeserializeOwned`, `Reflect` and `TypePath`.
#[cfg(feature = "reflect")]
pub trait CuMsgPayload:
    Default
    + Debug
    + Clone
    + Encode
    + Decode<()>
    + Serialize
    + DeserializeOwned
    + Reflect
    + TypePath
    + Sized
{
}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46 Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
/// Marker trait (no items) for types usable as the `Input` of a task.
///
/// In this file it is implemented for `()`, `CuMsg<T>`, `&CuMsg<T>` and tuples of
/// `&CuMsg<_>` references.
pub trait CuMsgPack {}
51
// Blanket impl (`reflect` feature on): any type meeting every bound is a payload,
// so user types never implement `CuMsgPayload` by hand.
#[cfg(feature = "reflect")]
impl<T> CuMsgPayload for T
where
    T: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + TypePath
        + Sized,
{
}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70 T: Default
71 + Debug
72 + Clone
73 + Encode
74 + Decode<()>
75 + Serialize
76 + DeserializeOwned
77 + Reflect
78 + Sized
79{
80}
81
/// Implements the `CuMsgPack` marker for one tuple shape of message references.
///
/// Given a comma-separated list of type-parameter names `A, B, ...`, this expands to
/// an impl of `CuMsgPack` for `(&CuMsg<A>, &CuMsg<B>, ...)` in which every parameter
/// is bound by `CuMsgPayload`.
macro_rules! impl_cu_msg_pack {
    ($($name:ident),+) => {
        // The references use elided (anonymous) lifetimes, so the impl declares no
        // named lifetime parameter; an unused one would only trip lints.
        impl<$($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
        where
            $($name: CuMsgPayload),+
        {}
    };
}
90
/// Emits `impl_cu_msg_pack!` for every tuple arity from 2 up to the number of
/// identifiers supplied.
///
/// The entry arm handles the first two identifiers, then recurses through the
/// internal `@accumulate` arms, which move one identifier at a time from the pending
/// list into the accumulated list and emit an impl for each intermediate length.
macro_rules! impl_cu_msg_pack_up_to {
    // Internal: no pending identifiers left, recursion ends.
    (@accumulate ($($done:ident),+);) => {};
    // Internal: append the next pending identifier, emit that arity, recurse.
    (@accumulate ($($done:ident),+); $head:ident $(, $tail:ident)*) => {
        impl_cu_msg_pack!($($done),+, $head);
        impl_cu_msg_pack_up_to!(@accumulate ($($done),+, $head); $($tail),*);
    };
    // Entry point: at least two identifiers, optional trailing comma.
    ($a:ident, $b:ident $(, $more:ident)* $(,)?) => {
        impl_cu_msg_pack!($a, $b);
        impl_cu_msg_pack_up_to!(@accumulate ($a, $b); $($more),*);
    };
}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
/// Expands to the input message type of a task.
///
/// * `input_msg!('a, A, B, ...)` expands to the tuple `(&'a CuMsg<A>, &'a CuMsg<B>, ...)`.
/// * `input_msg!('a, A)` expands to `CuMsg<A>`; the lifetime is accepted but unused.
/// * `input_msg!(A)` expands to `CuMsg<A>`.
///
/// `CuMsg` is written unqualified in the expansion, so it must be in scope where the
/// macro is invoked.
#[macro_export]
macro_rules! input_msg {
    ($life:lifetime, $head:ty, $($tail:ty),+) => {
        (&$life CuMsg<$head>, $(&$life CuMsg<$tail>),+)
    };
    ($life:lifetime, $payload:ty) => {
        CuMsg<$payload>
    };
    ($payload:ty) => {
        CuMsg<$payload>
    };
}
125
/// Expands to the output message type of a task.
///
/// * `output_msg!(A)` and `output_msg!('a, A)` expand to `CuMsg<A>`.
/// * `output_msg!(A, B, ...)` and `output_msg!('a, A, B, ...)` expand to the tuple
///   `(CuMsg<A>, CuMsg<B>, ...)`.
///
/// A leading lifetime is accepted but never used in the expansion. `CuMsg` is written
/// unqualified, so it must be in scope where the macro is invoked.
#[macro_export]
macro_rules! output_msg {
    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    // Both lifetime-prefixed arms must precede the type-only arms: a lifetime token
    // is a valid start of a `ty` fragment, so an earlier `$first:ty` / `$ty:ty` arm
    // would try to parse `'a` as a type and fail with a parse error instead of
    // falling through to this arm.
    ($lt:lifetime, $ty:ty) => {
        CuMsg<$ty>
    };
    ($first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    ($ty:ty) => {
        CuMsg<$ty>
    };
}
142
143#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
145#[reflect(opaque, from_reflect = false, no_field_bounds)]
146pub struct CuMsgMetadata {
147 pub process_time: PartialCuTimeRange,
149 pub status_txt: CuCompactString,
152}
153
154impl Metadata for CuMsgMetadata {}
155
156impl CuMsgMetadata {
157 pub fn set_status(&mut self, status: impl ToCompactString) {
158 self.status_txt = CuCompactString(status.to_compact_string());
159 }
160}
161
162impl CuMsgMetadataTrait for CuMsgMetadata {
163 fn process_time(&self) -> PartialCuTimeRange {
164 self.process_time
165 }
166
167 fn status_txt(&self) -> &CuCompactString {
168 &self.status_txt
169 }
170}
171
172impl Display for CuMsgMetadata {
173 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
174 write!(
175 f,
176 "process_time start: {}, process_time end: {}",
177 self.process_time.start, self.process_time.end
178 )
179 }
180}
181
182#[derive(
184 Default, Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect,
185)]
186#[reflect(opaque, from_reflect = false, no_field_bounds)]
187#[serde(bound(
188 serialize = "T: Serialize, M: Serialize",
189 deserialize = "T: DeserializeOwned, M: DeserializeOwned"
190))]
191pub struct CuStampedData<T, M>
192where
193 T: CuMsgPayload,
194 M: Metadata,
195{
196 payload: Option<T>,
198
199 pub tov: Tov,
202
203 pub metadata: M,
205}
206
207impl Default for CuMsgMetadata {
208 fn default() -> Self {
209 CuMsgMetadata {
210 process_time: PartialCuTimeRange::default(),
211 status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
212 }
213 }
214}
215
216impl<T, M> CuStampedData<T, M>
217where
218 T: CuMsgPayload,
219 M: Metadata,
220{
221 pub fn new(payload: Option<T>) -> Self {
222 CuStampedData {
223 payload,
224 tov: Tov::default(),
225 metadata: M::default(),
226 }
227 }
228 pub fn payload(&self) -> Option<&T> {
229 self.payload.as_ref()
230 }
231
232 pub fn set_payload(&mut self, payload: T) {
233 self.payload = Some(payload);
234 }
235
236 pub fn clear_payload(&mut self) {
237 self.payload = None;
238 }
239
240 pub fn payload_mut(&mut self) -> &mut Option<T> {
241 &mut self.payload
242 }
243}
244
245impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
246where
247 T: CuMsgPayload,
248 M: CuMsgMetadataTrait + Metadata,
249{
250 fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
251 self.payload
252 .as_ref()
253 .map(|p| p as &dyn erased_serde::Serialize)
254 }
255
256 #[cfg(feature = "reflect")]
257 fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
258 self.payload
259 .as_ref()
260 .map(|p| p as &dyn cu29_traits::Reflect)
261 }
262
263 fn tov(&self) -> Tov {
264 self.tov
265 }
266
267 fn metadata(&self) -> &dyn CuMsgMetadataTrait {
268 &self.metadata
269 }
270}
271
272pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
275
276impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
277 pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
284 unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
286 }
287
288 pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
295 unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
297 }
298}
299
300impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
301 fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
302 CuError::from(format!(
303 "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
304 type_name::<T>(),
305 type_name::<U>()
306 ))
307 }
308
309 pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
311 if TypeId::of::<T>() == TypeId::of::<U>() {
312 Ok(unsafe { self.assume_payload::<U>() })
314 } else {
315 Err(Self::downcast_err::<U>())
316 }
317 }
318
319 pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
321 if TypeId::of::<T>() == TypeId::of::<U>() {
322 Ok(unsafe { self.assume_payload_mut::<U>() })
324 } else {
325 Err(Self::downcast_err::<U>())
326 }
327 }
328}
329
330pub trait Freezable {
333 fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
337 Encode::encode(&(), encoder) }
339
340 fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
343 Ok(())
344 }
345}
346
347pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
350
351impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
352 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
353 self.0.freeze(encoder)
354 }
355}
356
357pub trait CuSrcTask: Freezable + Reflect {
362 type Output<'m>: CuMsgPayload;
363 type Resources<'r>;
365
366 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
369 where
370 Self: Sized;
371
372 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
374 Ok(())
375 }
376
377 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
381 Ok(())
382 }
383
384 fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
388
389 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
393 Ok(())
394 }
395
396 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
398 Ok(())
399 }
400}
401
402pub trait CuTask: Freezable + Reflect {
404 type Input<'m>: CuMsgPack;
405 type Output<'m>: CuMsgPayload;
406 type Resources<'r>;
408
409 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
412 where
413 Self: Sized;
414
415 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
417 Ok(())
418 }
419
420 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
424 Ok(())
425 }
426
427 fn process<'i, 'o>(
431 &mut self,
432 _ctx: &CuContext,
433 input: &Self::Input<'i>,
434 output: &mut Self::Output<'o>,
435 ) -> CuResult<()>;
436
437 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
441 Ok(())
442 }
443
444 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
446 Ok(())
447 }
448}
449
450pub trait CuSinkTask: Freezable + Reflect {
452 type Input<'m>: CuMsgPack;
453 type Resources<'r>;
455
456 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
459 where
460 Self: Sized;
461
462 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
464 Ok(())
465 }
466
467 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
471 Ok(())
472 }
473
474 fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
478
479 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
483 Ok(())
484 }
485
486 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
488 Ok(())
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::{config, decode_from_slice, encode_to_vec};

    /// A `CuCompactString` must survive a bincode encode/decode round trip unchanged.
    #[test]
    fn test_cucompactstr_encode_decode() {
        let cfg = config::standard();
        let original = CuCompactString(CompactString::from("hello"));

        let bytes = encode_to_vec(&original, cfg).expect("Encoding failed");
        let (roundtripped, _consumed): (CuCompactString, usize) =
            decode_from_slice(&bytes, cfg).expect("Decoding failed");

        assert_eq!(original.0, roundtripped.0);
    }
}
506}