use crate::bail;
use crate::prelude::ErrorKind;
use crate::traits::SendSyncAny;
use crate::types::{FlowId, NodeId, PortId};
use crate::{zferror, Result};
use async_std::sync::Arc;
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use std::{cmp::Ordering, fmt::Debug};
use uhlc::Timestamp;
use uuid::Uuid;
/// Signature of the functions used to serialize a type-erased value.
///
/// Such a function receives the buffer it must write into and the value to serialize (held behind
/// an `Arc<dyn SendSyncAny>`), and reports a failure through the crate's `Result`.
pub(crate) type SerializerFn =
dyn Fn(&mut Vec<u8>, Arc<dyn SendSyncAny>) -> Result<()> + Send + Sync;
/// Signature of the functions used to build a `T` out of a slice of bytes, failing with an
/// `anyhow` error when the bytes cannot be interpreted.
pub(crate) type DeserializerFn<T> = dyn Fn(&[u8]) -> anyhow::Result<T> + Send + Sync;
/// The content of a message: either bytes that are already serialized, or a typed value kept in
/// memory next to the function able to serialize it.
#[derive(Clone, Serialize, Deserialize)]
pub enum Payload {
/// Serialized data, shared behind an `Arc`.
Bytes(Arc<Vec<u8>>),
/// A type-erased value together with its serializer.
///
/// serde skips this variant in both directions, which is why the serialization helpers of this
/// file first convert it into `Bytes`.
#[serde(skip_serializing, skip_deserializing)]
Typed((Arc<dyn SendSyncAny>, Arc<SerializerFn>)),
}
impl Debug for Payload {
    /// Writes only the name of the variant: the content itself is never displayed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Payload::Bytes(_) => "Payload::Bytes",
            Payload::Typed(_) => "Payload::Typed",
        };
        f.write_str(variant)
    }
}
impl Payload {
    /// Converts a `Data<T>` into a `Payload`.
    ///
    /// When `data` already wraps a `Payload`, that payload is returned untouched and `serializer`
    /// is dropped. Otherwise the value is type-erased and stored with `serializer` in a
    /// `Payload::Typed`.
    pub fn from_data<T: Send + Sync + 'static>(
        data: Data<T>,
        serializer: Arc<SerializerFn>,
    ) -> Self {
        let value = match data.inner {
            DataInner::Payload { payload, .. } => return payload,
            DataInner::Data(value) => value,
        };

        let erased: Arc<dyn SendSyncAny> = Arc::new(value);
        Payload::Typed((erased, serializer))
    }

    /// Writes the serialized form of this payload into `buffer`.
    ///
    /// The previous content of `buffer` is discarded first. A `Payload::Bytes` is copied, a
    /// `Payload::Typed` goes through its serializer.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the serializer, if any.
    pub(crate) fn try_as_bytes_into(&self, buffer: &mut Vec<u8>) -> Result<()> {
        buffer.clear();

        if let Payload::Typed((value, serialize)) = self {
            return (serialize)(buffer, Arc::clone(value));
        }

        if let Payload::Bytes(bytes) = self {
            // `buffer` is empty at this point: appending everything is a plain copy.
            buffer.extend_from_slice(bytes);
        }
        Ok(())
    }

    /// Returns the serialized form of this payload.
    ///
    /// A `Payload::Bytes` only clones the `Arc` (no copy of the bytes), a `Payload::Typed` is
    /// serialized into a freshly allocated vector.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the serializer, if any.
    pub fn try_as_bytes(&self) -> Result<Arc<Vec<u8>>> {
        if let Payload::Bytes(bytes) = self {
            return Ok(Arc::clone(bytes));
        }

        let mut serialized = Vec::new();
        self.try_as_bytes_into(&mut serialized)?;
        Ok(Arc::new(serialized))
    }
}
impl From<Vec<u8>> for Payload {
fn from(bytes: Vec<u8>) -> Self {
Self::Bytes(Arc::new(bytes))
}
}
impl From<&[u8]> for Payload {
fn from(bytes: &[u8]) -> Self {
Self::Bytes(Arc::new(bytes.to_vec()))
}
}
impl From<DataMessage> for Payload {
fn from(data_message: DataMessage) -> Self {
data_message.data
}
}
/// A `Payload` associated with a timestamp.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataMessage {
/// The content of the message.
pub(crate) data: Payload,
/// The timestamp attached to the content.
pub(crate) timestamp: Timestamp,
}
impl Deref for DataMessage {
type Target = Payload;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl DataMessage {
pub fn new_serialized(data: Vec<u8>, timestamp: Timestamp) -> Self {
Self {
data: Payload::Bytes(Arc::new(data)),
timestamp,
}
}
pub fn get_timestamp(&self) -> &Timestamp {
&self.timestamp
}
}
/// Metadata carried by `ControlMessage::RecordingStart`.
///
/// NOTE(review): the exact meaning of each field is set by the code that fills this structure,
/// which is not visible in this file; the descriptions below only restate the types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RecordingMetadata {
/// Timestamp associated with the recording (presumably its start; to be confirmed).
pub(crate) timestamp: Timestamp,
/// Identifier of a port.
pub(crate) port_id: PortId,
/// Identifier of a node.
pub(crate) node_id: NodeId,
/// Identifier of a flow.
pub(crate) flow_id: FlowId,
/// Unique identifier of an instance (presumably of the flow; to be confirmed).
pub(crate) instance_id: Uuid,
}
/// Messages delimiting a recording.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ControlMessage {
/// Start of a recording, described by its metadata.
RecordingStart(RecordingMetadata),
/// End of a recording, carrying only a timestamp.
RecordingStop(Timestamp),
}
/// The messages handled by the serialization helpers of this file: timestamped data, or a
/// watermark that consists of a timestamp only.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LinkMessage {
/// A payload with its timestamp.
Data(DataMessage),
/// A timestamp without any payload.
Watermark(Timestamp),
}
impl LinkMessage {
    /// Creates a `LinkMessage::Data` out of `output`, stamped with `timestamp`.
    pub fn from_payload(output: Payload, timestamp: Timestamp) -> Self {
        let data_message = DataMessage {
            data: output,
            timestamp,
        };
        LinkMessage::Data(data_message)
    }

    /// Serializes this message with bincode into `message_buffer`.
    ///
    /// Both buffers are emptied first. A `Payload::Typed` cannot be handled by serde directly: it
    /// is first serialized into `payload_buffer`, and a temporary message holding a copy of these
    /// bytes is what gets encoded. Any other message is encoded as is.
    ///
    /// # Errors
    ///
    /// Returns the error of the payload serializer, or a `SerializationError` if bincode fails.
    pub fn serialize_bincode_into(
        &self,
        message_buffer: &mut Vec<u8>,
        payload_buffer: &mut Vec<u8>,
    ) -> Result<()> {
        message_buffer.clear();
        payload_buffer.clear();

        // Only initialized when the payload had to be serialized beforehand.
        let flattened;
        let to_encode: &LinkMessage = match self {
            LinkMessage::Data(DataMessage {
                data: Payload::Typed((typed, serializer)),
                timestamp,
            }) => {
                (serializer)(payload_buffer, Arc::clone(typed))?;
                flattened = LinkMessage::Data(DataMessage {
                    data: Payload::Bytes(Arc::new(payload_buffer.clone())),
                    timestamp: *timestamp,
                });
                &flattened
            }
            LinkMessage::Data(DataMessage {
                data: Payload::Bytes(_),
                ..
            })
            | LinkMessage::Watermark(_) => self,
        };

        bincode::serialize_into(message_buffer, to_encode)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Serializes this message with bincode into `shm_buffer`.
    ///
    /// `payload_buffer` is emptied first. A `Payload::Typed` is serialized into `payload_buffer`,
    /// and a temporary message holding a copy of these bytes is what gets encoded. Any other
    /// message is encoded as is.
    ///
    /// # Errors
    ///
    /// Returns the error of the payload serializer, or a `SerializationError` if bincode fails.
    pub fn serialize_bincode_into_shm(
        &self,
        shm_buffer: &mut [u8],
        payload_buffer: &mut Vec<u8>,
    ) -> Result<()> {
        payload_buffer.clear();

        // Only initialized when the payload had to be serialized beforehand.
        let flattened;
        let to_encode: &LinkMessage = match self {
            LinkMessage::Data(data_message)
                if matches!(&data_message.data, Payload::Typed(_)) =>
            {
                data_message.try_as_bytes_into(payload_buffer)?;
                flattened = LinkMessage::Data(DataMessage::new_serialized(
                    payload_buffer.clone(),
                    data_message.timestamp,
                ));
                &flattened
            }
            LinkMessage::Data(_) | LinkMessage::Watermark(_) => self,
        };

        bincode::serialize_into(shm_buffer, to_encode)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Returns a copy of the timestamp of this message, whatever its variant.
    pub fn get_timestamp(&self) -> Timestamp {
        match self {
            LinkMessage::Data(DataMessage { timestamp, .. })
            | LinkMessage::Watermark(timestamp) => *timestamp,
        }
    }
}
impl Ord for LinkMessage {
    /// Orders messages by their timestamp only; variant and payload are ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        let (mine, theirs) = (self.get_timestamp(), other.get_timestamp());
        mine.cmp(&theirs)
    }
}
impl PartialOrd for LinkMessage {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl PartialEq for LinkMessage {
    /// Two messages are equal when their timestamps are; variant and payload are ignored.
    fn eq(&self, other: &Self) -> bool {
        let mine = self.get_timestamp();
        let theirs = other.get_timestamp();
        mine == theirs
    }
}
// Marker implementation: equality of `LinkMessage` is the equality of their timestamps.
impl Eq for LinkMessage {}
/// Typed counterpart of a message: either some `Data<T>` or a watermark, which carries nothing.
#[derive(Debug)]
pub enum Message<T> {
/// Data holding (or able to give access to) a `T`.
Data(Data<T>),
/// A watermark; no value is attached to it.
Watermark,
}
/// Wrapper giving access to a `T` through `Deref`, regardless of how the value is stored.
#[derive(Debug)]
pub struct Data<T> {
// Actual storage: a plain `T`, or a `Payload` with an optional deserialized `T`.
inner: DataInner<T>,
}
/// The two ways a `Data<T>` stores its value.
pub(crate) enum DataInner<T> {
/// A `Payload`, along with the deserialized value when the payload was made of bytes. `data`
/// is `None` when the payload is a `Payload::Typed`: the value is then reached by downcasting.
Payload { payload: Payload, data: Option<T> },
/// A value owned directly.
Data(T),
}
impl<T> Debug for DataInner<T> {
    /// Describes the variant without requiring `T: Debug`: the value itself is never printed,
    /// only whether a deserialized value is present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (payload, data) = match self {
            DataInner::Data(_) => return write!(f, "DataInner::Data(T)"),
            DataInner::Payload { payload, data } => (payload, data),
        };

        let presence = match data {
            Some(_) => "Some",
            None => "None",
        };
        write!(f, "DataInner::Payload: {:?} - data: {}", payload, presence)
    }
}
impl<T: Send + Sync + 'static> From<T> for Data<T> {
fn from(value: T) -> Self {
Self {
inner: DataInner::Data(value),
}
}
}
impl<T: 'static> Deref for Data<T> {
    type Target = T;

    /// Gives access to the `T` held by this `Data`, wherever it is stored:
    /// - directly in `DataInner::Data`,
    /// - in the `data` field of `DataInner::Payload` when the value was deserialized from bytes,
    /// - behind the type-erased value of a `Payload::Typed`, reached with a downcast.
    ///
    /// # Panics
    ///
    /// Panics if the downcast of a `Payload::Typed` to `T` fails, or if the payload is a
    /// `Payload::Bytes` with no deserialized value. `try_from_payload` never builds either of
    /// these states.
    fn deref(&self) -> &Self::Target {
        match &self.inner {
            DataInner::Data(data) => data,
            DataInner::Payload {
                data: Some(data), ..
            } => data,
            DataInner::Payload {
                payload: Payload::Typed((typed, _)),
                data: None,
            } => {
                // The explicit dereference makes `as_any` apply to the stored value, not to the
                // `Arc` wrapping it.
                (**typed).as_any().downcast_ref::<T>().expect(
                    r#"You probably managed to find a very nasty flaw in Zenoh-Flow’s code as we
believed this situation would never happen (unless explicitely triggered — "explicitely" being an
understatement here, we feel it’s more like you really, really, wanted to see that message — in
which case, congratulations!).
Our guess as to what happened is that:
- the data in `Payload::Typed` was, at first, correct (where we internally do the
`as_any().is::<T>()` check),
- in between this check and the call to `deref` the underlying data somehow changed.
If we did not do a mistake — fortunately the most likely scenario — then we do not know what
happened and we would be eager to investigate.
Feel free to contact us at < zenoh@zettascale.tech >.
"#,
                )
            }
            DataInner::Payload {
                payload: Payload::Bytes(_),
                data: None,
            } => panic!(
                r#"You probably managed to find a very nasty flaw in Zenoh-Flow's code as we
believed this situation would never happen (unless explicitely triggered — "explicitely" being an
understatement here, we feel it's more like you really, really, wanted to see that message — in
which case, congratulations!).
Our guess as to what happened is that:
- the `payload` field is a `Payload::Bytes`,
- the `data` field is set to `None`.
If we did not do a mistake — fortunately the most likely scenario — then we do not know what
happened and we would be eager to investigate.
Feel free to contact us at < zenoh@zettascale.tech >.
"#
            ),
        }
    }
}
impl<T: 'static> Data<T> {
    /// Builds a `Data<T>` out of a `Payload`.
    ///
    /// A `Payload::Bytes` is deserialized right away with `deserializer` and the result is kept
    /// next to the payload. A `Payload::Typed` is only checked to hold a `T`; no value is
    /// extracted at this point.
    ///
    /// # Errors
    ///
    /// Returns the error of `deserializer`, or a `DeserializationError` when the typed value is
    /// not a `T`.
    pub(crate) fn try_from_payload(
        payload: Payload,
        deserializer: Arc<DeserializerFn<T>>,
    ) -> Result<Self> {
        let data = match &payload {
            Payload::Bytes(bytes) => Some((deserializer)(bytes.as_slice())?),
            Payload::Typed((typed, _)) => {
                let holds_expected_type = (**typed).as_any().is::<T>();
                if !holds_expected_type {
                    bail!(
                        ErrorKind::DeserializationError,
                        "Failed to downcast provided value",
                    )
                }
                None
            }
        };

        let inner = DataInner::Payload { payload, data };
        Ok(Self { inner })
    }
}