use crate::config::ComponentConfig;
use crate::context::CuContext;
use crate::reflect::Reflect;
#[cfg(feature = "reflect")]
use crate::reflect::TypePath;
#[cfg(feature = "reflect")]
use bevy_reflect;
use bincode::de::{Decode, Decoder};
use bincode::enc::{Encode, Encoder};
use bincode::error::{DecodeError, EncodeError};
use compact_str::{CompactString, ToCompactString};
use core::any::{TypeId, type_name};
use cu29_clock::{PartialCuTimeRange, Tov};
use cu29_traits::{
COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
ErasedCuStampedData, Metadata,
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use alloc::format;
use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
/// Bundle of bounds a type must satisfy to be carried as a message payload when the
/// `reflect` feature is enabled.
///
/// The trait has no items of its own; it only names the combination of bounds listed in
/// the `where` clause (which act as supertraits). A blanket implementation in this file
/// provides it for every type meeting those bounds.
#[cfg(feature = "reflect")]
pub trait CuMsgPayload
where
    Self: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + TypePath
        + Sized,
{
}
/// Bundle of bounds a type must satisfy to be carried as a message payload when the
/// `reflect` feature is disabled.
///
/// Identical to the `reflect` variant except that `TypePath` is not required. The trait
/// has no items of its own; the bounds in the `where` clause act as supertraits.
#[cfg(not(feature = "reflect"))]
pub trait CuMsgPayload
where
    Self: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + Sized,
{
}
/// Marker trait for the message shapes a task may declare as its input.
///
/// It has no methods. This file implements it for a single `CuMsg<T>` (by value or by
/// reference), for `()`, and for tuples of `&CuMsg<_>`.
pub trait CuMsgPack {}
/// Blanket implementation (`reflect` feature enabled): every type that satisfies all the
/// payload bounds is automatically a `CuMsgPayload`.
#[cfg(feature = "reflect")]
impl<
    T: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + TypePath
        + Sized,
> CuMsgPayload for T
{
}
/// Blanket implementation (`reflect` feature disabled): every type that satisfies all the
/// payload bounds is automatically a `CuMsgPayload`.
#[cfg(not(feature = "reflect"))]
impl<
    T: Default
        + Debug
        + Clone
        + Encode
        + Decode<()>
        + Serialize
        + DeserializeOwned
        + Reflect
        + Sized,
> CuMsgPayload for T
{
}
/// Implements `CuMsgPack` for one tuple of message references.
///
/// Given the identifiers `A, B, ...` this expands to
/// `impl<A, B, ...> CuMsgPack for (&CuMsg<A>, &CuMsg<B>, ...)`, with every identifier
/// bounded by `CuMsgPayload`.
///
/// Each reference uses its own elided lifetime, so the impl does not need (and no longer
/// declares) a named lifetime parameter.
macro_rules! impl_cu_msg_pack {
    ($($name:ident),+) => {
        impl<$($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
        where
            $($name: CuMsgPayload),+
        {}
    };
}
/// Invokes `impl_cu_msg_pack!` for every prefix (of length two or more) of the given
/// identifier list.
///
/// `impl_cu_msg_pack_up_to!(A, B, C)` therefore expands to `impl_cu_msg_pack!(A, B)` and
/// `impl_cu_msg_pack!(A, B, C)`. A trailing comma is accepted on the public form.
macro_rules! impl_cu_msg_pack_up_to {
    // Internal: nothing left to append, recursion ends.
    (@accumulate ($($done:ident),+);) => {};
    // Internal: append the next identifier to the prefix, emit the impl, then recurse.
    (@accumulate ($($done:ident),+); $head:ident $(, $tail:ident)*) => {
        impl_cu_msg_pack!($($done),+, $head);
        impl_cu_msg_pack_up_to!(@accumulate ($($done),+, $head); $($tail),*);
    };
    // Public entry point: at least two identifiers are required.
    ($a:ident, $b:ident $(, $more:ident)* $(,)?) => {
        impl_cu_msg_pack!($a, $b);
        impl_cu_msg_pack_up_to!(@accumulate ($a, $b); $($more),*);
    };
}
// A single message, held by value or by reference, is a valid pack.
impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
// A one-element tuple holding a message reference is a valid pack.
impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
// The unit type is a valid pack.
impl CuMsgPack for () {}
// Generates the impls for tuples of 2 through 12 message references.
impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
/// Expands a payload type (or a list of payload types) into the matching input type.
///
/// * `input_msg!('a, A, B, ...)` expands to `(&'a CuMsg<A>, &'a CuMsg<B>, ...)`.
/// * `input_msg!('a, A)` and `input_msg!(A)` both expand to `CuMsg<A>`; the lifetime in
///   the two-argument form is accepted and ignored.
///
/// `CuMsg` must be in scope where the macro is invoked.
#[macro_export]
macro_rules! input_msg {
    // Two or more payloads: a tuple of references sharing one lifetime.
    ($life:lifetime, $head:ty, $($tail:ty),+) => {
        ( & $life CuMsg<$head>, $( & $life CuMsg<$tail> ),+ )
    };
    // Single payload with a lifetime: the lifetime is not used in the expansion.
    ($life:lifetime, $payload:ty) => {
        CuMsg<$payload>
    };
    // Single payload.
    ($payload:ty) => {
        CuMsg<$payload>
    };
}
/// Expands a payload type (or a list of payload types) into the matching output type.
///
/// * `output_msg!(A)` and `output_msg!('a, A)` expand to `CuMsg<A>`.
/// * `output_msg!(A, B, ...)` and `output_msg!('a, A, B, ...)` expand to
///   `(CuMsg<A>, CuMsg<B>, ...)`.
///
/// The leading lifetime is accepted and ignored in both forms. `CuMsg` must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! output_msg {
    // The lifetime-led arms must come before any arm that starts with a `ty` fragment:
    // a lifetime token is allowed to begin a type, so a `$first:ty` arm tried first
    // would attempt to parse the lifetime as a type and fail the whole invocation.
    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    ($lt:lifetime, $ty:ty) => {
        CuMsg<$ty>
    };
    ($first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    ($ty:ty) => {
        CuMsg<$ty>
    };
}
/// Metadata carried alongside a message payload: a processing time range, a status text
/// and an optional origin.
///
/// `Default` is implemented by hand elsewhere in this file rather than derived.
#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
#[reflect(opaque, from_reflect = false, no_field_bounds)]
pub struct CuMsgMetadata {
/// Time range with `start` and `end` components (both are printed by the `Display` impl).
/// NOTE(review): presumably the span of the task's processing step; confirm with the runtime.
pub process_time: PartialCuTimeRange,
/// Status text; replaced wholesale by `set_status`.
pub status_txt: CuCompactString,
/// Origin of the message, if one has been recorded (`set_origin` / `clear_origin`).
pub origin: Option<CuMsgOrigin>,
}
// Marker impl: lets `CuMsgMetadata` be used as the `M: Metadata` parameter of `CuStampedData`.
impl Metadata for CuMsgMetadata {}
impl CuMsgMetadata {
    /// Replaces the status text with the compact-string rendering of `status`.
    pub fn set_status(&mut self, status: impl ToCompactString) {
        let rendered = status.to_compact_string();
        self.status_txt = CuCompactString(rendered);
    }

    /// Records `origin` as the origin of this message, overwriting any previous value.
    pub fn set_origin(&mut self, origin: CuMsgOrigin) {
        let _previous = self.origin.replace(origin);
    }

    /// Forgets any recorded origin.
    pub fn clear_origin(&mut self) {
        let _previous = self.origin.take();
    }
}
/// Read-only accessors exposing the three metadata fields through `CuMsgMetadataTrait`.
impl CuMsgMetadataTrait for CuMsgMetadata {
    /// Returns a copy of the processing time range.
    fn process_time(&self) -> PartialCuTimeRange {
        let Self { process_time, .. } = self;
        *process_time
    }

    /// Borrows the status text.
    fn status_txt(&self) -> &CuCompactString {
        let Self { status_txt, .. } = self;
        status_txt
    }

    /// Borrows the origin, or returns `None` when no origin is recorded.
    fn origin(&self) -> Option<&CuMsgOrigin> {
        let Self { origin, .. } = self;
        origin.as_ref()
    }
}
/// Human-readable rendering: prints only the start and end of the processing time range.
impl Display for CuMsgMetadata {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let range = &self.process_time;
        f.write_fmt(format_args!(
            "process_time start: {}, process_time end: {}",
            range.start, range.end
        ))
    }
}
/// An optional payload of type `T` together with a `Tov` stamp and metadata of type `M`.
///
/// `Decode` is derived while `Encode` is written by hand elsewhere in this file, so the
/// field order here must stay in sync with that hand-written encoder.
#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
#[reflect(opaque, from_reflect = false, no_field_bounds)]
#[serde(bound(
serialize = "T: Serialize, M: Serialize",
deserialize = "T: DeserializeOwned, M: DeserializeOwned"
))]
pub struct CuStampedData<T, M>
where
T: CuMsgPayload,
M: Metadata,
{
// Private: reached through `payload`, `payload_mut`, `set_payload` and `clear_payload`.
payload: Option<T>,
/// Stamp of type `Tov` attached to the payload.
/// NOTE(review): presumably "time of validity"; confirm against `cu29_clock::Tov`.
pub tov: Tov,
/// Metadata attached to the payload.
pub metadata: M,
}
/// Hand-written bincode encoder for `CuStampedData`.
///
/// Layout: a one-byte presence tag (`0` = no payload, `1` = payload follows), the payload
/// when present, then `tov`, then `metadata`.
impl<T, M> Encode for CuStampedData<T, M>
where
    T: CuMsgPayload,
    M: Metadata,
{
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        if let Some(payload) = self.payload.as_ref() {
            1u8.encode(encoder)?;

            // Sample both counters right before the payload is written...
            let bytes_before = cu29_traits::observed_encode_bytes();
            let handles_before = crate::monitoring::current_payload_handle_bytes();

            payload.encode(encoder)?;

            // ...and right after, so that only the payload's contribution is reported.
            let bytes_after = cu29_traits::observed_encode_bytes();
            let payload_bytes = bytes_after.saturating_sub(bytes_before);
            let handles_after = crate::monitoring::current_payload_handle_bytes();
            let payload_handle_bytes = handles_after.saturating_sub(handles_before);

            crate::monitoring::record_current_slot_payload_io_stats(
                core::mem::size_of::<T>(),
                payload_bytes,
                payload_handle_bytes,
            );
        } else {
            0u8.encode(encoder)?;
        }

        self.tov.encode(encoder)?;
        self.metadata.encode(encoder)
    }
}
/// Default metadata: default time range, an empty status text whose buffer is created
/// with capacity `COMPACT_STRING_CAPACITY`, and no origin.
impl Default for CuMsgMetadata {
    fn default() -> Self {
        let status_txt = CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY));
        Self {
            process_time: Default::default(),
            status_txt,
            origin: None,
        }
    }
}
impl<T, M> CuStampedData<T, M>
where
    T: CuMsgPayload,
    M: Metadata,
{
    /// Builds a value holding `payload`, with a default `tov` and default metadata.
    pub fn new(payload: Option<T>) -> Self {
        Self {
            payload,
            tov: Default::default(),
            metadata: Default::default(),
        }
    }

    /// Borrows the payload, or returns `None` when there is none.
    pub fn payload(&self) -> Option<&T> {
        let Self { payload, .. } = self;
        payload.as_ref()
    }

    /// Stores `payload`, replacing whatever was there before.
    pub fn set_payload(&mut self, payload: T) {
        *self.payload_mut() = Some(payload);
    }

    /// Drops the current payload, if any, leaving `None`.
    pub fn clear_payload(&mut self) {
        let _previous = self.payload.take();
    }

    /// Gives mutable access to the payload slot itself, so callers can fill, edit or
    /// empty it in place.
    pub fn payload_mut(&mut self) -> &mut Option<T> {
        let Self { payload, .. } = self;
        payload
    }
}
/// Type-erased view of a stamped value: the payload is exposed as trait objects and the
/// metadata as `&dyn CuMsgMetadataTrait`.
impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
where
    T: CuMsgPayload,
    M: CuMsgMetadataTrait + Metadata,
{
    /// The payload as an erased serializable object, or `None` when there is no payload.
    fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
        let inner = self.payload.as_ref()?;
        let erased: &dyn erased_serde::Serialize = inner;
        Some(erased)
    }

    /// The payload as a reflectable object, or `None` when there is no payload.
    #[cfg(feature = "reflect")]
    fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
        let inner = self.payload.as_ref()?;
        let reflected: &dyn cu29_traits::Reflect = inner;
        Some(reflected)
    }

    /// A copy of the `tov` stamp.
    fn tov(&self) -> Tov {
        let Self { tov, .. } = self;
        *tov
    }

    /// The metadata, borrowed as a trait object.
    fn metadata(&self) -> &dyn CuMsgMetadataTrait {
        let Self { metadata, .. } = self;
        metadata
    }
}
/// A message: `CuStampedData` whose metadata type is fixed to `CuMsgMetadata`.
pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
    /// Reinterprets `&CuMsg<T>` as `&CuMsg<U>` without any check.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that a `CuMsg<T>` is valid when read as a `CuMsg<U>`.
    /// The safe wrappers in this file (`downcast_ref` / `downcast_mut`) only call this
    /// when `T` and `U` are the same type.
    pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
        let source: *const CuMsg<T> = self;
        // SAFETY: the pointer comes from a live shared reference; validity of the target
        // type is the caller's obligation (see `# Safety`).
        unsafe { &*source.cast::<CuMsg<U>>() }
    }

    /// Reinterprets `&mut CuMsg<T>` as `&mut CuMsg<U>` without any check.
    ///
    /// # Safety
    ///
    /// Same contract as `assume_payload`: the caller must guarantee that a `CuMsg<T>` is
    /// valid when accessed as a `CuMsg<U>`.
    pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
        let source: *mut CuMsg<T> = self;
        // SAFETY: the pointer comes from a live exclusive reference; validity of the
        // target type is the caller's obligation (see `# Safety`).
        unsafe { &mut *source.cast::<CuMsg<U>>() }
    }
}
impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
    /// Builds the error returned when `T` and `U` are different types.
    fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
        let (actual, requested) = (type_name::<T>(), type_name::<U>());
        CuError::from(format!(
            "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
            actual, requested
        ))
    }

    /// Returns this message viewed as `&CuMsg<U>` when `U` is exactly `T`.
    ///
    /// # Errors
    ///
    /// Returns a `CuError` naming both types when `U` differs from `T`.
    pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
        if TypeId::of::<T>() != TypeId::of::<U>() {
            return Err(Self::downcast_err::<U>());
        }
        // SAFETY: the `TypeId` check above established that `T` and `U` are the same type.
        Ok(unsafe { self.assume_payload::<U>() })
    }

    /// Returns this message viewed as `&mut CuMsg<U>` when `U` is exactly `T`.
    ///
    /// # Errors
    ///
    /// Returns a `CuError` naming both types when `U` differs from `T`.
    pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
        if TypeId::of::<T>() != TypeId::of::<U>() {
            return Err(Self::downcast_err::<U>());
        }
        // SAFETY: the `TypeId` check above established that `T` and `U` are the same type.
        Ok(unsafe { self.assume_payload_mut::<U>() })
    }
}
/// State snapshot hooks: `freeze` writes the implementor's state to an encoder and
/// `thaw` reads it back from a decoder.
///
/// Both methods have defaults suitable for implementors with no state to save.
pub trait Freezable {
    /// Writes the state to `encoder`. The default encodes the unit value `()`.
    fn freeze<E>(&self, encoder: &mut E) -> Result<(), EncodeError>
    where
        E: Encoder,
    {
        ().encode(encoder)
    }

    /// Restores the state from `_decoder`. The default reads nothing and succeeds.
    fn thaw<D>(&mut self, _decoder: &mut D) -> Result<(), DecodeError>
    where
        D: Decoder,
    {
        Ok(())
    }
}
/// Borrowing wrapper around a `Freezable` value; its `Encode` impl forwards to `freeze`.
pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
/// Encoding the adapter is the same as freezing the wrapped value.
impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        T::freeze(self.0, encoder)
    }
}
/// A task that only produces output: `process` receives a mutable output and no input.
///
/// Every method except `new` and `process` has a default implementation that does
/// nothing and returns `Ok(())`.
/// NOTE(review): the order in which the runtime calls these hooks is not visible in this
/// file; the names suggest start, preprocess, process, postprocess, stop. Confirm.
pub trait CuSrcTask: Freezable + Reflect {
/// Type of the output written by `process`.
type Output<'m>: CuMsgPayload;
/// Type of the resources handed to `new`.
type Resources<'r>;
/// Builds the task from an optional component configuration and its resources.
fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
where
Self: Sized;
/// Start hook. Default: no-op returning `Ok(())`.
fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Pre-processing hook. Default: no-op returning `Ok(())`.
fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Required: fills `new_msg` with this task's output.
fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
/// Post-processing hook. Default: no-op returning `Ok(())`.
fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Stop hook. Default: no-op returning `Ok(())`.
fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
}
/// A task with both input and output: `process` reads an input pack and writes an output.
///
/// Every method except `new` and `process` has a default implementation that does
/// nothing and returns `Ok(())`.
/// NOTE(review): the order in which the runtime calls these hooks is not visible in this
/// file; the names suggest start, preprocess, process, postprocess, stop. Confirm.
pub trait CuTask: Freezable + Reflect {
/// Type of the input read by `process`; must be a `CuMsgPack`.
type Input<'m>: CuMsgPack;
/// Type of the output written by `process`.
type Output<'m>: CuMsgPayload;
/// Type of the resources handed to `new`.
type Resources<'r>;
/// Builds the task from an optional component configuration and its resources.
fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
where
Self: Sized;
/// Start hook. Default: no-op returning `Ok(())`.
fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Pre-processing hook. Default: no-op returning `Ok(())`.
fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Required: reads `input` and writes the result into `output`.
fn process<'i, 'o>(
&mut self,
_ctx: &CuContext,
input: &Self::Input<'i>,
output: &mut Self::Output<'o>,
) -> CuResult<()>;
/// Post-processing hook. Default: no-op returning `Ok(())`.
fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Stop hook. Default: no-op returning `Ok(())`.
fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
}
/// A task that only consumes input: `process` receives an input pack and has no output.
///
/// Every method except `new` and `process` has a default implementation that does
/// nothing and returns `Ok(())`.
/// NOTE(review): the order in which the runtime calls these hooks is not visible in this
/// file; the names suggest start, preprocess, process, postprocess, stop. Confirm.
pub trait CuSinkTask: Freezable + Reflect {
/// Type of the input read by `process`; must be a `CuMsgPack`.
type Input<'m>: CuMsgPack;
/// Type of the resources handed to `new`.
type Resources<'r>;
/// Builds the task from an optional component configuration and its resources.
fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
where
Self: Sized;
/// Start hook. Default: no-op returning `Ok(())`.
fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Pre-processing hook. Default: no-op returning `Ok(())`.
fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Required: consumes `input`.
fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
/// Post-processing hook. Default: no-op returning `Ok(())`.
fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
/// Stop hook. Default: no-op returning `Ok(())`.
fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::{config, decode_from_slice, encode_to_vec};

    /// A `CuCompactString` must survive a bincode encode/decode round trip unchanged.
    #[test]
    fn test_cucompactstr_encode_decode() {
        let bincode_cfg = config::standard();
        let original = CuCompactString(CompactString::from("hello"));

        let bytes = encode_to_vec(&original, bincode_cfg).expect("Encoding failed");
        let (roundtripped, _consumed): (CuCompactString, usize) =
            decode_from_slice(&bytes, bincode_cfg).expect("Decoding failed");

        assert_eq!(original.0, roundtripped.0);
    }
}